# Required libraries (pinned versions):
# kafka-python==2.1.5 httpx==0.28.1 openpyxl==3.1.5 pandas==2.2.3 xmltodict==0.14.2 schedule==1.2.2
import pandas as pd
import httpx
import schedule
import json
from urllib.parse import quote
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from variable import KAFKA_BROKER, TOPIC_NAME, API_KEY, BASE_URL, EXCEL_FILE
import xmltodict
import time
# Create the Kafka topic automatically when it is missing.
def create_topic_if_not_exists(topic_name):
    """Create ``topic_name`` on the configured broker unless it already exists.

    Opens an admin client against ``KAFKA_BROKER``, looks the name up in the
    broker's topic list, and creates a topic with one partition and a
    replication factor of one when it is absent. The outcome is printed in
    both cases, and the admin client is always closed before returning.

    Args:
        topic_name: Name of the Kafka topic to ensure.
    """
    # Local import: keeps this function self-contained without touching the
    # file-level import block.
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=KAFKA_BROKER)
    try:
        if topic_name in admin.list_topics():
            print(f"Kafka 토픽 이미 존재: {topic_name}")
            return

        topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
        try:
            admin.create_topics(new_topics=[topic])
        except TopicAlreadyExistsError:
            # Another client created the topic between the list and the create
            # call; that is the state we wanted, so report it and carry on.
            print(f"Kafka 토픽 이미 존재: {topic_name}")
            return
        print(f"Kafka 토픽 생성: {topic_name}")
    finally:
        # Release the admin connection even when the broker calls fail.
        admin.close()
def run_task():
    """Run one collection cycle: fetch city data per area and publish to Kafka.

    Steps:
      1. Create a Kafka producer that JSON-encodes values and UTF-8-encodes keys.
      2. Ensure ``TOPIC_NAME`` exists.
      3. Read the ``AREA_NM`` column from ``EXCEL_FILE``.
      4. For each area, request the XML city data, convert it to a dict, drop
         the sections that are not published, and send it keyed by area name.
      5. Flush and close the producer, then append a timestamped line to
         ``kafka_producer_log.txt``.

    A failure for one area is printed and does not stop the remaining areas.
    """
    # Kafka producer
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # JSON serialization
        key_serializer=lambda k: k.encode('utf-8'),
    )
    try:
        # Ensure the topic exists
        create_topic_if_not_exists(TOPIC_NAME)

        # Load the Excel sheet
        df = pd.read_excel(EXCEL_FILE)

        # Blank cells are read as NaN, which cannot be URL-quoted or encoded as
        # a message key; skip them and normalise the rest to str.
        area_names = df['AREA_NM'].dropna().astype(str)

        # One API call (with retries) per area
        for city_name in area_names:
            if not _publish_city(producer, city_name):
                print(f"최종 실패: {city_name}")

        # Block until every queued message has actually been handed to the broker.
        producer.flush()
    finally:
        # Always release the producer's network resources, even on error.
        producer.close()

    # Explicit encoding: the log line contains non-ASCII text.
    with open("kafka_producer_log.txt", "a", encoding="utf-8") as log_file:
        log_file.write(f"Kafka Producer 종료: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")


def _publish_city(producer, city_name, max_attempts=3, retry_delay=2):
    """Fetch one area's city data and queue it on Kafka; return True on success.

    Args:
        producer: Kafka producer used to send the message.
        city_name: Area name; used in the request URL and as the message key.
        max_attempts: How many times the request is tried before giving up.
        retry_delay: Seconds to wait between failed attempts.
    """
    encoded_city = quote(city_name, safe='')
    url = f"{BASE_URL}/{API_KEY}/xml/citydata/1/5/{encoded_city}"

    for attempt in range(1, max_attempts + 1):
        try:
            response = httpx.get(url, timeout=10.0)  # timeout extended to 10 seconds
            if response.status_code == 200:
                payload = _strip_unused_sections(xmltodict.parse(response.text))
                # Send to Kafka (JSON). send() only queues the record; delivery
                # is completed by the caller's flush().
                producer.send(TOPIC_NAME, key=city_name, value=payload)
                print(f"전송 성공: {city_name}")
                return True
            print(f"응답 오류 ({response.status_code}) - {city_name}")
        except Exception as e:
            # Deliberately broad: any failure for one area (network, XML
            # parsing, unexpected document shape) must not abort the others.
            print(f"재시도 {attempt}/{max_attempts} - {city_name} 오류: {e}")

        # Back off before the next try; no point sleeping after the last one.
        if attempt < max_attempts:
            time.sleep(retry_delay)
    return False


def _strip_unused_sections(payload):
    """Remove the sections that are not published and return ``payload``.

    The dict is modified in place. Sections that are already absent are
    ignored, so a response lacking an optional section is still published.
    """
    root = payload["SeoulRtd.citydata"]
    root.pop('RESULT', None)
    city_data = root['CITYDATA']
    for section in ('BUS_STN_STTS', 'SUB_STTS', 'CHARGER_STTS', 'LIVE_CMRCL_STTS'):
        city_data.pop(section, None)
    return payload
if __name__ == "__main__":
    # Run one cycle right away rather than waiting for the first interval.
    run_task()

    # Register the recurring job: one run every 5 minutes.
    every_five_minutes = schedule.every(5).minutes
    every_five_minutes.do(run_task)

    # Poll the scheduler once per second, indefinitely.
    while True:
        schedule.run_pending()
        time.sleep(1)