Kafka 다운로드 후 zookeeper 실행 명령어
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
Kafka서버 실행 명렁어
./bin/kafka-server-start.sh ./config/server.properties
토픽생성
./bin/kafka-topics.sh --create --topic example-events --bootstrap-server localhost:9092 \ --partitions 1

토픽목록확인
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

좀전에 생성한 example-events토픽이 있는걸 확인 할 수 있음
프로듀서와 컨슈머를 이용한 간단한 메시지 주고받기 예제의 경우
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic example-events
프로듀서 콘솔과
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example-events --from-beginning
컨슈머 콘솔을 열어둔뒤
프로듀서콘솔에서 hello를 입력하게되면

컨슈머 측에서도

hello문자열이 표출되게된다.
kafka와 데이터베이스를 연동테스트해보기 위해 미리 설치해둔 mariaDB를 활용한다.
연동을 위해 kafka-connect를 설치하고 압축을 해제한다.
Kafka Connect 설치
curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz
curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
tar xvf confluent-community-6.1.0.tar.gz
cd $KAFKA_CONNECT_HOME
Kafka Connect 실행
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
JDBC Connector 설치
- https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
- confluentinc-kafka-connect-jdbc-10.7.3.zip
kafka connect 설치경로에 etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가
- plugin.path=[confluentinc-kafka-connect-jdbc-10.7.3 폴더](onfluentinc-kafka-connect-jdbc-10.7.4/lib 경로에있음)
JdbcSourceConnector에서 MariaDB 사용하기 위해 mariadb 드라이버 복사
kafka_2.13-3.5.1/confluent-6.1.0/share/java/kafka/ 폴더에 mariadb-java-client-2.7.2.jar 파일 복사
Kafka source connect 사용
postman을 활용하여
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
127.0.0.1:8083/connectors에 전송
Kafka sink connect 사용
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users"
}
}
토픽과 같은 이름의 테이블이 생성된다.
결론적으로 users에 테이블이 변경될경우 my_topic_users에 그대로 전달되어 동기화된다.

maria db users테이블에 인서트를하게될경우 소스커넥트에 의해 토픽에 전달되고
전달된 정보를 싱크커넥트가 my_topic_users테이블에 동기화한다.

5번째행에 users에서 입력한 테스트 데이터가 들어간것을 확인할수있다.
'Kafka' 카테고리의 다른 글
KafKa를 활용한 데이터 동기화 (0) | 2023.09.11 |
---|---|
SpringFramework Kafka (1) | 2023.09.08 |
Kafka 개요 (0) | 2023.09.05 |