상세 컨텐츠

본문 제목

모니터링 항목별 스케줄러 로직

파이썬

by amanda.hyon 2024. 9. 20. 00:29

본문

Use Case (요구사항정의)

  1. 모니터링 항목의 데이터들을 수집하기 위해 파이썬 프로그램을 만들어야 한다.
  2. 이를 위해 나는 '마지막 실행시각', '실행주기', '모니터링 결과값' 등을 저장하는 history 테이블을 만들었다.
  3. 모니터링 프로그램이 실행되면 history 테이블에 저장된 "설정주기"를 참조하여 모니터링 데이터들을 하나씩 수집해야 한다.
  4. 모니터링 실행 결과 데이터는 "결과값"과 "마지막 실행시각"을 저장하고 다음 모니터링 데이터를 수집할 항목을 실행한다. 만약 다음 모니터링할 항목이 없다면 가장 마지막에 실행된 항목의 "마지막실행시각"과 "실행주기"를 참조하여 자동으로 그만큼 대기할 수 있다
  5. 대기시간이 완료되면 그 다음을 실행한다.
  6. 모니터링 항목별 데이터를 수집할때 실행 지연시간이 있을 수 있으며, 이런 부분의 영향을 받지 않도록 비동기적으로 실행될 수 있도록 하는 방안이나 더 좋은 방법이 있으면 그 방법에 따라 구현해야한다.    


Flow chart

테이블 정보

* mysql 기준

CREATE TABLE history (
    item VARCHAR(255) PRIMARY KEY,
    last_execution_time TIMESTAMP NOT NULL,
    execution_interval INT NOT NULL,
    result FLOAT
);

 


목업데이터 생성기

import mysql.connector
from datetime import datetime
import random

config = {
    'user': 'root',
    'password': '비번',
    'host': '127.0.0.1',
    'database': 'monitoring',
    'raise_on_warnings': True
}

conn = mysql.connector.connect(**config)
cursor = conn.cursor()

# 20개의 테스트 데이터 생성

for i in range(20):
    item = f"item{i}"
    last_execution_time = datetime.now()
    execution_interval = 10
    cursor.execute("INSERT INTO history (item, last_execution_time, execution_interval) VALUES (%s, %s, %s)",
                   (item, last_execution_time, execution_interval))

conn.commit()


스케줄러 실행 PoC 프로그램

import asyncio
import mysql.connector
from datetime import datetime, timedelta
import random

# 이 함수는 모니터링 항목을 실행하고 결과를 반환하는 함수를 대신합니다.

async def monitoring_task(item):
    random_num = random.random()
    await asyncio.sleep(random_num*3)  # 실제 모니터링 작업을 수행한다고 가정.
    return 100.0+random_num*10  # 모니터링 작업 결과에 대한 임의의 값.

async def main():
    config = {
        'user': 'root',
        'password': '비번',
        'host': '127.0.0.1',
        'database': 'monitoring',
        'raise_on_warnings': True
    }

    conn = mysql.connector.connect(**config)
    cursor = conn.cursor()

    while True:
        cursor.execute("SELECT * FROM history ORDER BY last_execution_time")
        rows = cursor.fetchall()

        for row in rows:
            item, last_execution_time, execution_interval, _ = row
            next_execution_time = last_execution_time.timestamp() + execution_interval
            current_time = datetime.now().timestamp()
            if next_execution_time <= current_time:
                # 이 항목을 모니터링해야 합니다.
                result = await monitoring_task(row)
                print(f"실행아이템 {item} : {result}, 주기 : {execution_interval}")

                try:
                    cursor.execute("UPDATE history SET last_execution_time = FROM_UNIXTIME(%s), result = %s WHERE item = %s",
                                (current_time, result, row[0]))
                    conn.commit()                               
                except Exception as e:
                    print(e)
            else:
                # 다음 실행까지 대기
                await asyncio.sleep(next_execution_time - current_time)

if __name__ == "__main__":
    asyncio.run(main())

 

  • 실행환경은 파이썬 3.10
  • 테스트 방법은
    • 그냥 실행
    • 중간에 주기를 바꿔서 실행


추가 개선의 의지

  • 상기 코드에는 큐관리가 되어 있지 않다.
  • 만약 큐관리를 추가한다면 더 나은 프로세스로 구현이 가능할 것으로 보인다.
       

고려할만한 전략

  • Job grouping : monitoring pre-run을 통한 사전 process 정보 획득 후, 큰 틀의 느낌으로 실행 주기별 모니터링 항목 그룹
  • Error handling : 실패한 job의 경우, 해당 job의 group정보를 기반으로 재 스케줄링.
  • Queue Management : 스케줄링 구현에 python이 이용된다면, ‘asyncio.Queue’를 사용해 queue를 관리.  [ asyncio.Queue외의 다른 queue관리 fcn : RabbitMQ, Kafka, Redis, etc… ]

관련글 더보기

댓글 영역