저번에는 두개의 각기다른서비스에서 kafka 토픽을 이용해 데이터를 주고받는 시나리오였다면
이번엔 같은서비스의 2개이상의 인스턴스에서 데이터 insert시 kafka에 토픽을 보내고
해당토픽을 sink connector를 통해 단일 db에 저장하는 시나리오이다.
단일db로는 MariaDB를 사용한다.
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList(new Field("string",true,"order_id"),
new Field("string",true,"user_id"),
new Field("string",true,"product_id"),
new Field("int32",true,"qty"),
new Field("int32",true,"unit_price"),
new Field("int32",true,"total_price"));
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders").build();
public OrderDto send(String topic, OrderDto orderDto) {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
ObjectMapper mapper = new ObjectMapper();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema,payload);
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
} catch(JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Order Producer sent data from the Order microservice: " + kafkaOrderDto);
return orderDto;
}
}
사용자 주문발생시 jpa를 이용해 db에 직접저장하는것이 아닌 카프카 토픽에 등록하는것으로 로직을 변경하도록 한다.
json형태로 보내기 위해 데이터 전달 객체를 설계한다.
해당 설계 dto는 다음과같다
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
@Data
@Builder
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int qty;
private int unit_price;
private int total_price;
}
KafkaOrderDto가 스키마와 페이로드 를 가지고 있으며 스키마는 헤더부분 페이로드는 실제 인서트되어질 데이터부분이다.
@PostMapping("{userId}/orders")
public ResponseEntity<ResponseOrder> createUser(@PathVariable String userId, @RequestBody RequestOrder order){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = mapper.map(order, OrderDto.class);
orderDto.setUserId(userId);
/*jpa*/
//OrderDto createOrder = orderService.createOrder(orderDto);
//ResponseOrder responseUser = mapper.map(createOrder, ResponseOrder.class);
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(order.getQty() * order.getUnitPrice());
/* send this oder to the kafka*/
kafkaProducer.send("example-catalog-topic",orderDto);
orderProducer.send("orders",orderDto);
ResponseOrder responseUser = mapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseUser);
}
컨트롤러에선 Jpa부분을 주석처리한 후 카프카프로듀서클래스를 호출하여 "orders"라는 토픽이름으로 메세지를 보낸다.
그후 db에서 sink connector를 통해 등록된 데이터를 받기위해
해당 json을 127.0.0.1:8083/connectors 카프카 커넥터에 포스트방식으로 send한다.
테스트를 해보기 위해 인텔리제이에서 order-service를 구동하고 터미널로 하나의 인스턴스를 더 구동시킨다
그 후 주문로직을 5번호출하게되면 하나의인스턴스 씩 번갈아서 로드밸런싱되어 호출되게된다.
서버단에서 db에 직접접근하여 insert하는 로직은 없지만 kafka에 메세지를 전달하고 sink connector가 이를 탐지하여 db에 동기화시키기때문에 주문5건이 정상적으로 insert 되었다.
사실 msa와 kafka를 예제로만 접하다보니 아직 해당 기술들의 극대화된 장점이 와닿는 부분은 적다. 실무에서 경험을 통해 해당아키텍쳐들의 편리함을 느껴보면좋을 것 같다
'Kafka' 카테고리의 다른 글
SpringFramework Kafka (1) | 2023.09.08 |
---|---|
KafKa 커맨드 활용예시 (0) | 2023.09.07 |
Kafka 개요 (0) | 2023.09.05 |