Kafka Pub Sub 모델
링크드인(Linkedin)은 초창기 Point to Point 구조를 이용해 시스템간 데이터 전송을 사용하다가 증가하는 메시지 전송에 대해 대응을 하지 못했다. Linkedin은 이 문제를 해결하기 위해 메시지 전송을 전용 시스템을 중앙에 두고 운영하는 Hub and Spoke 방식의 메시지 전송 시스템을 구성하였다. 그리고 그들이 만든 이 메시지 전송 시스템이 우리가 아는 Kafka이다.
Kafka를 이용한 Hub and Spoke 시스템을 Pub(Publish) Sub(Subscribe) Model이라 지칭하고 이를 그림으로 설명 표현하면 다음과 같다.
Producer(Publish)
Publish 역할을 하는 서비스는 카프카에게 전송만하게 되고 수신자가 누구인지 알 필요가 없다. 그래서 특별히 수신자를 정해놓지 않고 서버에 전송하게 되는 매우 심플한 구조를 보여준다.
카프카는 내부에 발신자의 메세지를 토픽(Topic)이라는 주소로 저장하게 되고, 발신자는 이 토픽에게 우체원이 우편함에 편지를 넣듯 저장을 한다.
Consumer(Subscribe)
Subscribe(수신자)는 카프카에 원하는 토픽을 구독한다. 즉 수신자 역시 발신자가 누구인지 관심은 없고 필요한 메세지만 구독을 하게 되는 것이다. 이를 통해 Point to Point 구조에 비해서 매우 단순해지고 유지보수 뿐만 아니라 에러의 발생, 네트워크 트래픽의 장점까지 얻게 된다.
Pub/Sub 작동 방식
1. publisher는 메세지를 topic이라는 그룹에 넣어 발행한다.
2. Message Broker는 각 publisher들에서 unique한 topic명으로 그룹화된 메세지들을 받아 저장한다.
3. subscriber는 본인이 관심있어하는 메시지를 topic을 통해 구분해 해당 topic을 가입해 신규 메시지가 topic에 담길때마다 가져온다.
4. pub/sub구조에서 publisher는 본인의 topic과 그에 딸린 message를 어느 subscriber가 가져가 소비(consum)하는지 모른다. 마찬가지로 subscriber입장에서도 topic에만 관심이 있지 해당 topic의 메시지를 누가 발행(publish)하는지 관심이 없다.
5. 결과적으로 Pub Sub 모델을 이용해 결과적으로 시스템간 종속성이 없는 시스템을 구현 할 수 있다.
Spring Boot를 이용한 Kafka Pub Sub 개발 예제
시나리오
- http://localhost:8080/pub/swagger-ui/에 접속해 EmpDTO 내용 등록(empNo, empName, detpNo)
- EmpService의 insertEmp 호출, EmpService는 EmpDTO 등록시, EmpRepository를 통해 H2DB에 EMP 내용 등록 및 EmpPublisher를 통해 Kafka로 EmpDTO 내용 publish
- EmpRepository를 통해 H2DB에 EmpDTO 등록
- EmpPublisher를 통해 Kafka에 EmpDTO Publish
- Kafka로 EmpDTO 내용 Sirialize 시켜 전달
- Kafka에 EmpDTO 신규 등록 내용 캐치
- EmpSubscriber가 Kafka에 신규 등록된 EmpDTO Subscribe
- EmpSubscriber가 EmpService를 호출 해 insertEmp(EmpDTO) 실행
- EmpService를 통해 EmpDTO를 H2DB에 등록
- http://localhost:8090/sub/swager-ui/에 접속해 Publisher로 부터 전달 받은 EmpDTO가 나오는지 확인
개발 환경
|
개발 환경 준비
1. git clone 으로 개발소스 복제 2. kafka 다운로드 3. kafka 설정 %KAFKA_HOME%\config\ 폴더에 있는 zookeeper.properties 파일을 열어 dataDir=/tmp/zookeeper 항목을 dataDir=C:/temp/zookeeper 로 변경 |
프로그램 빌드
1. 전체 프로젝트 빌드 gradlew build 2. 모듈별 빌드 gradlew :<모듈명>:build |
프로그램 실행
1. zookeeper 실행 1. 새로운 윈도우 코맨드 창을 열어 kafka가 설치된 디렉토리로 이동 2. 윈도우라면 다음과 같이 bin\windows\zookeeper-server-start.bat를 실행 3. 이때 입력 파라미터로 %KAFKA_HOME%\config\zookeeper.properties의 경로명과 파일명을 함께 넘길 것 4. e.g. start/b C:\kafka_2.12-2.1.1\bin\windows\zookeeper-server-start.bat C:\kafka_2.12-2.1.1\config\zookeeper.properties 2. kafka 실행 1. 새로운 윈도우 코맨드 창을 열어 kafka가 설치된 디렉토리로 이동 2. 윈도우라면 다음과 같이 bin\windows\kafka-server-start.bat를 실행 3. 이떄 입력 파라미터로 %KAFKA_HOME%\config\server.properties를 넘길 것 4. e.g. start/b C:\kafka_2.12-2.1.1\bin\windows\kafka-server-start.bat C:\kafka_2.12-2.1.1\config\server.properties 3. kafka-demo/kafka-pub/src/main/java/KafkaPubApplication을 찾아 실행 4. kafka-demo/kafka-sub/src/main/java/KafkaSubApplication을 찾아 실행 |
프로그램 테스트
1. http://localhost:8080/pub/swagger-ui.html 호출 1. [GET] /employee 를 클릭해 try it out >> Execute 를 실행해 데이터가 1건 등록되어 있는 것을 확인한다.(기본으로 한 건 등록되어 있다.) 2. [POST] /employee 를 클릭해 try it out >> Execute 를 실행해 임의의 데이터를 넣고 저장한다. 3. [GET] /employee 를 다시 실행해 데이터가 2건 등록되어 있는 것을 확인한다. 2. http://localhost:8080/sub/swagger-ui.html 호출 1. [GET] /employee 를 클릭해 try it out >> Execute 를 실행해 pub서비스에서 방금 입력한 데이터 1건이 전송되어 온것을 확인한다. |
Publisher 소스
dependency
dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-aop' implementation 'com.h2database:h2' implementation 'org.springframework.boot:spring-boot-starter-jdbc' implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:2.1.1' implementation 'org.bgee.log4jdbc-log4j2:log4jdbc-log4j2-jdbc4.1:1.16' implementation 'io.springfox:springfox-boot-starter:3.0.0' testImplementation('org.springframework.boot:spring-boot-starter-test') {exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' }testImplementation 'org.springframework.kafka:spring-kafka-test' } |
- 'org.springframework.kafka:spring-kafka' : 카프카 디펜던시 추가
KafkaProducerConfig.java
@Configuration public class KafkaProducerConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public ProducerFactory<String, EmpDTO> empProducerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, EmpDTO> empKafkaTemplateKafkaTemplate() { return new KafkaTemplate<>(empProducerFactory()); } } |
- BOOTSTRAP_SERVERS_CONFIG : 대상 Kafka 주소이다. Multi Cluster 환경일 경우 "192.168.56.107:9092,192.168.56.107:9093,192.168.56.107:9094" 와 같이 처리
- KEY_SERIALIZER_CLASS_CONFIG : Publishing 하는 객체의 키(Property) 이름을 어떤 형식으로 넘길지 지정.
ByteArraySerializer, JsonSerializer, StringSerializer 사용 가능 - VALUE_SERIALIZER_CLASS_CONFIG : Publishing 하는 객체의 값을 어떤 형식으로 넘길지 지정.
ByteArraySerializer, JsonSerializer, StringSerializer - KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG에서 키 이름과 키 값으로 지정할 때 사용한 클래스는 EmpDTO이다.
EmpPublisher.java
public class EmpPublisher { @Autowired private KafkaTemplate<String, EmpDTO> empKafkaTemplate; @Value(value = "${emp.topic.name}") private String empTopicName; public void publish(EmpDTO empDTO) { ListenableFuture<SendResult<String, EmpDTO>> future = empKafkaTemplate.send(empTopicName, empDTO); // 아래의 콜백은 옵션 사항 future.addCallback(new ListenableFutureCallback<SendResult<String, EmpDTO>>() { @Override public void onSuccess(SendResult<String, EmpDTO> result) { EmpDTO empDTO = result.getProducerRecord().value(); Log.info("Sent message=[" + empDTO.toString() + "] with offset=[" + result.getRecordMetadata().offset() + "]"); } @Override public void onFailure(Throwable ex) { Log.info( "Unable to send message=[" + empDTO.toString() + "] due to : " + ex.getMessage()); } }); } } |
EmpServiceImpl.java
@Component public class EmpServiceImpl implements EmpService{ @Autowired private EmpRepository empRepository; @Autowired private EmpPublisher empPublisher; @Override public int insertEmp(EmpDTO empDTO) throws Exception { int ret = 0; if(empRepository.insertEmp(empDTO) >= 1) { empPublisher.publish(empDTO); ret =1; } return ret; } @Override public List<EmpDTO> getEmpList() throws Exception { return empRepository.selectEmpList(); } } |
EmpDTO.java
public class EmpDTO { private String empNO=null; private String empName = null; private String deptNo = null; public EmpDTO(){} public EmpDTO(String empNO, String empName, String deptNo) { this.empNO = empNO; this.empName = empName; this.deptNo = deptNo; } // 생략... } |
Subscriber 소스
dependency
dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-aop' implementation 'com.h2database:h2' implementation 'org.springframework.boot:spring-boot-starter-jdbc' implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:2.1.1' implementation 'org.bgee.log4jdbc-log4j2:log4jdbc-log4j2-jdbc4.1:1.16' implementation 'io.springfox:springfox-boot-starter:3.0.0' testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } testImplementation 'org.springframework.kafka:spring-kafka-test' } |
KafkaConsumerConfig.java
@EnableKafka @Configuration public class KafkaConsumerConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; public ConsumerFactory<String, EmpDTO2> empConsumerFactory() { // Pub에서 보낸 DTO의 패키지 구조에 상관 없이 전송 받을 수 있도록 TrustedPackage 설정 JsonDeserializer<EmpDTO2> deserializer = new JsonDeserializer<>(EmpDTO2.class); deserializer.setRemoveTypeHeaders(false); deserializer.addTrustedPackages("*"); deserializer.setUseTypeMapperForKey(true); Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, "emp"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer); } @Bean public ConcurrentKafkaListenerContainerFactory<String, EmpDTO2> empKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, EmpDTO2> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(empConsumerFactory()); // 필터 사용 시 // factory.setRecordFilterStrategy(record -> record.value().contains("World")); return factory; } } |
- JsonDeserializer<EmpDTO2> deserializer : Publisher에서 보낸 Topic의 객체를 받을 객체를 지정한다.
- 다음의 세 속성을 지정함으로써 Publisher에서 지정한 클래스와 동일하지 않더라도 객체를 구성할 수 있게 할 수 있다.
- deserializer.setRemoveTypeHeaders(false);
- deserializer.addTrustedPackages("*");
- deserializer.setUseTypeMapperForKey(true);
Sub에서 객체를 받을 때 Pub에서 보낸 오브젝트의 키 값을 기준으로 값을 받는다.
- GROUP_ID_CONFIG: 이 값을 통해 컨슈머가 속한 컨슈머 그룹의 ID를 지정한다. Zookeeper에서는 각 컨슈머 그룹의 메시지 offset을 관리하는데, 이 때 이 ID가 키로써 사용된다. 따라서 컨슈머 그룹 ID가 같으면 모두 같은 컨슈머 그룹에 속한 것으로 간주되며 메시지 offset 값 또한 공유된다.(서비스명-컨슈명 등으로 마이크로서비스 별 unique 값 지정 필요)
EmpSubscriber.java
@Component public class EmpSubscriber { @Autowired EmpService empService; @KafkaListener(topics = "${emp.topic.name}", containerFactory = "empKafkaListenerContainerFactory") public void empListener(EmpDTO2 empDTO) { try { Log.info("▶▶▶▶▶▶▶▶ RECEIVED DATA FROM KAFKA: " + empDTO.toString()); empService.insertEmp(empDTO); } catch (Exception e) { e.printStackTrace(); } } } |
EmpServiceImpl.java
@Component public class EmpServiceImpl implements EmpService{ @Autowired private EmpRepository empRepository; @Autowired private EmpPublisher empPublisher; @Override public int insertEmp(EmpDTO empDTO) throws Exception { int ret = 0; if(empRepository.insertEmp(empDTO) >= 1) { empPublisher.publish(empDTO); ret =1; } return ret; } @Override public List<EmpDTO> getEmpList() throws Exception { return empRepository.selectEmpList(); } } |
EmpDTO2.java
public class EmpDTO2 { private String empNO=null; private String empName = null; private String deptNo = null; private int number =0; // 생략... } |
프로그램 전체 소스
https://github.com/sharplee7/kafka-demo2.git
끝~
'Middleware > Message Queue' 카테고리의 다른 글
Kafka 파티션 (0) | 2022.06.03 |
---|---|
Kafka 설정 - 포인트만 (0) | 2022.04.24 |
Spring Boot에서 Apache Kafka 사용 ... 2/2 (0) | 2021.08.10 |
Spring Boot에서 Apache Kafka 사용 ... 1/2 (0) | 2021.08.10 |
윈도우에서 Apache Kafka 개발 환경 만들기 (0) | 2021.08.10 |