카테고리 없음

kafka connect

99duuk 2025. 3. 16. 21:28

Kafka와 Kafka Connect 쉽게 이해하기

*Kafka Connect와 es-sink.json 쉽게 이해하기

  1. Kafka Connect란? (우체국 배달 시스템 비유)

Kafka Connect는 Apache Kafka의 일부로, Kafka와 외부 시스템(예: Elasticsearch, 데이터베이스 등)을 연결해주는 "자동 배달 시스템"이에요. Kafka 자체는 메시지를 주고받는 "우체국" 같은 역할을 하지만, 그 메시지를 다른 곳으로 옮기려면 추가 작업이 필요하죠. Kafka Connect는 이 작업을 자동화해줘요.

비유: Kafka가 "우체국"이라면, Kafka Connect는 "배달원"이에요. 여러분이 Python에서 Kafka로 편지(데이터)를 보내면, Kafka Connect가 그 편지를 꺼내서 Elasticsearch(ES)라는 "집"으로 배달해줍니다.
핵심 역할: 코드를 작성하지 않고 설정 파일만으로 데이터를 옮길 수 있게 해줘요. 이게 여러분이 기존에 "Spring" 같은 중간 계층을 썼던 걸 대체한 이유예요.

Kafka Connect에는 두 가지 주요 구성 요소가 있어요:

Source Connector: 외부에서 Kafka로 데이터를 가져오는 역할 (예: 파일 → Kafka).
Sink Connector: Kafka에서 외부로 데이터를 보내는 역할 (예: Kafka → ES). 여러분이 이번에 쓴 건 Sink Connector예요.*

Kafka Connect 설정 및 테스트 단계 (업데이트 버전)

목표

  • Python에서 Kafka로 메시지를 발행하고, Kafka Connect를 통해 Elasticsearch(ES)로 데이터를 자동 싱크하는 파이프라인 구축.
  • Kafka Connect는 Kafka 토픽 데이터를 ES로 "배달"해주는 중간 다리 역할.

Step 1: Docker Compose에 Kafka Connect 추가

  • 역할: Kafka Connect를 Docker 환경에 추가해서 Kafka와 ES를 연결할 "자동 배달 시스템" 준비.
  • 수정된 docker-compose.yml (Kafka Connect 부분):
    kafka-connect:
      image: confluentinc/cp-kafka-connect:7.3.0
      container_name: DA-kafka-connect
      ports:
        - "8083:8083"
      environment:
        CONNECT_BOOTSTRAP_SERVERS: "kafka:19092"
        CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
        CONNECT_GROUP_ID: "connect-group"
        CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
        CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
        CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
        CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      depends_on:
        - kafka
        - es
      volumes:
        - ./docker-data/kafka-connect:/usr/share/confluent-hub-components
      networks:
        - da_net
  • 명령어:
    docker-compose up -d
  • 설명:
    • CONNECT_BOOTSTRAP_SERVERS: Kafka 우체국 주소.
    • 8083: Kafka Connect 관리용 REST API 포트.
    • depends_on: Kafka와 ES가 먼저 실행돼야 배달 시작 가능.

Step 2: Kafka Connect에 Elasticsearch Sink Connector 설치

  • 역할: Kafka Connect가 ES로 데이터를 배달할 수 있게 "배달 트럭" 설치.
  • 명령어:
    docker exec -it DA-kafka-connect bash
    confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.5
    exit
    docker restart DA-kafka-connect
  • 설명:
    • confluent-hub install: ES용 배달 트럭(Sink Connector) 설치.
    • 11.1.5: 안정적인 버전.
    • restart: 설치 후 트럭 재시동.
  • 확인 명령어:
    curl -X GET "http://localhost:8083/connector-plugins"
    • "confluentinc-kafka-connect-elasticsearch"가 보이면 설치 성공.

Step 3: Sink Connector 설정 파일 생성 (es-sink.json)

  • 역할: 배달 트럭에 "어디서 뭘 가져와 어디로 보낼지" 배달 지시서 작성.
  • 파일 내용 (es-sink.json):
    {
        "name": "es-sink",
        "config": {
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "tasks.max": "1",
            "topics": "image_tags",
            "connection.url": "http://DA-es:9200",
            "type.name": "_doc",
            "key.ignore": "true",
            "schema.ignore": "true",
            "index.name": "image_tags",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false"
        }
    }
  • 설명:
    • topics: image_tags 토픽에서 편지 꺼냄.
    • connection.url: ES 집 주소 (DA-es:9200).
    • schemas.enable=false: 편지가 단순 JSON이라 추가 포장 없이 배달.
  • 파일 생성:
    vi es-sink.json
    # 위 내용 붙여넣기 후 :wq

각 줄의 의미:

  1. name: "es-sink"

    • 배달 작업의 이름. Kafka Connect가 이 이름을 보고 어떤 배달인지 구분해요.
  2. connector.class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"

    • 배달 트럭의 종류. 이건 "Elasticsearch로 보내는 전용 트럭"을 뜻해요. 다른 트럭(예: MySQL Sink Connector)도 있지만, 여러분은 ES로 보내니까 이걸 썼어요.
  3. tasks.max: "1"

    • 배달원을 몇 명 쓸지. "1"은 배달원이 한 명뿐이라는 뜻. 데이터가 많아지면 이걸 늘려서 병렬로 배달할 수 있어요.
  4. topics: "image_tags"

    • 어떤 사서함에서 편지를 꺼낼지. 여러분의 Python이 image_tags 토픽에 데이터를 넣으니까 여기서 가져오라고 알려줘요.
  5. connection.url: "http://DA-es:9200"

    • 배달할 집 주소. ES가 DA-es라는 컨테이너에서 실행되고, 포트 9200에서 기다리고 있어요.
  6. type.name: "_doc"

    • ES에서 데이터를 저장할 때 붙이는 라벨. 예전 ES 버전에서는 중요했지만, 최신 버전(7.x 이상)에서는 거의 무시돼요. 그냥 "_doc"으로 두면 안전해요.
  7. key.ignore: "true"

    • Kafka 메시지의 "키"를 무시할지 여부. 여러분의 데이터는 키가 필요 없어서 무시하도록 설정했어요. (키는 보통 고유 식별자 역할을 하지만 여기선 필요 없음.)
  8. schema.ignore: "true"

    • 데이터의 "형식(스키마)"를 무시할지 여부. 여러분의 데이터는 단순 JSON이라 복잡한 형식 정의가 없어도 ES가 알아서 처리해요.
  9. index.name: "image_tags"

    • ES에서 데이터를 저장할 "폴더" 이름. image_tags라는 인덱스에 데이터가 쌓여요. (ES에서 인덱스는 데이터베이스의 테이블 같은 개념.)
  10. value.converter: "org.apache.kafka.connect.json.JsonConverter"

    • 편지를 어떻게 읽을지. 여러분의 데이터는 JSON 형식이라 JSON으로 변환하는 도구를 써요.
  11. value.converter.schemas.enable: "false"

    • 편지에 "설명서(스키마)"를 붙일지 여부. 여러분은 단순 JSON을 쓰니까 설명서 없이 보내라고 설정했어요.

Step 4: Sink Connector 등록

  • 역할: 배달 지시서를 Kafka Connect에 등록해서 배달 시작.
  • 명령어:
    curl -X POST -H "Content-Type: application/json" --data @es-sink.json "http://localhost:8083/connectors"
  • 성공 응답:
    {"name":"es-sink","config":{...},"tasks":[],"type":"sink"}
  • 상태 확인:
    curl -X GET "http://localhost:8083/connectors/es-sink/status"
    • "state": "RUNNING"이면 배달 중.

Step 5: Kafka 토픽 생성 및 메시지 발행 테스트

  • 역할: Kafka 우체국에 image_tags 사서함 만들고, Python이 편지 넣는지 확인.
  • 토픽 생성 (필요 시):
    docker exec -it DA-kafka-1 /usr/bin/kafka-topics --create --topic image_tags --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  • Python 실행:
    python main.py
    • ~/Desktop/duukrchive에 이미지 추가.
    • 로그: "Sent to Kafka topic 'image_tags': ..."
  • 메시지 확인:
    docker exec -it DA-kafka-1 /usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic image_tags --from-beginning
    • 출력 예: {"file_path": "...", "primary_tag": "vehicle", "tags": ["white"], "timestamp": "..."}

Step 6: ES 데이터 저장 확인

  • 역할: 배달된 편지가 ES 집에 잘 도착했는지 확인.
  • 명령어:
    curl -X GET "http://localhost:9200/image_tags/_search?pretty"
  • 성공 응답:
    {
      "hits": {
        "hits": [
          {"_source": {"file_path": "...", "primary_tag": "vehicle", "tags": ["white"], "timestamp": "..."}}
        ]
      }
    }
  • 문제 시 로그 확인:
    docker logs DA-kafka-connect

Step 7: 문제 해결 (필요 시)

  • 역할: 배달이 안 됐을 때 문제 찾아 수정.
  • 명령어:
    • 상태 확인:
      curl -X GET "http://localhost:8083/connectors/es-sink/status"
    • 커넥터 재등록:
      curl -X DELETE "http://localhost:8083/connectors/es-sink"
      curl -X POST -H "Content-Type: application/json" --data @es-sink.json "http://localhost:8083/connectors"
  • 설명: 배달 지시서 잘못됐으면 삭제 후 다시 등록.

전체 흐름 (우체국 비유)

  1. Python: 이미지 태그를 편지로 써서 Kafka 우체국(image_tags 사서함)에 넣음.
  2. Kafka: 편지를 보관.
  3. Kafka Connect: 우체국에서 편지를 꺼내 ES 집으로 자동 배달.
  4. ES: 편지(태그 데이터)를 저장하고 검색 가능하게 함.

나중에 다시 할 때 팁

  • 파일 준비: docker-compose.yml, es-sink.json을 프로젝트 폴더에 저장.
  • 스크립트化:
    #!/bin/bash
    docker-compose up -d
    docker exec -it DA-kafka-connect bash -c "confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.5"
    docker restart DA-kafka-connect
    curl -X POST -H "Content-Type: application/json" --data @es-sink.json "http://localhost:8083/connectors"
    python main.py
    • chmod +x setup.sh./setup.sh로 한 방에 실행.