[kafka] Installing kafka and kafka-connect with Docker, hands-on

Mini_96 2025. 2. 16. 01:51

* Creating the network

docker network create --gateway 172.18.0.1 --subnet 172.18.0.0/16 ecommerce-network
  • Within the same network, other services can be reached simply by their service name.
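
To verify that the network exists and to see which containers joined it (and the IPs they were assigned), standard Docker commands are enough:

# Check the network and the containers attached to it
docker network ls
docker network inspect ecommerce-network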

 

* Trial and error

  • Installing the JDBC connector plugin in kafka-connect
# If the container ID is, say, 'abc123'
docker exec -it abc123 confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
  • Download the MariaDB client (JDBC driver) locally
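
For reference, the driver jar can be pulled from Maven Central into the local folder that will later be volume-mounted (a sketch; the 2.7.2 version number and the download location are illustrative assumptions):

# Download the MariaDB JDBC driver jar into the local jars directory (version is only an example)
curl -L -o mariadb-java-client-2.7.2.jar https://repo1.maven.org/maven2/org/mariadb/jdbc/mariadb-java-client/2.7.2/mariadb-java-client-2.7.2.jar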

  • Share the files into the container via a volume (verification sketch after the snippet below)
  kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - "8083:8083"
    volumes:
      - D:/JavaProject/ecommerce/jar:/usr/share/java/kafka-connect-jdbc/
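
Once the container is up, it is worth confirming that the mounted jars are actually visible inside it (a sketch; assumes the compose service is named kafka-connect as in the snippet above):

# List the mounted plugin directory inside the kafka-connect container
docker compose exec kafka-connect ls /usr/share/java/kafka-connect-jdbc/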

 

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      my-network:
        ipv4_address: 172.18.0.100 # IP assigned manually

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.18.0.101
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
    networks:
      my-network:
        ipv4_address: 172.18.0.101 # IP assigned manually

  kafka-connector-mariadb:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - 8083:8083
    links:
      - kafka
      - zookeeper
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "quickstart-avro"
      CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - $PWD/jars:/etc/kafka-connect/jars # jar files are supplied via this volume
    networks:
      my-network:
        ipv4_address: 172.18.0.102 # IP assigned manually

networks:
  my-network:
    name: ecommerce-network

 

* Finally solved

  • MariaDB and Kafka must be on the same network.
  • Store the jars (JDBC driver, MariaDB client) locally, then get them into /etc/kafka-connect/jars inside the kafka-connect container via a volume mount!

version: '2'
services:

  mariadb:
    image: mariadb:latest
    ports:
      - "3307:3306"
    environment:
      MYSQL_ROOT_PASSWORD: qwerty
      MYSQL_DATABASE: mydb
    networks:
      my-network:
        ipv4_address: 172.18.0.104
  
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks: 
      my-network:
        ipv4_address: 172.18.0.100

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.18.0.101
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on: 
      - zookeeper
    networks: 
      my-network:
        ipv4_address: 172.18.0.101

  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - "8989:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - kafka
    networks: 
      my-network:
        ipv4_address: 172.18.0.102

  kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - "8083:8083"
    volumes:
      - D:/JavaProject/kafka/jars:/etc/kafka-connect/jars # jar files are supplied via this volume
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/share/java/kafka-connect-jdbc"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    depends_on:
      - kafka
      - zookeeper
    networks: 
      my-network:
        ipv4_address: 172.18.0.103

  rabbitmq:
    image: rabbitmq:management
    ports:
      - "15671:15671"
      - "15672:15672"
      - "5671:5671"
      - "5672:5672"
      - "4369:4369"
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    networks: 
      my-network:

networks: 
  my-network:
    external: true
    name: ecommerce-network
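
With this file the whole stack can be brought up, after which the Connect worker can be asked whether it picked up the JDBC plugin (a sketch; older Docker installs use docker-compose instead of docker compose):

# Start the stack and list the connector plugins Kafka Connect has loaded
docker compose up -d
curl http://localhost:8083/connector-plugins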
  • The POST request to send to kafka-connect
  • The users table name collides with the system users table, so the database name has to be specified // mydb.users
  • Use the container name (mariadb) instead of localhost
{
    "name": "my-source-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mariadb://mariadb:3306/mydb",
        "connection.user": "root",
        "connection.password": "qwerty",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "table.whitelist": "mydb.users",
        "topic.prefix": "my_topic_",
        "tasks.max": "1"
    }
}
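
Registering the connector is a single POST against the Connect REST API (a sketch; source-connector.json is an illustrative filename for the JSON above):

# Register the source connector
curl -X POST -H "Content-Type: application/json" \
     --data @source-connector.json \
     http://localhost:8083/connectors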
  • kafka-connect keeps polling the database to watch for changes!
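
Whether the connector and its task are actually in the RUNNING state can be checked through the same REST API:

# Check connector and task state (should report RUNNING)
curl http://localhost:8083/connectors/my-source-connect/status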

create table users(
    id int auto_increment primary key,
    user_id varchar(20),
    pwd varchar(20),
    name varchar(20),
    created_at datetime default NOW()
);

create table orders (
    id int auto_increment primary key,
    product_id varchar(20) not null,
    qty int default 0,
    unit_price int default 0,
    total_price int default 0,
    user_id varchar(50) not null,
    order_id varchar(50) not null,
    created_at datetime default NOW()
);
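
These statements can be executed through the client inside the mariadb container (a sketch; root/qwerty and mydb come from the compose file above, and tables.sql is an illustrative filename):

# Run the DDL inside the mariadb container (-T disables the TTY so stdin redirection works)
docker compose exec -T mariadb mariadb -uroot -pqwerty mydb < tables.sql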

 

* Testing

  • After inserting data, the topic is created automatically (see the sketch after this list).

  • What the generated message looks like
    • It has schema metadata and a payload.

  • Connect to the kafka container and test with a console consumer in the terminal

  • After producing one more row of data
  • The consumer picks up the changed value
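
A minimal way to run this test, assuming the service names from the compose file above (mariadb, kafka) and the my_topic_ prefix configured on the source connector:

# Insert a row; the source connector should publish it to the my_topic_users topic
docker compose exec mariadb mariadb -uroot -pqwerty mydb \
  -e "INSERT INTO users(user_id, pwd, name) VALUES ('a', 'b', 'c');"

# Watch the topic with a console consumer (the wurstmeister image has the Kafka scripts on PATH)
docker compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 --topic my_topic_users --from-beginning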

 

* Next

  • Below is the data transfer format Kafka Connect uses.
  • If you produce messages in this format, you can read from and write to the DB through Kafka!
{
	"schema": {
		"type": "struct",
		"fields": [
			{
				"type": "int32",
				"optional": false,
				"field": "id"
			},
			{
				"type": "string",
				"optional": true,
				"field": "user_id"
			},
			{
				"type": "string",
				"optional": true,
				"field": "pwd"
			},
			{
				"type": "string",
				"optional": true,
				"field": "name"
			},
			{
				"type": "int64",
				"optional": true,
				"name": "org.apache.kafka.connect.data.Timestamp",
				"version": 1,
				"field": "created_at"
			}
		],
		"optional": false,
		"name": "users"
	},
	"payload": {
		"id": 1,
		"user_id": "a",
		"pwd": "b",
		"name": "c",
		"created_at": 1739635123000
	}
}

 

 

* sink connect

{
   "name": "my-sink-connect",
   "config": {
       "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
       "connection.url": "jdbc:mysql://localhost:3306/mydb",
       "connection.user": "root", 
       "connection.password": "test1357",
       "auto.create": "true",
       "auto.evolve": "true",
       "delete.enabled": "false",
       "tasks.max": "1",
       "topics": "my_topic_users"
   }
}
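
As with the source connector, this config is registered via the REST API (a sketch; sink-connector.json is an illustrative filename):

# Register the sink connector
curl -X POST -H "Content-Type: application/json" \
     --data @sink-connector.json \
     http://localhost:8083/connectors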
  • A table got created?

  • And it already contains the same values?
  • Whenever a row is inserted into the users table, it is automatically inserted here too.

  • Create a test producer (see the console-producer sketch below the sample message)
  • Produce data

  • The users table does not get the row.

  • But the table named after the topic is updated with the value.

  • Caveats
    • If you send data that does not match the JSON schema format, the sink connector dies
    • The offending messages have to be deleted to bring it back to life
    • And of course nothing goes into the DB either
  • Experiment
    • What happens if I send a message with an id that already exists?
    • The row still gets inserted???
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"pwd"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}],"optional":false,"name":"users"},"payload":{"id":4,"user_id":"4","pwd":"4","name":"4","created_at":1739000079000}}

  • The auto-created table does not have a primary key?

 

* Concept recap

  • Insert data into the source (users table) -> the message is delivered to Kafka -> the sink inserts it into the target table

 
