* network 만들기
# Create the Docker network `ecommerce-network` with subnet 172.18.0.0/16 and gateway 172.18.0.1.
docker network create --gateway 172.18.0.1 --subnet 172.18.0.0/16 ecommerce-network
- 같은 네트워크 내에서는 다른 서비스를 호출할 때 컨테이너(서비스) 이름으로 접근할 수 있다.


* 삽질
- kafka connect에 jdbc 설치
# Assuming the container ID is, for example, 'abc123':
# install the Confluent JDBC connector inside the running Kafka Connect container.
docker exec -it abc123 confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
- 로컬에서 mariadb client 다운

- 볼륨으로 파일 공유
# Service fragment (belongs under `services:`): bind-mounts the host
# directory D:/JavaProject/ecommerce/jar into the container at
# /usr/share/java/kafka-connect-jdbc/ to share files through a volume.
kafka-connect:
  image: confluentinc/cp-kafka-connect:latest
  ports:
    - "8083:8083"
  volumes:
    - D:/JavaProject/ecommerce/jar:/usr/share/java/kafka-connect-jdbc/

# Compose file for ZooKeeper, Kafka and Kafka Connect. Every service joins
# `my-network` (the Docker network named `ecommerce-network`) with a
# manually assigned IPv4 address.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      my-network:
        ipv4_address: 172.18.0.100  # IP assigned manually
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.18.0.101
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
    networks:
      my-network:
        ipv4_address: 172.18.0.101  # IP assigned manually
  kafka-connector-mariadb:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - "8083:8083"
    links:
      - kafka
      - zookeeper
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: "8083"
      CONNECT_GROUP_ID: "quickstart-avro"
      CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
      # Plugin lookup includes the directory the jars volume is mounted on.
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - $PWD/jars:/etc/kafka-connect/jars  # jar files are supplied through this volume
    networks:
      my-network:
        ipv4_address: 172.18.0.102  # IP assigned manually
networks:
  my-network:
    # `ecommerce-network` is created beforehand with `docker network create`,
    # so it is declared external: Compose attaches to it instead of creating it.
    external: true
    name: ecommerce-network
* 드디어 해결
- mariadb와 kafka는 같은 네트워크에 있어야함
- jars(jdbc driver, maria client)를 로컬에 저장한 다음, 볼륨 마운트를 통해 kafka-connect 컨테이너 내부의 /etc/kafka-connect/jars 경로로 옮겨야 함!


# Compose file for the working setup: MariaDB, ZooKeeper, Kafka, Kafka UI,
# Kafka Connect and RabbitMQ, all attached to the external Docker network
# `ecommerce-network` so that they can reach each other.
version: '2'
services:
  mariadb:
    image: mariadb:latest
    ports:
      - "3307:3306"  # host port 3307 -> container port 3306
    environment:
      MYSQL_ROOT_PASSWORD: qwerty
      MYSQL_DATABASE: mydb
    networks:
      my-network:
        ipv4_address: 172.18.0.104
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      my-network:
        ipv4_address: 172.18.0.100
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.18.0.101
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
    networks:
      my-network:
        ipv4_address: 172.18.0.101
  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - "8989:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - kafka
    networks:
      my-network:
        ipv4_address: 172.18.0.102
  kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - "8083:8083"
    volumes:
      - D:/JavaProject/kafka/jars:/etc/kafka-connect/jars  # jar files are supplied through this volume
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_PORT: "8083"
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/share/java/kafka-connect-jdbc"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    depends_on:
      - kafka
      - zookeeper
    networks:
      my-network:
        ipv4_address: 172.18.0.103
  rabbitmq:
    image: rabbitmq:management
    ports:
      - "15671:15671"
      - "15672:15672"
      - "5671:5671"
      - "5672:5672"
      - "4369:4369"
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    networks:
      # No fixed address is configured for this service.
      my-network: {}
networks:
  my-network:
    # Attach to the already existing network instead of creating a new one.
    external: true
    name: ecommerce-network
- kafka-connect에 날릴 post 요청
- users 테이블이 겹쳐서(시스템의 users와 겹침) db명을 명시해줘야함 // mydb.users
- localhost대신 컨테이너이름(mariadb) 기입
{
  "name": "my-source-connect",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mariadb://mariadb:3306/mydb",
    "connection.user": "root",
    "connection.password": "qwerty",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "mydb.users",
    "topic.prefix": "my_topic_",
    "tasks.max": "1"
  }
}
- kafka-connect에서 해당 db에 지속적으로 요청을 날려서 변경사항이 있는지 감시한다!

-- `users` table: `id` is the auto-increment primary key that the source
-- connector tracks via "incrementing.column.name".
CREATE TABLE users (
    id         INT AUTO_INCREMENT PRIMARY KEY,
    user_id    VARCHAR(20),
    pwd        VARCHAR(20),
    name       VARCHAR(20),
    created_at DATETIME DEFAULT NOW()
);
-- `orders` table: product, quantity and price columns plus the ordering
-- user's id and the order id; `created_at` defaults to the insert time.
CREATE TABLE orders (
    id          INT AUTO_INCREMENT PRIMARY KEY,
    product_id  VARCHAR(20) NOT NULL,
    qty         INT DEFAULT 0,
    unit_price  INT DEFAULT 0,
    total_price INT DEFAULT 0,
    user_id     VARCHAR(50) NOT NULL,
    order_id    VARCHAR(50) NOT NULL,
    created_at  DATETIME DEFAULT NOW()
);
* 테스트
- data insert후 topic이 자동으로 생성된다.


- 생성된 메시지의 모습
- 메타데이터, 페이로드가 생긴다.

- kafka에 접속해 컨슈머를 터미널로 만들어서 test

- 데이터 하나 더 생산 후
- 컨슈머에 바뀐값이 업데이트 됨

* 다음으로
- 아래가 카프카의 데이터 전달 형식이다.
- 이 형식을 맞추면 카프카를 통해 db에 입/출력을 할 수 있다!
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "user_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "pwd"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {
    "id": 1,
    "user_id": "a",
    "pwd": "b",
    "name": "c",
    "created_at": 1739635123000
  }
}
* sink connect
{
  "name": "my-sink-connect",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mariadb://mariadb:3306/mydb",
    "connection.user": "root",
    "connection.password": "qwerty",
    "auto.create": "true",
    "auto.evolve": "true",
    "delete.enabled": "false",
    "tasks.max": "1",
    "topics": "my_topic_users"
  }
}
- 테이블이 생겼다?

- 같은값들이 들어가있네?
- users테이블에 값넣으면 여기도 자동으로 insert 된다.

- test용 생산자 만들기
- 데이터 생산

- users에는 값이 없음.

- topic 명의 테이블에는 값이 업데이트 됨

- 주의사항
- json 형식이 아닌 데이터를 보내면 sink connector가 죽어버림
- 해당 message들을 삭제해 줘야 다시 살릴 수 있음.
- 당연히 db에도 안 들어감.
- 실험
- 같은 id로 보내면?
- 값이 들어가진다???
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"pwd"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}],"optional":false,"name":"users"},"payload":{"id":4,"user_id":"4","pwd":"4","name":"4","created_at":1739000079000}}

- 생성된 table에는 pk 안걸려있네?

* 개념정리

- source(user table)에 데이터 삽입 -> kafka에게 msg 전달 -> sink는 target table에 insert 작업

'DevOps > Docker' 카테고리의 다른 글
[Docker] docker-compose 작성, 인텔리제이에서 연결 (0) | 2024.09.17 |
---|