# 필수 라이브러리 (Required libraries; openpyxl is needed by pandas.read_excel):

#   pip install kafka-python==2.1.5 httpx==0.28.1 openpyxl==3.1.5 pandas==2.2.3 xmltodict==0.14.2 schedule==1.2.2

import pandas as pd
import httpx
import schedule
import json
from urllib.parse import quote
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from variable import KAFKA_BROKER, TOPIC_NAME, API_KEY, BASE_URL, EXCEL_FILE
import xmltodict
import time

def create_topic_if_not_exists(topic_name):
    """Create the Kafka topic ``topic_name`` on KAFKA_BROKER if it does not exist.

    A missing topic is created with 1 partition and replication factor 1.
    The function prints whether the topic was created or already existed.

    Args:
        topic_name: Name of the topic to ensure.
    """
    admin = KafkaAdminClient(bootstrap_servers=KAFKA_BROKER)
    try:
        if topic_name not in admin.list_topics():
            topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
            admin.create_topics(new_topics=[topic])
            print(f"Kafka 토픽 생성: {topic_name}")
        else:
            print(f"Kafka 토픽 이미 존재: {topic_name}")
    finally:
        # Close even when list/create raises. This runs on every scheduled task,
        # so a leaked admin connection would accumulate over time.
        admin.close()

# Sub-sections removed from each CITYDATA payload before publishing.
# NOTE(review): presumably dropped to keep messages small; confirm no consumer needs them.
_DROPPED_CITYDATA_FIELDS = ("BUS_STN_STTS", "SUB_STTS", "CHARGER_STTS", "LIVE_CMRCL_STTS")
_MAX_ATTEMPTS = 3          # API attempts per area before giving up
_RETRY_DELAY_SEC = 2       # pause between failed attempts
_REQUEST_TIMEOUT_SEC = 10.0


def _trim_citydata(payload):
    """Remove RESULT and the bulky CITYDATA sub-sections from a parsed payload in place.

    Raises KeyError if the ``SeoulRtd.citydata`` root or its ``CITYDATA`` section
    is missing, i.e. the document is not a normal city-data response. Optional
    sub-sections that are already absent are tolerated.
    """
    root = payload["SeoulRtd.citydata"]
    root.pop("RESULT", None)
    citydata = root["CITYDATA"]
    for field in _DROPPED_CITYDATA_FIELDS:
        citydata.pop(field, None)
    return payload


def _fetch_city_payload(client, city_name):
    """Fetch, parse and trim one area's citydata, retrying up to _MAX_ATTEMPTS times.

    Returns the trimmed dict, or None if every attempt failed.
    """
    url = f"{BASE_URL}/{API_KEY}/xml/citydata/1/5/{quote(city_name, safe='')}"
    for attempt in range(1, _MAX_ATTEMPTS + 1):
        try:
            response = client.get(url)
            if response.status_code == 200:
                return _trim_citydata(xmltodict.parse(response.text))
            print(f"응답 오류 ({response.status_code}) - {city_name}")
        except Exception as e:  # network, XML-parse and payload-shape errors all retry
            print(f"재시도 {attempt}/{_MAX_ATTEMPTS} - {city_name} 오류: {e}")
        # Back off after every failed attempt (including non-200 responses),
        # but don't sleep after the final one.
        if attempt < _MAX_ATTEMPTS:
            time.sleep(_RETRY_DELAY_SEC)
    return None


def _report_send_failure(city_name, exc):
    """Errback for producer.send: report a record the broker did not accept."""
    print(f"Kafka 전송 실패: {city_name} - {exc}")


def run_task():
    """Publish real-time citydata for every area listed in EXCEL_FILE to Kafka.

    Steps:
    1. Ensure TOPIC_NAME exists.
    2. Read area names from the ``AREA_NM`` column.
    3. Fetch each area's XML from BASE_URL and convert it to a dict with xmltodict.
    4. Trim the dict and send it as JSON keyed by the area name.
    5. Append a completion timestamp to kafka_producer_log.txt.
    """
    create_topic_if_not_exists(TOPIC_NAME)

    # Load the area list before creating the producer, so a bad Excel file
    # cannot leak a producer.
    df = pd.read_excel(EXCEL_FILE)
    # Blank cells come back as NaN. quote() and the key serializer both need
    # str, so drop blanks and coerce the rest instead of aborting the whole run.
    city_names = df["AREA_NM"].dropna().astype(str)

    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),  # JSON serialization
        key_serializer=lambda k: k.encode("utf-8"),
    )
    try:
        # One pooled client for all requests instead of a new connection per call.
        with httpx.Client(timeout=_REQUEST_TIMEOUT_SEC) as client:
            for city_name in city_names:
                payload = _fetch_city_payload(client, city_name)
                if payload is None:
                    print(f"최종 실패: {city_name}")
                    continue
                # send() is asynchronous. "Success" here means the record was
                # queued; broker-side failures are reported by the errback.
                future = producer.send(TOPIC_NAME, key=city_name, value=payload)
                future.add_errback(_report_send_failure, city_name)
                print(f"전송 성공: {city_name}")
    finally:
        producer.flush()
        producer.close()

    with open("kafka_producer_log.txt", "a", encoding="utf-8") as log_file:
        # Real newline. The original "\\n" wrote a literal backslash-n.
        log_file.write(f"Kafka Producer 종료: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")

if __name__ == "__main__":
    import traceback

    def _safe_run_task():
        """Run run_task, printing any exception so the scheduler loop keeps going.

        schedule does not catch job exceptions. Without this wrapper, one failed
        run (broker down, API outage) would end the process.
        """
        try:
            run_task()
        except Exception:
            traceback.print_exc()

    _safe_run_task()  # run once immediately
    # Then repeat every 5 minutes.
    schedule.every(5).minutes.do(_safe_run_task)
    while True:
        schedule.run_pending()
        time.sleep(1)