반응형

Kafka Pub Sub 모델

링크드인(Linkedin)은 초창기 Point to Point 구조를 이용해 시스템간 데이터 전송을 사용하다가 증가하는 메시지 전송에 대해 대응을 하지 못했다. Linkedin은 이 문제를 해결하기 위해 메시지 전송을 전용 시스템을 중앙에 두고 운영하는 Hub and Spoke 방식의 메시지 전송 시스템을 구성하였다. 그리고 그들이 만든 이 메시지 전송 시스템이 우리가 아는 Kafka이다.

 

Point-to-Point vs Hub-and-Spoke

 

Kafka를 이용한 Hub and Spoke 시스템을 Pub(Publish) Sub(Subscribe) Model이라 지칭하고 이를 그림으로 설명 표현하면 다음과 같다.

 

Kafka Pub Sub 모델

 

Producer(Publish)

Publish 역할을 하는 서비스는 카프카에게 전송만하게 되고 수신자가 누구인지 알 필요가 없다. 그래서 특별히 수신자를 정해놓지 않고 서버에 전송하게 되는 매우 심플한 구조를 보여준다. 

카프카는 내부에 발신자의 메세지를 토픽(Topic)이라는 주소로 저장하게 되고, 발신자는 이 토픽에게 우체원이 우편함에 편지를 넣듯 저장을 한다.

 

Consumer(Subscribe)

Subscribe(수신자)는 카프카에 원하는 토픽을 구독한다. 즉 수신자 역시 발신자가 누구인지 관심은 없고 필요한 메세지만 구독을 하게 되는 것이다. 이를 통해 Point to Point 구조에 비해서 매우 단순해지고 유지보수 뿐만 아니라 에러의 발생, 네트워크 트래픽의 장점까지 얻게 된다.

 

Pub/Sub 작동 방식

 

Kafka Pub Sub 모델

 

1. publisher는 메세지를 topic이라는 그룹에 넣어 발행한다.

2. Message Broker는 각 publisher들에서 unique한 topic명으로 그룹화된 메세지들을 받아 저장한다.

3. subscriber는 본인이 관심있어하는 메시지를 topic을 통해 구분해 해당 topic을 가입해 신규 메시지가 topic에 담길때마다 가져온다.

4. pub/sub구조에서 publisher는 본인의 topic과 그에 딸린 message를 어느 subscriber가 가져가 소비(consum)하는지 모른다. 마찬가지로 subscriber입장에서도 topic에만 관심이 있지 해당 topic의 메시지를 누가 발행(publish)하는지 관심이 없다.

5. 결과적으로 Pub Sub 모델을 이용해 결과적으로 시스템간 종속성이 없는 시스템을 구현 할 수 있다.

 

Spring Boot를 이용한 Kafka Pub Sub 개발 예제

 

시나리오

 

 

 

  1. http://localhost:8080/pub/swagger-ui/에 접속해 EmpDTO 내용 등록(empNo, empName, detpNo)
  2. EmpService의 insertEmp 호출, EmpService는 EmpDTO 등록시, EmpRepository를 통해 H2DB에 EMP 내용 등록 및 EmpPublisher를 통해 Kafka로 EmpDTO 내용 publish
  3. EmpRepository를 통해 H2DB에 EmpDTO 등록
  4. EmpPublisher를 통해 Kafka에 EmpDTO Publish
  5. Kafka로 EmpDTO 내용 Sirialize 시켜 전달

  6. Kafka에 EmpDTO 신규 등록 내용 캐치
  7. EmpSubscriber가 Kafka에 신규 등록된 EmpDTO Subscribe
  8. EmpSubscriber가 EmpService를 호출 해 insertEmp(EmpDTO) 실행
  9. EmpService를 통해 EmpDTO를 H2DB에 등록
  10. http://localhost:8090/sub/swager-ui/에 접속해 Publisher로 부터 전달 받은 EmpDTO가 나오는지 확인 

 

개발 환경

  1. jdk 1.8
  2. gradle
  3. spring boot 2.2.6
  4. mybatis
  5. swagger 2.9
  6. h2db (in-memory)
  7. mongodb (in-memory)
  8. kafka_2.12-2.1.1 (https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz

 

개발 환경 준비

1. git clone 으로 개발소스 복제
2. kafka 다운로드
3. kafka 설정
%KAFKA_HOME%\config\ 폴더에 있는 zookeeper.properties 파일을 열어 dataDir=/tmp/zookeeper 항목을 dataDir=C:/temp/zookeeper 로 변경

 

프로그램 빌드

1. 전체 프로젝트 빌드
gradlew build
2. 모듈별 빌드
gradlew :<모듈명>:build

 

프로그램 실행

1. zookeeper 실행
  1. 새로운 윈도우 코맨드 창을 열어 kafka가 설치된 디렉토리로 이동
  2. 윈도우라면 다음과 같이 bin\windows\zookeeper-server-start.bat를 실행
  3. 이때 입력 파라미터로 %KAFKA_HOME%\config\zookeeper.properties의 경로명과 파일명을 함께 넘길 것
  4. e.g. start/b C:\kafka_2.12-2.1.1\bin\windows\zookeeper-server-start.bat C:\kafka_2.12-2.1.1\config\zookeeper.properties
2. kafka 실행
  1. 새로운 윈도우 코맨드 창을 열어 kafka가 설치된 디렉토리로 이동
  2. 윈도우라면 다음과 같이 bin\windows\kafka-server-start.bat를 실행
  3. 이떄 입력 파라미터로 %KAFKA_HOME%\config\server.properties를 넘길 것
  4. e.g. start/b C:\kafka_2.12-2.1.1\bin\windows\kafka-server-start.bat C:\kafka_2.12-2.1.1\config\server.properties
  3. kafka-demo/kafka-pub/src/main/java/KafkaPubApplication을 찾아 실행
  4. kafka-demo/kafka-sub/src/main/java/KafkaSubApplication을 찾아 실행

 

프로그램 테스트

1. http://localhost:8080/pub/swagger-ui.html 호출
  1. [GET] /employee 를 클릭해 try it out >> Execute 를 실행해 데이터가 1건 등록되어 있는 것을 확인한다.(기본으로 한 건 등록되어 있다.)
  2. [POST] /employee 를 클릭해 try it out >> Execute 를 실행해 임의의 데이터를 넣고 저장한다.
  3. [GET] /employee 를 다시 실행해 데이터가 2건 등록되어 있는 것을 확인한다.
2. http://localhost:8080/sub/swagger-ui.html 호출
  1. [GET] /employee 를 클릭해 try it out >> Execute 를 실행해 pub서비스에서 방금 입력한 데이터 1건이 전송되어 온것을 확인한다.

 

Publisher 소스

dependency

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-web'
  implementation 'org.springframework.kafka:spring-kafka'
  implementation 'org.springframework.boot:spring-boot-starter-aop'
  implementation 'com.h2database:h2'
  implementation 'org.springframework.boot:spring-boot-starter-jdbc'
  implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:2.1.1'
  implementation 'org.bgee.log4jdbc-log4j2:log4jdbc-log4j2-jdbc4.1:1.16'
  implementation 'io.springfox:springfox-boot-starter:3.0.0'

  testImplementation('org.springframework.boot:spring-boot-starter-test') {exclude group'org.junit.vintage'module'junit-vintage-engine'
  }testImplementation 'org.springframework.kafka:spring-kafka-test'
}
  • 'org.springframework.kafka:spring-kafka' : 카프카 디펜던시 추가

 

KafkaProducerConfig.java

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, EmpDTO> empProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, EmpDTO> empKafkaTemplateKafkaTemplate() {
        return new KafkaTemplate<>(empProducerFactory());
    }

}
  • BOOTSTRAP_SERVERS_CONFIG : 대상 Kafka 주소이다. Multi Cluster 환경일 경우 "192.168.56.107:9092,192.168.56.107:9093,192.168.56.107:9094" 와 같이 처리
  • KEY_SERIALIZER_CLASS_CONFIG : Publishing 하는 객체의 키(Property) 이름을 어떤 형식으로 넘길지 지정.
    ByteArraySerializer, JsonSerializer, StringSerializer  사용 가능
  • VALUE_SERIALIZER_CLASS_CONFIG : Publishing 하는 객체의 값을 어떤 형식으로 넘길지 지정.
    ByteArraySerializer, JsonSerializer, StringSerializer 
  • KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG에서 키 이름과 키 값으로 지정할 때 사용한 클래스는 EmpDTO이다.

 

EmpPublisher.java

public class EmpPublisher {
    @Autowired
    private KafkaTemplate<String, EmpDTO> empKafkaTemplate;

    @Value(value = "${emp.topic.name}")
    private String empTopicName;

    public void publish(EmpDTO empDTO) {
        ListenableFuture<SendResult<String, EmpDTO>> future = empKafkaTemplate.send(empTopicName, empDTO);

        // 아래의 콜백은 옵션 사항
        future.addCallback(new ListenableFutureCallback<SendResult<String, EmpDTO>>() {
            @Override
            public void onSuccess(SendResult<String, EmpDTO> result) {
                EmpDTO empDTO = result.getProducerRecord().value();
                Log.info("Sent message=[" + empDTO.toString() + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                Log.info( "Unable to send message=[" + empDTO.toString() + "] due to : " + ex.getMessage());
            }
        });

    }
}

 

EmpServiceImpl.java

@Component
public class EmpServiceImpl implements EmpService{
    @Autowired
    private EmpRepository empRepository;

    @Autowired
    private EmpPublisher empPublisher;

    @Override
    public int insertEmp(EmpDTO empDTO) throws Exception {
        int ret = 0;

        if(empRepository.insertEmp(empDTO) >= 1) {
            empPublisher.publish(empDTO);
            ret =1;
        }

        return ret;
    }

    @Override
    public List<EmpDTO> getEmpList() throws Exception {
        return empRepository.selectEmpList();
    }
}

 

EmpDTO.java

public class EmpDTO {
    private String empNO=null;
    private String empName = null;
    private String deptNo = null;

    public EmpDTO(){}

    public EmpDTO(String empNO, String empName, String deptNo) {
        this.empNO = empNO;
        this.empName = empName;
        this.deptNo = deptNo;
    }
    // 생략...
}

 

Subscriber 소스

dependency

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-web'
  implementation 'org.springframework.kafka:spring-kafka'
  implementation 'org.springframework.boot:spring-boot-starter-aop'
  implementation 'com.h2database:h2'
  implementation 'org.springframework.boot:spring-boot-starter-jdbc'
  implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:2.1.1'
  implementation 'org.bgee.log4jdbc-log4j2:log4jdbc-log4j2-jdbc4.1:1.16'
  implementation 'io.springfox:springfox-boot-starter:3.0.0'

  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }

  testImplementation 'org.springframework.kafka:spring-kafka-test'
}

 

KafkaConsumerConfig.java

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, EmpDTO2> empConsumerFactory() {
        // Pub에서 보낸 DTO의 패키지 구조에 상관 없이 전송 받을 수 있도록 TrustedPackage 설정
        JsonDeserializer<EmpDTO2> deserializer = new JsonDeserializer<>(EmpDTO2.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "emp");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, EmpDTO2> empKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EmpDTO2> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(empConsumerFactory());
        // 필터 사용 시
        // factory.setRecordFilterStrategy(record -> record.value().contains("World"));
        return factory;
    }

}
  • JsonDeserializer<EmpDTO2> deserializer : Publisher에서 보낸 Topic의 객체를 받을 객체를 지정한다.
  • 다음의 세 속성을 지정함으로써 Publisher에서 지정한 클래스와 동일하지 않더라도 객체를 구성할 수 있게 할 수 있다.
    • deserializer.setRemoveTypeHeaders(false);
    • deserializer.addTrustedPackages("*"); 
    • deserializer.setUseTypeMapperForKey(true);
      Sub에서 객체를 받을 때 Pub에서 보낸 오브젝트의 키 값을 기준으로 값을 받는다.
  • GROUP_ID_CONFIG: 이 값을 통해 컨슈머가 속한 컨슈머 그룹의 ID를 지정한다. Zookeeper에서는 각 컨슈머 그룹의 메시지 offset을 관리하는데, 이 때 이 ID가 키로써 사용된다. 따라서 컨슈머 그룹 ID가 같으면 모두 같은 컨슈머 그룹에 속한 것으로 간주되며 메시지 offset 값 또한 공유된다.(서비스명-컨슈명 등으로 마이크로서비스 별 unique 값 지정 필요)

EmpSubscriber.java

@Component
public class EmpSubscriber {
    @Autowired
    EmpService empService;

    @KafkaListener(topics = "${emp.topic.name}", containerFactory = "empKafkaListenerContainerFactory")
    public void empListener(EmpDTO2 empDTO) {
        try {
            Log.info("▶▶▶▶▶▶▶▶ RECEIVED DATA FROM KAFKA: " + empDTO.toString());
            empService.insertEmp(empDTO);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

EmpServiceImpl.java

@Component
public class EmpServiceImpl implements EmpService{
    @Autowired
    private EmpRepository empRepository;

    @Autowired
    private EmpPublisher empPublisher;

    @Override
    public int insertEmp(EmpDTO empDTO) throws Exception {
        int ret = 0;

        if(empRepository.insertEmp(empDTO) >= 1) {
            empPublisher.publish(empDTO);
            ret =1;
        }

        return ret;
    }

    @Override
    public List<EmpDTO> getEmpList() throws Exception {
        return empRepository.selectEmpList();
    }
}

 

EmpDTO2.java

public class EmpDTO2 {

    private String empNO=null;
    private String empName = null;
    private String deptNo = null;
    private int number =0;

    // 생략...
}

 

프로그램  전체 소스

https://github.com/sharplee7/kafka-demo2.git

 

GitHub - sharplee7/kafka-demo2: kafka pub/sub demo

kafka pub/sub demo . Contribute to sharplee7/kafka-demo2 development by creating an account on GitHub.

github.com

 

끝~

반응형

+ Recent posts