* 문제
- 주문을 넣으면 round robin으로 data가 각각의 db에 나누어 저장되는 문제
* 해결
- 설계
- order-service의 db를 mariadb로 변경
datasource:
driver-class-name: org.mariadb.jdbc.Driver
url: jdbc:mariadb://localhost:3307/mydb # MariaDB URL
username: root
password: qwerty
- kafka에서 사용가능한 형식의 data로 만들어 줘야 한다.
- 생성되는 json
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "order_id"
},
{
"type": "string",
"optional": true,
"field": "user_id"
},
{
"type": "string",
"optional": true,
"field": "product_id"
},
{
"type": "int32",
"optional": true,
"field": "qty"
},
{
"type": "int32",
"optional": true,
"field": "unit_price"
},
{
"type": "int32",
"optional": true,
"field": "total_price"
}
],
"optional": false,
"name": "orders"
},
"payload": {
"order_id": "2430e50a-3b84-41da-9d73-e1c7231f03dd",
"user_id": "f8f829d1-b3c0-4f1b-99f5-14aafe815b02",
"product_id": "CATALOG-001",
"qty": 10,
"unit_price": 1500,
"total_price": 15000
}
}
- connect 부분 임시 수정 (이게 원인은 아닌듯 함)
{
"name": "my-order-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mariadb://mariadb:3306/mydb",
"connection.user": "root",
"connection.password": "qwerty",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topics": "orders"
}
}
- 수정후
{
"name": "my-order-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mariadb://mariadb:3306/mydb",
"connection.user": "root",
"connection.password": "qwerty",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topics": "orders",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"insert.mode": "insert",
"table.name.format": "orders"
}
}
* 주의사항
- type을 int로 하면 못 알아먹음
List<Field> fileds = Arrays.asList(
new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"), // int 로 하면 에러 발생!
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price"));
- null값없이 값을 채워줘야함
- 결과
'DevOps > Kafka' 카테고리의 다른 글
[kafka] 데이터 동기화 테스트, 스프링 - 도커의 kafka 연결하기 (0) | 2025.02.16 |
---|