SpringFramework Kafka
커맨드를 활용해 producer topic consumer의 통신을 살펴보았다 이번엔 실제 예제프로젝트를 통해 해당통신을 구현해보도록한다.
기존 order-service와 catalog-service간의 kafka를 이용한 메시지 통신을 구현한다.
order-service에서 주문이 발생하면 해당주문의 상품주문개수만큼 catalog-service의 재고에서 빠지도록하는 예제이다.
order-service와 catalog-service의 데이터베이스는 각각 별도로 존재한다.
먼저 order -service(producer)에 dependency를 추가한다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
그후 config 파일을 생성한다.
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Consumer와 같이 config정보를 작성해준다.- method : ProducerFactory
데이터를 전달하기위한 인스턴스인 KafkaTemplate가 필요하다.- method : KafkaTemplate
그다음 데이터를 토픽에 전달하는 producer 소스이다
@Service
@Slf4j
public class KafkaProducer {
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public OrderDto send(String topic, OrderDto orderDto) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(orderDto);
} catch(JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from the Order microservice: " + orderDto);
return orderDto;
}
}
@PostMapping("{userId}/orders")
public ResponseEntity<ResponseOrder> createUser(@PathVariable String userId, @RequestBody RequestOrder order){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = mapper.map(order, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createOrder = orderService.createOrder(orderDto);
ResponseOrder responseUser = mapper.map(createOrder, ResponseOrder.class);
/* send this oder to the kafka*/
kafkaProducer.send("example-catalog-topic",orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseUser);
}
producer에서 생성한 메서드를 order-service에서 주문이 발생할시 해당 메서드를 호출하도록한다.
example-catalog-topic이란 토픽이름으로 orderDto(주문정보)를 opjectMapper를 이용해 json으로 치환하여 송신한다.
다음으로는 catalog-service(consumer)작업을 하도록한다 마찬가지로 spring kafka 디펜던시를 추가하도록한다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
카프카의 정보를 작성
카프카 서버주소, group_id를 작성해준다. group_id는 카프카에서 토픽에 쌓여있는 consumer를 그루핑할 수 있다. 여러개의 consumer가 데이터를 가져갈때 특정한 group을 만들어서 사용할 수 있다.
또한 카프카에 json형식으로 보낼 것이기 때문에 key, value의 deserializer를 설정해준다.
이제 접속정보를 이용해서 리스너를 생성하자
ConcurrentKafkaListenerContainerFactory
리스너를 생성후 위에서 작성한 설정 정보를 등록한다.
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
private final CatalogRepository catalogRepository;
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka Message: ->" + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
CatalogEntity entity = catalogRepository.findByProductId((String) map.get("productId"));
if(entity != null){
entity.setStock(entity.getStock() - (Integer)map.get("qty"));
}
catalogRepository.save(entity);
}
}
@KafkaListener어노테이션을 사용하여 topic(example-catalog-topic)에 값이 들어오면 감지를 하도록한다.
값을 통해 CatalogRepository (DB)를 업데이트해준다.
이제 테스트를 진행해본다.
먼저 최초 CATALOG-001 상품의 재고는 100개로 확인되었다.
order-service에서 해당 상품을 15개주문해보도록한다.
터미널로 확인해보면 해당 토픽에 order-service에서 보낸 주문정보가 json형태로 보내진다
consumer의 kafkaListener가 해당 토픽의 변경상태를 감지하여 주문정보에따른
재고를 최신화한 결과 15개가 빠진 85개로 정정되었다.
추가적으로 사실 여기서 jpa를 공부해본사람이라면 consumer측에서 해당 토픽정보를 수신받아 수량을 마이너스하고 재저장하는 로직이 필요없다는 것을 알 수 있다. datajpa에 의해 select된 엔티티는 영속상태이므로 해당 class파일에 @Transactional 어노테이션만 붙여준다면(트랜잭션이 없으면 변경은 불가) 재저장 할 필요없이 jpa의 더티체킹에 의해 자동으로 업데이트 될 것이다.