이번 포스팅은 이전에 게시했던 "Apache Kafka란" 주제의 게시물에 이은 간단한 샘플을 작성해 보는 내용입니다.
마이크로서비스 아키텍처에서 메시지 전달을 위해 많이 사용하는 Pub/Sub구조의 프로그램을 간단히 설명하는 예제입니다. Pub/Sub 구조는 마이크로서비스 아키텍처를 이용한 응용 프로그램 개발시 비동기 방식의 메시지 전달을 위해 가장 많이 사용되는 방식으로 그 개념을 이해할 필요가 있습니다.
Pub/Sub 구조
Kafka, RabbitMQ를 비롯한 Message Broker를 이용해 메시지를 전달하는 방법 중의 하나입니다. 메시지를 생성하는 Publisher와 그 메시지를 전달하는 Message Broker(Queue) 그리고 메시지를 소비하는 Consumer로 구성되어 있습니다.
publisher는 메세지를 topic이라는 그룹에 넣어 발행합니다.
Message Broker는 각 publisher들에서 unique한 topic명으로 그룹화된 메세지들을 받아 저정합니다.
subscriber는 본인이 관심있어하는 메시지를 topic을 통해 구분해 해당 topic을 가입해 신규 메시지가 topic에 담길때마다 가져옵니다.
pub/sub구조에서 publisher는 본인의 topic과 그에 딸린 message를 어느 subscriber가 가져도 소비(consum)하는지 모릅니다. 마찬가지로 subscriber입장에서도 topic에만 관심이 있지 해당 topic의 메시지를 누가 발행(publish)하는지 관심이 없습니다.
단, Kafka에서는 Publisher라는 이름 대신 Producer, Subscriber라는 이름 대신 Consumer라는 이름을 사용하고 있습니다. Publisher와 Producer, Subscriber와 Consumer는 동일한 의미이기 때문에 본 포스팅에서 혼합해서 사용하고 있습니다.
Greeting Application 예제
예제로 사용할 Greeting 어플리케이션은 간단한 Pub/Sub 구조 어플리케이션으로써 publish를 담당하는 KafkaPubApplication은 Publish(Produce)를 하는 KafkaPubApplication과 Subscribe(Consume)을 하는 KafkaSubApplication으로 구성되어 있습니다.
KafkaPubApplication은 사용자 UI(swagger-ui)로부터 name과 message로 구성된 Greeting을 입력 받아 이를 KafkaTemplate를 통해 'greeting'이란 topic로 메시지를 발행하고 사용자에게 리턴 문자("Hi there...")를 전달하고 종료됩니다.
KafkaSubApplication은 Consume(Subscribe)를 담당하며 kafka에 저장된 topic 중 greeting이라는 topic에 가입합니다. 평상시 greeting topic을 주기적으로 polling(listening) 하고 있다가 topic에 신규 메시지가 발행되면 끌어와 소비(consume)를 하게 됩니다.
이 예제에서 각각의 서비스는 Spring Boot 기반으로 구성되어 있습니다.
개발에 필요한 Apache Kafka 설정
이 블로그의 다음 포스팅을 참조에 apache kafka를 설정하도록 한다.
윈도우에 Apache Kafka 개발 환경 구성하기! (https://sharplee7.tistory.com/60?category=1041182 )
프로그램 소스코드
Producer(Publisher) 구현
1. dependency
implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-aop' // Kafka 라이브러리 참조 implementation 'org.springframework.kafka:spring-kafka' compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '2.6.1' compile group: 'io.springfox', name: 'springfox-swagger2', version: '2.6.1' |
2. application.properties
server.port=8081 # 접속하고자 하는 Kafka Broker IP와 포트 지정 kafka.bootstrapAddress=localhost:9092 greeting.topic.name=greeting |
3. Publisher 환경설정
@Bean public ProducerFactory<String, Greeting> greetingProducerFactory() { Map<String, Object> configProps = new HashMap<>(); // Kafka Publish를 위한 기본 환경 설정을 입력한다. // 서버 주소 설정 configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); // 키 값 전송 방식(StringSerializer) configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 값 전송 방식(JsonSerializer) configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, Greeting> greetingKafkaTemplate() { // Kafka Publish 사용을 위한 객체 생성 return new KafkaTemplate<>(greetingProducerFactory()); } |
4. Controller 클래스
@RequestMapping(value = "/publish", method=RequestMethod.POST) public Courtesy greetAndCourtesy(@RequestBody final Greeting greeting) throws Exception { // API로부터 인사말을 입력 파라미터로 받는다. return greetingService.greetAndCourtesy(greeting); } |
5. Service 클래스
@Override public Courtesy greetAndCourtesy(Greeting greeting) throws Exception { //카프카 퍼블리싱 greetingProducer.sendMessage(greeting); // Respose body에 넣을 임의의 데이터 생성 return new Courtesy("Guest","Hi there? I'm fine and you?"); } |
6. Publishing 클래스
public void sendMessage(Greeting greeting) { // Kafka 환경설정에서 만든 KafkaTemplate를 이용해 Topic에 메시지를 발행한다. greetingKafkaTemplate.send(greetingTopicName, greeting); } |
Consumer(Subscriber) 구현
1. dependency
implementation 'org.springframework.boot:spring-boot-starter-web' // Kafka Dependency 추가 implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-aop' compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '2.6.1' compile group: 'io.springfox', name: 'springfox-swagger2', version: '2.6.1' |
2. application.properties
server.port=8082 // Kafka 주소 설정 kafka.bootstrapAddress=localhost:9092 greeting.topic.name=greeting |
3. Consumer 환경설정
public ConsumerFactory<String, Greeting> greetingConsumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); // 그룹 ID 지정, 동일 토픽에 서로 다른 Consumer가 여럿일때 group id를 다르게 지정함으로써 각각의 토픽 메시지를 찾아 갈수 있게 설정 props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() { // Kafka Listener 설정 ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(greetingConsumerFactory()); return factory; } |
4. Consumer 클래스
@KafkaListener(topics = "${greeting.topic.name}", containerFactory ="greetingKafkaListenerContainerFactory") public void greetingListener(Greeting greeting, Acknowledgment ack) { System.out.println(">>>>>> " + greeting); // Kafka를 통해 받은 메시지를 출력한다 } |
본 프로그램에 대한 완전한 소스는 아래의 git repository를 통해 참고 하실 수 있습니다.
https://github.com/sharplee7/kafka-demo.git
만약 DB접속, 실제 메시지 전송등에 대해 조금 더 잘된 예제를 보고 싶다면 이 블로그의 다음 글을 추천한다.
https://sharplee7.tistory.com/63
'Middleware > Message Queue' 카테고리의 다른 글
Kafka 파티션 (0) | 2022.06.03 |
---|---|
Kafka 설정 - 포인트만 (0) | 2022.04.24 |
Spring Boot를 이용한 Kafka Pub Sub 개발 (0) | 2021.08.31 |
Spring Boot에서 Apache Kafka 사용 ... 1/2 (0) | 2021.08.10 |
윈도우에서 Apache Kafka 개발 환경 만들기 (0) | 2021.08.10 |