Server/Docker

Kafka와 Debezium을 활용한 MariaDB CDC 구축 (Docker&Kafka&Debezium)

wonhy2ok 2024. 9. 20. 15:26

목표 : Kafka와 Debezium을 통해 MariaDB의 변경사항을 Kafka 토픽으로 발행/구독

 

환경 : Linux(x86_64, OS:CentOS Stream release 9)

 

1. Docker compose 작성

(docker compose는 사전에 설치하여 준비하기)

적당한 경로에 docer-compose를 실행하고 컨테이너 볼률을 저장할 디렉터리를 생성

cd /
mkdir data
cd /data
mkdir kafka
cd kafka
chown -R 1000:1000 /data/kafka #1000은 유저ID 맞게 설정
chmod -R 777 /data/kafka
mkdir data	#브로커 볼륨을 저장할 디렉터리 생성
chmod -R 777 /data/kafka/data

 

docker-compose 실행 시 함께 수행될 환경셋팅 파일(.env) 작성
위치 : /data/kafka

# Kafka Ports
KAFKA_1_PORT=9092
KAFKA_2_PORT=9093
KAFKA_3_PORT=9094

# Kafka Cluster ID
KAFKA_CLUSTER_ID=8071505ff4c5f37d346b2b45e649f8e9  # 이 값은 자유롭게 작성. 난수로 작성하는 것을 추천

# Kafka Cluster Name
KAFKA_CLUSTER_NAME=원하는 이름 작성 # 클러스터 이름

# Docker Host IP for External Listeners
DOCKER_HOST_IP=XXX.XXX.XXX.XXX # 현재 로컬 IP 작성. 네트워크 환경에 맞게 수정

# Kafka Directories
KAFKA_DIR=/data/kafka/data  # Kafka 데이터가 저장되는 디렉토리 이름

# Kafka UI Port
KAFKA_UI_PORT=9090  # Kafka UI에 접근하는 포트. 알맞게 변경하여 사용

# Debezium Port
DEBEZIUM_PORT=8083  # Debezium에 접근하는 포트. 알맞게 변경하여 사용

 

docker-compose.yaml 작성

위치 : /data/kafka

services:
  #카프카 브로커 1 정보
  kafka-1:
    container_name: kafka-1
    image: confluentinc/cp-kafka:7.7.1
    platform: linux/amd64
    ports:
      - "${KAFKA_1_PORT}:9092"
    volumes:
      - ${KAFKA_DIR}/kafka-1:/var/lib/kafka/data
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://${DOCKER_HOST_IP}:${KAFKA_1_PORT}
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:${KAFKA_1_PORT}
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      CLUSTER_ID: ${KAFKA_CLUSTER_ID}
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
      
  #카프카 브로커 2 정보
  kafka-2:
    container_name: kafka-2
    image: confluentinc/cp-kafka:7.7.1
    platform: linux/amd64
    ports:
      - "${KAFKA_2_PORT}:9093"
    volumes:
      - ${KAFKA_DIR}/kafka-2:/var/lib/kafka/data
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29092,EXTERNAL://${DOCKER_HOST_IP}:${KAFKA_2_PORT}
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:${KAFKA_2_PORT}
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      CLUSTER_ID: ${KAFKA_CLUSTER_ID}
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true

  #카프카 브로커 3 정보
  kafka-3:
    container_name: kafka-3
    image: confluentinc/cp-kafka:7.7.1
    platform: linux/amd64
    ports:
      - "${KAFKA_3_PORT}:9094"
    volumes:
      - ${KAFKA_DIR}/kafka-3:/var/lib/kafka/data
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29092,EXTERNAL://${DOCKER_HOST_IP}:${KAFKA_3_PORT}
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:${KAFKA_3_PORT}
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      CLUSTER_ID: ${KAFKA_CLUSTER_ID}
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true

  # 웹 UI 환경에서 조회하는 카프카 대시보드
  kafka-ui:
    image: provectuslabs/kafka-ui
    platform: linux/amd64
    container_name: kafka-ui
    ports:
      - "${KAFKA_UI_PORT}:8080"
    restart: always
    environment:
      KAFKA_CLUSTERS_0_NAME: ${KAFKA_CLUSTER_NAME}
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3

  # Debezium 정보
  kafka-connect:
    container_name: kafka-connect
    image: debezium/connect:latest
    platform: linux/amd64
    ports:
      - "${DEBEZIUM_PORT}:8083"
    environment:
      REST_PORT: 8083
      REST_LISTENERS: http://0.0.0.0:${DEBEZIUM_PORT}
      GROUP_ID: ${KAFKA_CLUSTER_ID}
      BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
      REST_ADVERTISED_LISTENERS: http://${DOCKER_HOST_IP}:${DEBEZIUM_PORT}
      LISTENERS: http://0.0.0.0:${DEBEZIUM_PORT}
      KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      OFFSET_FLUSH_INTERVAL_MS: 10000
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
      CONFIG_STORAGE_REPLICATION_FACTOR: 3
      OFFSET_STORAGE_REPLICATION_FACTOR: 3
      STATUS_STORAGE_REPLICATION_FACTOR: 3
      LOG4J_LOGGERS: org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG
      #LOG4J_ROOT_LOGLEVEL: INFO
      #PLUGIN_PATH: /usr/share/java
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3

 

2. docker compose 실행

/data/kafka/ 디렉터리에 .env 파일과 docker-compose.yaml 파일이 같은 경로에 있는 상태에서 docker-compose up -d 명령어 수행

docker-compose up -d

docker-compose 실행 시 .env 파일이 같은 경로에 있으면 자동으로 적용 됨

 

3. Debezium 필수 토픽 생성(connect-configs, connect-offsets, connect-status)

docker exec -it kafka-1 kafka-topics --create --topic connect-configs --partitions 1 --replication-factor 3 --config cleanup.policy=compact --bootstrap-server kafka-1:29092,kafka-2:29092,kafka-3:29092

docker exec -it kafka-1 kafka-topics --create --topic connect-offsets --partitions 1 --replication-factor 3 --config cleanup.policy=compact --bootstrap-server kafka-1:29092,kafka-2:29092,kafka-3:29092

docker exec -it kafka-1 kafka-topics --create --topic connect-status --partitions 1 --replication-factor 3 --config cleanup.policy=compact --bootstrap-server kafka-1:29092,kafka-2:29092,kafka-3:29092

 

 

4. MariaDB 수정

4.1 MariaDB 환경변수 수정

 

MariaDB 컨테이너 접속

docker exec -it {mariadb 컨테이너} bash

 

my.cnf 수정

(경로는 버젼따라 다를 수 있음)

vi /etc/mysql/my.cnf

 

환경변수 주석해제 및 추가(binlog_format, server-id, log_bin)

[mysqld]
...
binlog_format=row
server-id = 1	# 아래에서 사용 됨. 다른 값으로 적용하여도 됨
log_bin = /var/log/mysql/mariadb-bin
...

 

4.2 Debezium에서 스트리밍하기 위한 유저생성

mysql -u root -p
use mysql
create user 'kafka'@'localhost' identified by '{비밀번호}';
create user 'kafka'@'%' identified by '{비밀번호}';
grant select, reload, replication client, replication slave on *.* to 'kafka'@'%'; # 모니터링이 목적인 계정이므로 delete, update, insert 권한은 제외함
flush privileges;
show grants for 'kafka'@'%';

 

5. DB 커넥션 정보 등록

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "{알맞게 작성}-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "{DB 호스트 주소}",
    "database.port": "{DB 연결 포트}",
    "database.user": "{DB 접속 계정}",
    "database.password": "{DB 접속 계정의 비밀번호}",
    "database.server.id": "{위 my.cnf에서 작성한 server-id 작성. 위 예시 기준 1로 작성하면 됨}",
    "database.server.name": "{DB 명}",
    "database.whitelist": "{감시할 테이블}",
    "database.history.kafka.bootstrap.servers": "kafka-1:29092,kafka-2:29092,kafka-3:29092",
    "database.history.kafka.topic": "schema-changes.{알맞게 작성}",
    "topic.prefix": "{알맞게 작성}",
   "schema.history.internal.kafka.topic": "schema-changes.internal.{알맞게 작성}",
   "schema.history.internal.kafka.bootstrap.servers": "kafka-1:29092,kafka-2:29092,kafka-3:29092"
  }
}' http://{Debezium 접속 호스트}:{Debezium 접속 포트}/connectors

 

새 토픽 추가

docker exec -it kafka-1 kafka-topics --create --topic {바로 위에서 작성한 database.history.kafka.topic 넣어주기} --bootstrap-server kafka-1:29092,kafka-2:29092,kafka-3:29092 --partitions 1 --replication-factor 3

 

 

6. 실행 확인

6.1 커넥션 상태 확인

curl -X GET http://localhost:8083/{바로 위에서 작성한 name 넣어주기}/status

error 가 있다면 웹서핑 등으로 수정하기

 

6.2 config 정보 확인

curl -X GET http://localhost:8083/connectors/{바로 위에서 작성한 name 넣어주기}/config

 

6.3 모니터링 확인

http://{kafka 호스트 서버주소} 접속

 

브로커 확인

 

토픽 확인(DB 스트리밍 토픽 확인)

 

완료~~( 'ᵕ' و(و ♪‪( 'o' و(و ♪‪

'Server > Docker' 카테고리의 다른 글

[Docker]M1 Mac mysql wordpress 연동  (0) 2021.09.19
[Docker]M1 Mac에서 homebrew를 활용한 docker 설치  (2) 2021.07.15