Use Case (요구사항정의)
- 모니터링 항목의 데이터들을 수집하기 위해 파이썬 프로그램을 만들어야 한다.
- 이를 위해 나는 '마지막 실행시각', '실행주기', '모니터링 결과값' 등을 저장하는 history 테이블을 만들었다.
- 모니터링 프로그램이 실행되면 history 테이블에 저장된 "설정주기"를 참조하여 모니터링 데이터들을 하나씩 수집해야 한다.
- 모니터링 실행 결과 데이터는 "결과값"과 "마지막 실행시각"을 저장하고 다음 모니터링 데이터를 수집할 항목을 실행한다. 만약 다음 모니터링할 항목이 없다면 가장 마지막에 실행된 항목의 "마지막실행시각"과 "실행주기"를 참조하여 자동으로 그만큼 대기할 수 있다
- 대기시간이 완료되면 그 다음을 실행한다.
- 모니터링 항목별 데이터를 수집할때 실행 지연시간이 있을 수 있으며, 이런 부분의 영향을 받지 않도록 비동기적으로 실행될 수 있도록 하는 방안이나 더 좋은 방법이 있으면 그 방법에 따라 구현해야한다.
Flow chart
테이블 정보
* mysql 기준
| CREATE TABLE history ( |
| item VARCHAR(255) PRIMARY KEY, |
| last_execution_time TIMESTAMP NOT NULL, |
| execution_interval INT NOT NULL, |
| result FLOAT |
| ); |
목업데이터 생성기
| import mysql.connector |
| from datetime import datetime |
| import random |
| |
| config = { |
| 'user': 'root', |
| 'password': '비번', |
| 'host': '127.0.0.1', |
| 'database': 'monitoring', |
| 'raise_on_warnings': True |
| } |
| |
| conn = mysql.connector.connect(**config) |
| cursor = conn.cursor() |
| |
| |
| |
| for i in range(20): |
| item = f"item{i}" |
| last_execution_time = datetime.now() |
| execution_interval = 10 |
| cursor.execute("INSERT INTO history (item, last_execution_time, execution_interval) VALUES (%s, %s, %s)", |
| (item, last_execution_time, execution_interval)) |
| |
| conn.commit() |
스케줄러 실행 PoC 프로그램
| import asyncio |
| import mysql.connector |
| from datetime import datetime, timedelta |
| import random |
| |
| |
| |
| async def monitoring_task(item): |
| random_num = random.random() |
| await asyncio.sleep(random_num*3) |
| return 100.0+random_num*10 |
| |
| async def main(): |
| config = { |
| 'user': 'root', |
| 'password': '비번', |
| 'host': '127.0.0.1', |
| 'database': 'monitoring', |
| 'raise_on_warnings': True |
| } |
| |
| conn = mysql.connector.connect(**config) |
| cursor = conn.cursor() |
| |
| while True: |
| cursor.execute("SELECT * FROM history ORDER BY last_execution_time") |
| rows = cursor.fetchall() |
| |
| for row in rows: |
| item, last_execution_time, execution_interval, _ = row |
| next_execution_time = last_execution_time.timestamp() + execution_interval |
| current_time = datetime.now().timestamp() |
| if next_execution_time <= current_time: |
| |
| result = await monitoring_task(row) |
| print(f"실행아이템 {item} : {result}, 주기 : {execution_interval}") |
| |
| try: |
| cursor.execute("UPDATE history SET last_execution_time = FROM_UNIXTIME(%s), result = %s WHERE item = %s", |
| (current_time, result, row[0])) |
| conn.commit() |
| except Exception as e: |
| print(e) |
| else: |
| |
| await asyncio.sleep(next_execution_time - current_time) |
| |
| if __name__ == "__main__": |
| asyncio.run(main()) |
추가 개선의 의지
- 상기 코드에는 큐관리가 되어 있지 않다.
- 만약 큐관리를 추가한다면 더 나은 프로세스로 구현이 가능할 것으로 보인다.
고려할만한 전략
- Job grouping : monitoring pre-run을 통한 사전 process 정보 획득 후, 큰 틀의 느낌으로 실행 주기별 모니터링 항목 그룹
- Error handling : 실패한 job의 경우, 해당 job의 group정보를 기반으로 재 스케줄링.
- Queue Management : 스케줄링 구현에 python이 이용된다면, ‘asyncio.Queue’를 사용해 queue를 관리. [ asyncio.Queue외의 다른 queue관리 fcn : RabbitMQ, Kafka, Redis, etc… ]
댓글 영역