Kafka와 Kafka Connect 쉽게 이해하기
*Kafka Connect와 es-sink.json 쉽게 이해하기
- Kafka Connect란? (우체국 배달 시스템 비유)
Kafka Connect는 Apache Kafka의 일부로, Kafka와 외부 시스템(예: Elasticsearch, 데이터베이스 등)을 연결해주는 "자동 배달 시스템"이에요. Kafka 자체는 메시지를 주고받는 "우체국" 같은 역할을 하지만, 그 메시지를 다른 곳으로 옮기려면 추가 작업이 필요하죠. Kafka Connect는 이 작업을 자동화해줘요.
비유: Kafka가 "우체국"이라면, Kafka Connect는 "배달원"이에요. 여러분이 Python에서 Kafka로 편지(데이터)를 보내면, Kafka Connect가 그 편지를 꺼내서 Elasticsearch(ES)라는 "집"으로 배달해줍니다.
핵심 역할: 코드를 작성하지 않고 설정 파일만으로 데이터를 옮길 수 있게 해줘요. 이게 여러분이 기존에 "Spring" 같은 중간 계층을 썼던 걸 대체한 이유예요.
Kafka Connect에는 두 가지 주요 구성 요소가 있어요:
Source Connector: 외부에서 Kafka로 데이터를 가져오는 역할 (예: 파일 → Kafka).
Sink Connector: Kafka에서 외부로 데이터를 보내는 역할 (예: Kafka → ES). 여러분이 이번에 쓴 건 Sink Connector예요.*
Kafka Connect 설정 및 테스트 단계 (업데이트 버전)
목표
- Python에서 Kafka로 메시지를 발행하고, Kafka Connect를 통해 Elasticsearch(ES)로 데이터를 자동 싱크하는 파이프라인 구축.
- Kafka Connect는 Kafka 토픽 데이터를 ES로 "배달"해주는 중간 다리 역할.
Step 1: Docker Compose에 Kafka Connect 추가
- 역할: Kafka Connect를 Docker 환경에 추가해서 Kafka와 ES를 연결할 "자동 배달 시스템" 준비.
- 수정된
docker-compose.yml
(Kafka Connect 부분):kafka-connect: image: confluentinc/cp-kafka-connect:7.3.0 container_name: DA-kafka-connect ports: - "8083:8083" environment: CONNECT_BOOTSTRAP_SERVERS: "kafka:19092" CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" CONNECT_GROUP_ID: "connect-group" CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs" CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets" CONNECT_STATUS_STORAGE_TOPIC: "connect-status" CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" depends_on: - kafka - es volumes: - ./docker-data/kafka-connect:/usr/share/confluent-hub-components networks: - da_net
- 명령어:
docker-compose up -d
- 설명:
CONNECT_BOOTSTRAP_SERVERS
: Kafka 우체국 주소.8083
: Kafka Connect 관리용 REST API 포트.depends_on
: Kafka와 ES가 먼저 실행돼야 배달 시작 가능.
Step 2: Kafka Connect에 Elasticsearch Sink Connector 설치
- 역할: Kafka Connect가 ES로 데이터를 배달할 수 있게 "배달 트럭" 설치.
- 명령어:
docker exec -it DA-kafka-connect bash confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.5 exit docker restart DA-kafka-connect
- 설명:
confluent-hub install
: ES용 배달 트럭(Sink Connector) 설치.11.1.5
: 안정적인 버전.restart
: 설치 후 트럭 재시동.
- 확인 명령어:
curl -X GET "http://localhost:8083/connector-plugins"
"confluentinc-kafka-connect-elasticsearch"
가 보이면 설치 성공.
Step 3: Sink Connector 설정 파일 생성 (es-sink.json
)
- 역할: 배달 트럭에 "어디서 뭘 가져와 어디로 보낼지" 배달 지시서 작성.
- 파일 내용 (
es-sink.json
):{ "name": "es-sink", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max": "1", "topics": "image_tags", "connection.url": "http://DA-es:9200", "type.name": "_doc", "key.ignore": "true", "schema.ignore": "true", "index.name": "image_tags", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false" } }
- 설명:
topics
:image_tags
토픽에서 편지 꺼냄.connection.url
: ES 집 주소 (DA-es:9200
).schemas.enable=false
: 편지가 단순 JSON이라 추가 포장 없이 배달.
- 파일 생성:
vi es-sink.json # 위 내용 붙여넣기 후 :wq
각 줄의 의미:
name
: "es-sink"- 배달 작업의 이름. Kafka Connect가 이 이름을 보고 어떤 배달인지 구분해요.
connector.class
: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"- 배달 트럭의 종류. 이건 "Elasticsearch로 보내는 전용 트럭"을 뜻해요. 다른 트럭(예: MySQL Sink Connector)도 있지만, 여러분은 ES로 보내니까 이걸 썼어요.
tasks.max
: "1"- 배달원을 몇 명 쓸지. "1"은 배달원이 한 명뿐이라는 뜻. 데이터가 많아지면 이걸 늘려서 병렬로 배달할 수 있어요.
topics
: "image_tags"- 어떤 사서함에서 편지를 꺼낼지. 여러분의 Python이
image_tags
토픽에 데이터를 넣으니까 여기서 가져오라고 알려줘요.
- 어떤 사서함에서 편지를 꺼낼지. 여러분의 Python이
connection.url
: "http://DA-es:9200"- 배달할 집 주소. ES가
DA-es
라는 컨테이너에서 실행되고, 포트9200
에서 기다리고 있어요.
- 배달할 집 주소. ES가
type.name
: "_doc"- ES에서 데이터를 저장할 때 붙이는 라벨. 예전 ES 버전에서는 중요했지만, 최신 버전(7.x 이상)에서는 거의 무시돼요. 그냥 "_doc"으로 두면 안전해요.
key.ignore
: "true"- Kafka 메시지의 "키"를 무시할지 여부. 여러분의 데이터는 키가 필요 없어서 무시하도록 설정했어요. (키는 보통 고유 식별자 역할을 하지만 여기선 필요 없음.)
schema.ignore
: "true"- 데이터의 "형식(스키마)"를 무시할지 여부. 여러분의 데이터는 단순 JSON이라 복잡한 형식 정의가 없어도 ES가 알아서 처리해요.
index.name
: "image_tags"- ES에서 데이터를 저장할 "폴더" 이름.
image_tags
라는 인덱스에 데이터가 쌓여요. (ES에서 인덱스는 데이터베이스의 테이블 같은 개념.)
- ES에서 데이터를 저장할 "폴더" 이름.
value.converter
: "org.apache.kafka.connect.json.JsonConverter"- 편지를 어떻게 읽을지. 여러분의 데이터는 JSON 형식이라 JSON으로 변환하는 도구를 써요.
value.converter.schemas.enable
: "false"- 편지에 "설명서(스키마)"를 붙일지 여부. 여러분은 단순 JSON을 쓰니까 설명서 없이 보내라고 설정했어요.
Step 4: Sink Connector 등록
- 역할: 배달 지시서를 Kafka Connect에 등록해서 배달 시작.
- 명령어:
curl -X POST -H "Content-Type: application/json" --data @es-sink.json "http://localhost:8083/connectors"
- 성공 응답:
{"name":"es-sink","config":{...},"tasks":[],"type":"sink"}
- 상태 확인:
curl -X GET "http://localhost:8083/connectors/es-sink/status"
"state": "RUNNING"
이면 배달 중.
Step 5: Kafka 토픽 생성 및 메시지 발행 테스트
- 역할: Kafka 우체국에
image_tags
사서함 만들고, Python이 편지 넣는지 확인. - 토픽 생성 (필요 시):
docker exec -it DA-kafka-1 /usr/bin/kafka-topics --create --topic image_tags --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
- Python 실행:
python main.py
~/Desktop/duukrchive
에 이미지 추가.- 로그:
"Sent to Kafka topic 'image_tags': ..."
- 메시지 확인:
docker exec -it DA-kafka-1 /usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic image_tags --from-beginning
- 출력 예:
{"file_path": "...", "primary_tag": "vehicle", "tags": ["white"], "timestamp": "..."}
- 출력 예:
Step 6: ES 데이터 저장 확인
- 역할: 배달된 편지가 ES 집에 잘 도착했는지 확인.
- 명령어:
curl -X GET "http://localhost:9200/image_tags/_search?pretty"
- 성공 응답:
{ "hits": { "hits": [ {"_source": {"file_path": "...", "primary_tag": "vehicle", "tags": ["white"], "timestamp": "..."}} ] } }
- 문제 시 로그 확인:
docker logs DA-kafka-connect
Step 7: 문제 해결 (필요 시)
- 역할: 배달이 안 됐을 때 문제 찾아 수정.
- 명령어:
- 상태 확인:
curl -X GET "http://localhost:8083/connectors/es-sink/status"
- 커넥터 재등록:
curl -X DELETE "http://localhost:8083/connectors/es-sink" curl -X POST -H "Content-Type: application/json" --data @es-sink.json "http://localhost:8083/connectors"
- 상태 확인:
- 설명: 배달 지시서 잘못됐으면 삭제 후 다시 등록.
전체 흐름 (우체국 비유)
- Python: 이미지 태그를 편지로 써서 Kafka 우체국(
image_tags
사서함)에 넣음. - Kafka: 편지를 보관.
- Kafka Connect: 우체국에서 편지를 꺼내 ES 집으로 자동 배달.
- ES: 편지(태그 데이터)를 저장하고 검색 가능하게 함.
나중에 다시 할 때 팁
- 파일 준비:
docker-compose.yml
,es-sink.json
을 프로젝트 폴더에 저장. - 스크립트化:
#!/bin/bash docker-compose up -d docker exec -it DA-kafka-connect bash -c "confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.5" docker restart DA-kafka-connect curl -X POST -H "Content-Type: application/json" --data @es-sink.json "http://localhost:8083/connectors" python main.py
chmod +x setup.sh
후./setup.sh
로 한 방에 실행.