본문 바로가기
Kafka

KafKa 커맨드 활용예시

by 혀눅짱 2023. 9. 7.

Kafka 다운로드 후 zookeeper 실행 명령어

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

 

Kafka서버 실행 명렁어

./bin/kafka-server-start.sh ./config/server.properties

 

토픽생성

./bin/kafka-topics.sh --create --topic example-events --bootstrap-server localhost:9092 \ --partitions 1

 

토픽목록확인

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

좀전에 생성한 example-events토픽이 있는걸 확인 할 수 있음

 

프로듀서와 컨슈머를 이용한 간단한 메시지 주고받기 예제의 경우

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic example-events

프로듀서 콘솔과

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example-events --from-beginning

컨슈머 콘솔을 열어둔뒤

프로듀서콘솔에서 hello를 입력하게되면

컨슈머 측에서도 

hello문자열이 표출되게된다.

 

kafka와 데이터베이스를 연동테스트해보기 위해 미리 설치해둔 mariaDB를 활용한다.

 

연동을 위해  kafka-connect를 설치하고 압축을 해제한다.

 

Kafka Connect 설치

curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz

curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz

tar xvf confluent-community-6.1.0.tar.gz

cd  $KAFKA_CONNECT_HOME

 

Kafka Connect 실행

./bin/connect-distributed ./etc/kafka/connect-distributed.properties

 

JDBC Connector 설치

- https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

- confluentinc-kafka-connect-jdbc-10.7.3.zip 

 

kafka connect 설치경로에 etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가

- plugin.path=[confluentinc-kafka-connect-jdbc-10.7.3 폴더](onfluentinc-kafka-connect-jdbc-10.7.4/lib 경로에있음)

 

JdbcSourceConnector에서 MariaDB 사용하기 위해 mariadb 드라이버 복사

kafka_2.13-3.5.1/confluent-6.1.0/share/java/kafka/ 폴더에 mariadb-java-client-2.7.2.jar  파일 복사

 

 

Kafka source connect 사용

postman을 활용하여

{

"name" : "my-source-connect",

"config" : {

"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",

"connection.url":"jdbc:mysql://localhost:3306/mydb",

"connection.user":"root",

"connection.password":"test1357",

"mode": "incrementing",

"incrementing.column.name" : "id",

"table.whitelist":"users",

"topic.prefix" : "my_topic_",

"tasks.max" : "1"

}

}

127.0.0.1:8083/connectors에 전송

 

 

Kafka sink connect 사용

 

{

"name":"my-sink-connect",

"config":{

"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",

"connection.url":"jdbc:mysql://localhost:3306/mydb",

"connection.user":"root",

"connection.password":"test1357",

"auto.create":"true",

"auto.evolve":"true",

"delete.enabled":"false",

"tasks.max":"1",

"topics":"my_topic_users"

}

}

 

 

토픽과 같은 이름의 테이블이 생성된다.

결론적으로 users에 테이블이 변경될경우 my_topic_users에 그대로 전달되어 동기화된다.

 

 

maria db users테이블에 인서트를하게될경우 소스커넥트에 의해 토픽에 전달되고

전달된 정보를 싱크커넥트가 my_topic_users테이블에 동기화한다.

5번째행에 users에서 입력한 테스트 데이터가 들어간것을 확인할수있다.

 

'Kafka' 카테고리의 다른 글

KafKa를 활용한 데이터 동기화  (0) 2023.09.11
SpringFramework Kafka  (1) 2023.09.08
Kafka 개요  (0) 2023.09.05