관리 메뉴

Mini

[kafka] kafka connect를 활용한 다중서버 db 동기화 문제 해결 본문

DevOps/Kafka

[kafka] kafka connect를 활용한 다중서버 db 동기화 문제 해결

Mini_96 2025. 2. 16. 18:09

* 문제

  • 주문을 넣으면 round robin으로 data가 각각의 db에 나누어 저장되는 문제

 

* 해결

  • 설계

  • order-service의 db를 mariadb로 변경
datasource:
  driver-class-name: org.mariadb.jdbc.Driver
  url: jdbc:mariadb://localhost:3307/mydb   # MariaDB URL
  username: root
  password: qwerty
  • kafka에서 사용가능한 형식의 data로 만들어 줘야 한다.

  • 생성되는 json
{
	"schema": {
		"type": "struct",
		"fields": [
			{
				"type": "string",
				"optional": true,
				"field": "order_id"
			},
			{
				"type": "string",
				"optional": true,
				"field": "user_id"
			},
			{
				"type": "string",
				"optional": true,
				"field": "product_id"
			},
			{
				"type": "int32",
				"optional": true,
				"field": "qty"
			},
			{
				"type": "int32",
				"optional": true,
				"field": "unit_price"
			},
			{
				"type": "int32",
				"optional": true,
				"field": "total_price"
			}
		],
		"optional": false,
		"name": "orders"
	},
	"payload": {
		"order_id": "2430e50a-3b84-41da-9d73-e1c7231f03dd",
		"user_id": "f8f829d1-b3c0-4f1b-99f5-14aafe815b02",
		"product_id": "CATALOG-001",
		"qty": 10,
		"unit_price": 1500,
		"total_price": 15000
	}
}
  • connect 부분 임시 수정 (이게 원인은 아닌듯 함)
{
   "name": "my-order-sink-connect",
   "config": {
       "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
       "connection.url": "jdbc:mariadb://mariadb:3306/mydb",
       "connection.user": "root", 
       "connection.password": "qwerty",
       "auto.create": "true",
       "auto.evolve": "true",
       "delete.enabled": "false",
       "tasks.max": "1",
       "topics": "orders"
   }
}
  • 수정후
{
   "name": "my-order-sink-connect",
   "config": {
       "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
       "connection.url": "jdbc:mariadb://mariadb:3306/mydb",
       "connection.user": "root", 
       "connection.password": "qwerty",
       "auto.create": "true",
       "auto.evolve": "true",
       "delete.enabled": "false",
       "tasks.max": "1",
       "topics": "orders",
       
       "key.converter": "org.apache.kafka.connect.json.JsonConverter",
       "key.converter.schemas.enable": "true",
       "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       "value.converter.schemas.enable": "true",
       
       "insert.mode": "insert",
       "table.name.format": "orders"
   }
}

 

* 주의사항

  • type을 int로 하면 못 알아먹음
List<Field> fileds = Arrays.asList(
        new Field("string", true, "order_id"),
        new Field("string", true, "user_id"),
        new Field("string", true, "product_id"),
        new Field("int32", true, "qty"), // int 로 하면 에러 발생!
        new Field("int32", true, "unit_price"),
        new Field("int32", true, "total_price"));

  • null값없이 값을 채워줘야함

  • 결과