반응형

이번 포스팅은 이전에 게시했던 "Apache Kafka란" 주제의 게시물에 이은 간단한 샘플을 작성해 보는 내용입니다.

마이크로서비스 아키텍처에서 메시지 전달을 위해 많이 사용하는 Pub/Sub구조의 프로그램을 간단히 설명하는 예제입니다. Pub/Sub 구조는 마이크로서비스 아키텍처를 이용한 응용 프로그램 개발시 비동기 방식의 메시지 전달을 위해 가장 많이 사용되는 방식으로 그 개념을 이해할 필요가 있습니다.

Pub/Sub 구조

Kafka, RabbitMQ를 비롯한 Message Broker를 이용해 메시지를 전달하는 방법 중의 하나입니다. 메시지를 생성하는 Publisher와 그 메시지를 전달하는 Message Broker(Queue) 그리고 메시지를 소비하는 Consumer로 구성되어 있습니다.

publisher는 메세지를 topic이라는 그룹에 넣어 발행합니다.

Message Broker는 각 publisher들에서 unique한 topic명으로 그룹화된 메세지들을 받아 저정합니다.

subscriber는 본인이 관심있어하는 메시지를 topic을 통해 구분해 해당 topic을 가입해 신규 메시지가 topic에 담길때마다 가져옵니다.

pub/sub구조에서 publisher는 본인의 topic과 그에 딸린 message를 어느 subscriber가 가져도 소비(consum)하는지 모릅니다. 마찬가지로 subscriber입장에서도 topic에만 관심이 있지 해당 topic의 메시지를 누가 발행(publish)하는지 관심이 없습니다.

 

 

Apache Kafka Pub/Sub 아키텍처

 

단, Kafka에서는 Publisher라는 이름 대신 Producer, Subscriber라는 이름 대신 Consumer라는 이름을 사용하고 있습니다. Publisher와 Producer, Subscriber와 Consumer는 동일한 의미이기 때문에 본 포스팅에서 혼합해서 사용하고 있습니다.

 

Greeting Application 예제

예제로 사용할 Greeting 어플리케이션은 간단한 Pub/Sub 구조 어플리케이션으로써 publish를 담당하는 KafkaPubApplication은 Publish(Produce)를 하는 KafkaPubApplication과 Subscribe(Consume)을 하는 KafkaSubApplication으로 구성되어 있습니다.

KafkaPubApplication은 사용자 UI(swagger-ui)로부터 name과 message로 구성된 Greeting을 입력 받아 이를 KafkaTemplate를 통해 'greeting'이란 topic로 메시지를 발행하고 사용자에게 리턴 문자("Hi there...")를 전달하고 종료됩니다.

KafkaSubApplication은 Consume(Subscribe)를 담당하며 kafka에 저장된 topic 중 greeting이라는 topic에 가입합니다. 평상시 greeting topic을 주기적으로 polling(listening) 하고 있다가 topic에 신규 메시지가 발행되면 끌어와 소비(consume)를 하게 됩니다.

 

 

이 예제에서 각각의 서비스는 Spring Boot 기반으로 구성되어 있습니다.

데모 프로젝트 디렉토리 구성

 

 

개발에 필요한 Apache Kafka 설정

이 블로그의 다음 포스팅을 참조에 apache kafka를 설정하도록 한다.

윈도우에 Apache Kafka 개발 환경 구성하기! (https://sharplee7.tistory.com/60?category=1041182 )

 

프로그램 소스코드

Producer(Publisher) 구현

1. dependency

implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-aop'
// Kafka 라이브러리 참조
implementation 'org.springframework.kafka:spring-kafka'

compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '2.6.1'
compile group: 'io.springfox', name: 'springfox-swagger2', version: '2.6.1'

 

2. application.properties

server.port=8081
# 접속하고자 하는 Kafka Broker IP와 포트 지정
kafka.bootstrapAddress=localhost:9092
greeting.topic.name=greeting

 

3. Publisher 환경설정

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    // Kafka Publish를 위한 기본 환경 설정을 입력한다.
    // 서버 주소 설정
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    // 키 값 전송 방식(StringSerializer)
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 값 전송 방식(JsonSerializer)
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    // Kafka Publish 사용을 위한 객체 생성
    return new KafkaTemplate<>(greetingProducerFactory());

}

 

4. Controller 클래스

@RequestMapping(value = "/publish", method=RequestMethod.POST)
public Courtesy greetAndCourtesy(@RequestBody final Greeting greeting) throws Exception {
    // API로부터 인사말을 입력 파라미터로 받는다.
    return greetingService.greetAndCourtesy(greeting);
}

 

5. Service 클래스

@Override
public Courtesy greetAndCourtesy(Greeting greeting) throws Exception {
    //카프카 퍼블리싱
    greetingProducer.sendMessage(greeting);

    // Respose body에 넣을 임의의 데이터 생성
    return new Courtesy("Guest","Hi there? I'm fine and you?");
}

 

6. Publishing 클래스

public void sendMessage(Greeting greeting) {
    // Kafka 환경설정에서 만든 KafkaTemplate를 이용해 Topic에 메시지를 발행한다.
    greetingKafkaTemplate.send(greetingTopicName, greeting);
}

 

Consumer(Subscriber) 구현

1. dependency

implementation 'org.springframework.boot:spring-boot-starter-web'
// Kafka Dependency 추가
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-aop'

compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '2.6.1'
compile group: 'io.springfox', name: 'springfox-swagger2', version: '2.6.1'

 

2. application.properties

server.port=8082
// Kafka 주소 설정
kafka.bootstrapAddress=localhost:9092
greeting.topic.name=greeting

 

3. Consumer 환경설정

public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    // 그룹 ID 지정, 동일 토픽에 서로 다른 Consumer가 여럿일때 group id를 다르게 지정함으로써 각각의 토픽 메시지를 찾아 갈수 있게 설정 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
}


@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
   // Kafka Listener 설정
   ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

 

4. Consumer 클래스

@KafkaListener(topics = "${greeting.topic.name}", containerFactory ="greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting, Acknowledgment ack) {
    System.out.println(">>>>>> " + greeting); // Kafka를 통해 받은 메시지를 출력한다
}

 

본 프로그램에 대한 완전한 소스는 아래의 git repository를 통해 참고 하실 수 있습니다.

https://github.com/sharplee7/kafka-demo.git

 

만약 DB접속, 실제 메시지 전송등에 대해 조금 더 잘된 예제를 보고 싶다면 이 블로그의 다음 글을 추천한다.

https://sharplee7.tistory.com/63

 

Spring Boot를 이용한 Kafka Pub/Sub 예제

Kafka Pub Sub 모델 링크드인(Linkedin)은 초창기 Point to Point 구조를 이용해 시스템간 데이터 전송을 사용하다가 증가하는 메시지 전송에 대해 대응을 하지 못했다. Linkedin은 이 문제를 해결하기 위해

sharplee7.tistory.com

 

반응형

+ Recent posts