목표 : Kafka와 Debezium을 통해 MariaDB의 변경사항을 Kafka 토픽으로 발행/구독
환경 : Linux(x86_64, OS:CentOS Stream release 9)
1. Docker compose 작성
(docker compose는 사전에 설치하여 준비하기)
적당한 경로에 docer-compose를 실행하고 컨테이너 볼률을 저장할 디렉터리를 생성
cd /
mkdir data
cd /data
mkdir kafka
cd kafka
chown -R 1000:1000 /data/kafka #1000은 유저ID 맞게 설정
chmod -R 777 /data/kafka
mkdir data #브로커 볼륨을 저장할 디렉터리 생성
chmod -R 777 /data/kafka/data
docker-compose 실행 시 함께 수행될 환경셋팅 파일(.env) 작성
위치 : /data/kafka
# Kafka Ports
KAFKA_1_PORT=9092
KAFKA_2_PORT=9093
KAFKA_3_PORT=9094
# Kafka Cluster ID
KAFKA_CLUSTER_ID=8071505ff4c5f37d346b2b45e649f8e9 # 이 값은 자유롭게 작성. 난수로 작성하는 것을 추천
# Kafka Cluster Name
KAFKA_CLUSTER_NAME=원하는 이름 작성 # 클러스터 이름
# Docker Host IP for External Listeners
DOCKER_HOST_IP=XXX.XXX.XXX.XXX # 현재 로컬 IP 작성. 네트워크 환경에 맞게 수정
# Kafka Directories
KAFKA_DIR=/data/kafka/data # Kafka 데이터가 저장되는 디렉토리 이름
# Kafka UI Port
KAFKA_UI_PORT=9090 # Kafka UI에 접근하는 포트. 알맞게 변경하여 사용
# Debezium Port
DEBEZIUM_PORT=8083 # Debezium에 접근하는 포트. 알맞게 변경하여 사용
docker-compose.yaml 작성
위치 : /data/kafka
services:
#카프카 브로커 1 정보
kafka-1:
container_name: kafka-1
image: confluentinc/cp-kafka:7.7.1
platform: linux/amd64
ports:
- "${KAFKA_1_PORT}:9092"
volumes:
- ${KAFKA_DIR}/kafka-1:/var/lib/kafka/data
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://${DOCKER_HOST_IP}:${KAFKA_1_PORT}
KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:${KAFKA_1_PORT}
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
CLUSTER_ID: ${KAFKA_CLUSTER_ID}
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
#카프카 브로커 2 정보
kafka-2:
container_name: kafka-2
image: confluentinc/cp-kafka:7.7.1
platform: linux/amd64
ports:
- "${KAFKA_2_PORT}:9093"
volumes:
- ${KAFKA_DIR}/kafka-2:/var/lib/kafka/data
environment:
KAFKA_NODE_ID: 2
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29092,EXTERNAL://${DOCKER_HOST_IP}:${KAFKA_2_PORT}
KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:${KAFKA_2_PORT}
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
CLUSTER_ID: ${KAFKA_CLUSTER_ID}
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
#카프카 브로커 3 정보
kafka-3:
container_name: kafka-3
image: confluentinc/cp-kafka:7.7.1
platform: linux/amd64
ports:
- "${KAFKA_3_PORT}:9094"
volumes:
- ${KAFKA_DIR}/kafka-3:/var/lib/kafka/data
environment:
KAFKA_NODE_ID: 3
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29092,EXTERNAL://${DOCKER_HOST_IP}:${KAFKA_3_PORT}
KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:${KAFKA_3_PORT}
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
CLUSTER_ID: ${KAFKA_CLUSTER_ID}
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
# 웹 UI 환경에서 조회하는 카프카 대시보드
kafka-ui:
image: provectuslabs/kafka-ui
platform: linux/amd64
container_name: kafka-ui
ports:
- "${KAFKA_UI_PORT}:8080"
restart: always
environment:
KAFKA_CLUSTERS_0_NAME: ${KAFKA_CLUSTER_NAME}
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
depends_on:
- kafka-1
- kafka-2
- kafka-3
# Debezium 정보
kafka-connect:
container_name: kafka-connect
image: debezium/connect:latest
platform: linux/amd64
ports:
- "${DEBEZIUM_PORT}:8083"
environment:
REST_PORT: 8083
REST_LISTENERS: http://0.0.0.0:${DEBEZIUM_PORT}
GROUP_ID: ${KAFKA_CLUSTER_ID}
BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
REST_ADVERTISED_LISTENERS: http://${DOCKER_HOST_IP}:${DEBEZIUM_PORT}
LISTENERS: http://0.0.0.0:${DEBEZIUM_PORT}
KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
OFFSET_FLUSH_INTERVAL_MS: 10000
CONFIG_STORAGE_TOPIC: connect-configs
OFFSET_STORAGE_TOPIC: connect-offsets
STATUS_STORAGE_TOPIC: connect-status
CONFIG_STORAGE_REPLICATION_FACTOR: 3
OFFSET_STORAGE_REPLICATION_FACTOR: 3
STATUS_STORAGE_REPLICATION_FACTOR: 3
LOG4J_LOGGERS: org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG
#LOG4J_ROOT_LOGLEVEL: INFO
#PLUGIN_PATH: /usr/share/java
depends_on:
- kafka-1
- kafka-2
- kafka-3
2. docker compose 실행
/data/kafka/ 디렉터리에 .env 파일과 docker-compose.yaml 파일이 같은 경로에 있는 상태에서 docker-compose up -d 명령어 수행
docker-compose up -d
docker-compose 실행 시 .env 파일이 같은 경로에 있으면 자동으로 적용 됨
3. Debezium 필수 토픽 생성(connect-configs, connect-offsets, connect-status)
docker exec -it kafka-1 kafka-topics --create --topic connect-configs --partitions 1 --replication-factor 3 --config cleanup.policy=compact --bootstrap-server kafka-1:29092,kafka-2:29092,kafka-3:29092
docker exec -it kafka-1 kafka-topics --create --topic connect-offsets --partitions 1 --replication-factor 3 --config cleanup.policy=compact --bootstrap-server kafka-1:29092,kafka-2:29092,kafka-3:29092
docker exec -it kafka-1 kafka-topics --create --topic connect-status --partitions 1 --replication-factor 3 --config cleanup.policy=compact --bootstrap-server kafka-1:29092,kafka-2:29092,kafka-3:29092
4. MariaDB 수정
4.1 MariaDB 환경변수 수정
MariaDB 컨테이너 접속
docker exec -it {mariadb 컨테이너} bash
my.cnf 수정
(경로는 버젼따라 다를 수 있음)
vi /etc/mysql/my.cnf
환경변수 주석해제 및 추가(binlog_format, server-id, log_bin)
[mysqld]
...
binlog_format=row
server-id = 1 # 아래에서 사용 됨. 다른 값으로 적용하여도 됨
log_bin = /var/log/mysql/mariadb-bin
...
4.2 Debezium에서 스트리밍하기 위한 유저생성
mysql -u root -p
use mysql
create user 'kafka'@'localhost' identified by '{비밀번호}';
create user 'kafka'@'%' identified by '{비밀번호}';
grant select, reload, replication client, replication slave on *.* to 'kafka'@'%'; # 모니터링이 목적인 계정이므로 delete, update, insert 권한은 제외함
flush privileges;
show grants for 'kafka'@'%';
5. DB 커넥션 정보 등록
curl -X POST -H "Content-Type: application/json" --data '{
"name": "{알맞게 작성}-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "{DB 호스트 주소}",
"database.port": "{DB 연결 포트}",
"database.user": "{DB 접속 계정}",
"database.password": "{DB 접속 계정의 비밀번호}",
"database.server.id": "{위 my.cnf에서 작성한 server-id 작성. 위 예시 기준 1로 작성하면 됨}",
"database.server.name": "{DB 명}",
"database.whitelist": "{감시할 테이블}",
"database.history.kafka.bootstrap.servers": "kafka-1:29092,kafka-2:29092,kafka-3:29092",
"database.history.kafka.topic": "schema-changes.{알맞게 작성}",
"topic.prefix": "{알맞게 작성}",
"schema.history.internal.kafka.topic": "schema-changes.internal.{알맞게 작성}",
"schema.history.internal.kafka.bootstrap.servers": "kafka-1:29092,kafka-2:29092,kafka-3:29092"
}
}' http://{Debezium 접속 호스트}:{Debezium 접속 포트}/connectors
새 토픽 추가
docker exec -it kafka-1 kafka-topics --create --topic {바로 위에서 작성한 database.history.kafka.topic 넣어주기} --bootstrap-server kafka-1:29092,kafka-2:29092,kafka-3:29092 --partitions 1 --replication-factor 3
6. 실행 확인
6.1 커넥션 상태 확인
curl -X GET http://localhost:8083/{바로 위에서 작성한 name 넣어주기}/status
error 가 있다면 웹서핑 등으로 수정하기
6.2 config 정보 확인
curl -X GET http://localhost:8083/connectors/{바로 위에서 작성한 name 넣어주기}/config
6.3 모니터링 확인
http://{kafka 호스트 서버주소} 접속
브로커 확인
토픽 확인(DB 스트리밍 토픽 확인)
완료~~( 'ᵕ' و(و ♪( 'o' و(و ♪
'Server > Docker' 카테고리의 다른 글
[Docker]M1 Mac mysql wordpress 연동 (0) | 2021.09.19 |
---|---|
[Docker]M1 Mac에서 homebrew를 활용한 docker 설치 (2) | 2021.07.15 |