Kafka에서 이벤트를 소비/생성해야 하는 Spring Boot 기반 애플리케이션이 있을 경우 2가지 라이브러리 중 선택해 사용할 수 있다.
- Kafka Clients
- Spring Kafka
둘 중 어떤 라이브러리를 사용할 것인가에 대한 스택오버플로우의 한 게시물에서는 이렇게 설명하고 있다.
Spring Kafka는 핵심 의존성 주입으로 대표되는 Spring 개념을 적용하는데 도움된다.
이것은 메시지를 보내는데 사용되는 ‘템플릿’을 높은 수준의 추상화로 제공한다.
또한, @KafkaListener 어노테이션과 ‘Listener Container’를 사용해 메시지 기반의 POJO(Message-driven POJOs)를 지원한다.
두 가지 라이브러리에 대해 좀 더 자세히 알아보자.
Kafka Clients
Kafka Clients는 Apache Kafka와 상호 작용하기 위해 사용되는 공식적인 클라이언트 라이브러리의 집합이다.
순수한 Kafka 클라이언트 라이브러리로, Spring Kafka의 추가적인 기능과 자동 구성을 활용하지 않는다.
이 라이브러리는 다양한 언어로 제공되며, 프로듀서(Producer)와 컨슈머(Consumer)와 같은 다양한 역할을 수행할 수 있다.
Java, Python, C/C++, Go, Ruby 등 다양한 언어에서 사용할 수 있으며, 각각의 언어에 대한 공식 클라이언트 라이브러리가 있다.
Kafka Clients를 사용하여 메시지를 생산하고, 메시지를 소비하며, Kafka 클러스터와 상호 작용할 수 있다.
주로 Kafka 프로듀서는 메시지를 Kafka 토픽으로 보내고, 컨슈머는 토픽에서 메시지를 읽어오는 역할을 한다.
Spring Kafka
Spring Kafka는 Spring Framework를 기반으로 한 Apache Kafka의 간편한 통합을 제공하는 라이브러리다.
Kafka Clients를 기반으로 구축되며, Kafka를 사용하는 애플리케이션을 더 쉽게 작성할 수 있도록 도와준다.
Spring Boot와 함께 사용할 때 특히 편리한데, Spring Boot의 Autoconfiguration 기능을 통해 Kafka와의 통합에 대한 구성이 거의 없어도 바로 시작할 수 있다.
Spring Kafka는 프로듀서와 컨슈머를 쉽게 설정하고 메시지를 처리할 수 있는 다양한 기능과 유틸리티를 제공한다.
또한, Spring의 특성을 이용해 메시지 핸들러를 정의하고 Kafka 토픽과의 매핑을 간단하게 설정할 수 있다.
즉, Kafka Clients는 Apache Kafka와 통신하는 순수한 Kafka 클라이언트 라이브러리를 의미하며, Spring Kafka는 Spring Framework를 기반으로 Kafka를 더 쉽게 통합할 수 있도록 도와주는 라이브러리다.
Kafka Clients 사용 예시
Kafka Clients는 Spring Boot의 자동 설정을 사용하지 않기 때문에 Producer와 Consumer의 연결과 설정에 대해 더 많은 관리가 필요하다.
1. build.gradle kafka-clients 라이브러리 추가
dependencies {
implementation 'org.apache.kafka:kafka-clients:2.8.2'
}
2. application.yml 설정
Kafka 클러스터의 브로커 주소를 지정한다.
spring:
kafka:
bootstrap-servers: localhost:9092
producer의 key.serializer, value.serializer, consumer의 group.id, key.deserializer, value.deserializer 등을 yml에서 설정해줄 수 있긴 하지만 나는 따로 클래스에서 설정해주기로 했다.
만약 yml에서 이를 설정해주고 싶으면 아래와 같이 설정해주면 될 것 같다.
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: groupId
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. Kafka Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducer {
@Value("${spring.kafka.bootstrap-servers}")
String BOOTSTRAP_SERVERS;
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String topic = "topic";
String message = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
producer.send(record);
producer.close();
}
}
4. Kafka Consumer
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumer {
@Value("${spring.kafka.bootstrap-servers}")
String BOOTSTRAP_SERVERS;
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.put("group.id", "groupId");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
String topic = "topic";
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
records.forEach(record -> {
System.out.println("Received message: " + record.value());
});
}
}
}
Kafka Clients 라이브러리를 사용해 간단한 Kafka Producer와 Kafka Consumer를 구현했다.
이 경우에는 Kafka 설정을 코드에 직접 명시적으로 지정해야 한다.(물론 앞서 말했듯 yml 파일에서 설정을 해줄 수도 있으나, 코드에서 직접 설정하도록 구현했다.)
설정은 Properties 객체를 사용하여 구성되며, Producer와 Consumer에 필요한 주요 설정들을 지정한다.
Spring Kafka 사용 예시
Spring Kafka는 Spring Boot와 함께 사용할 때 편리하며, 자동 설정 기능을 활용해 Kafka Producer와 Consumer를 쉽게 구성할 수 있다.
1. build.gradle에 Spring Kafka 의존성 추가
dependencies {
implementation 'org.springframework.kafka:spring-kafka:2.8.0' // Spring Kafka 의존성 추가
}
2. application.yml 설정
spring:
kafka:
bootstrap-servers: localhost:9092
3. ConsumerFactory
ConsumerFactory는 Spring Kafka에서 Kafka Consumer 인스턴스를 생성하기 위한 팩토리(Factory) 인터페이스다.
Kafka Consumer는 Kafka 토픽에서 메시지를 읽어오는 역할을 담당하는데, 이러한 Kafka Consumer를 생성하고 구성하기 위해 ConsumerFactory를 사용한다.
Spring Kafka에서는 ConsumerFactory를 사용하여 Kafka 컨슈머를 생성하고 프로퍼티를 설정하는 것이 편리하다.
ConsumerFactory를 통해 Kafka 클러스터의 브로커 정보, 그룹 ID, 키 및 값의 직렬화/역직렬화 방법 등을 구성할 수 있다.
이렇게 구성된 ConsumerFactory는 Spring Kafka에 의해 관리되며, 애플리케이션에서 필요할 때마다 Kafka Consumer를 만들어 제공한다.
ConsumerFactory는 Spring Kafka의 org.springframework.kafka.core.ConsumerFactory 인터페이스를 구현해야 한다.
각각의 메시징 프로토콜(Kafka, RabbitMQ 등)에 따라 ConsumerFactory 인터페이스를 구현하는 구체적인 클래스들이 제공되며, Kafka의 경우 org.springframework.kafka.core.DefaultKafkaConsumerFactory 클래스를 사용하는 것이 일반적이다.
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
String BOOTSTRAP_SERVERS;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
return new DefaultKafkaConsumerFactory<>(props);
}
}
참고로 @EnableKafka는 Spring Kafka를 사용하는 데에 필요한 Kafka Consumer 관련 기능들을 활성화하는데 사용되는 어노테이션이다.
일반적으로 @EnableKafka 어노테이션은 @Configuration이 붙은 클래스에 추가된다.
4. ProducerFactory
ProducerFactory는 Kafka Producer를 생성하고 구성하기 위한 인터페이스다.
Spring Kafka에서는 ProducerFactory를사용해 Kafka Producer를 생성하고 커스터마이징할 수 있다.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
String BOOTSTRAP_SERVERS;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
5. Kafka Producer
KafkaTemplate를 주입받아 메시지를 Kafka 토픽으로 보낸다.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component // Spring Bean으로 등록
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
// KafkaTemplate Bean을 주입해 MyKafkaProducer 객체 생성
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
public static void main(String[] args) {
// Spring Boot 애플리케이션을 실행하기 위해 SpringApplication.run()을 호출
// 여기서는 main 메서드 내에서 바로 Spring Boot 애플리케이션을 실행하도록 했다.
SpringApplication.run(KafkaProducer.class, args);
// KafkaProducer 인스턴스 생성
KafkaProducer kafkaProducer = new KafkaProducer();
// Kafka 토픽 및 메시지 생성
String topic = "topic";
String message = "Hello, Kafka!";
// 메시지 전송
kafkaProducer.sendMessage(topic, message);
}
}
6. Kafka Consumer
@KafkaListener 어노테이션을 사용해 Kafka 토픽에서 메시지를 받는다.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = "topic", groupId = "groupId")
public void handleMessage(String message) {
System.out.println("Received message: " + message);
}
}
이렇게 Kafka Clients와 Spring Kafka에 대한 대강의 예시 코드를 구현해보았다.
참고로 Spring Bean에서 관리해주는 Spring Kafka는 어플리케이션 실행 시점에 Kafka와 연동이 정상적인지 바로 확인 가능하지만, Kafka Clients는 메세지 전송, 수신과 관련된 메서드를 호출해 실행하고 연동을 진행하기 때문에, 메세지 전공/수신 시점에 연동이 정상적으로 이루어졌는지 확인할 수 있다.
Kafka와 관련된 설정 관련해서는 추가로 더 할 수 있는 부분이긴 하지만 가장 간단하고 기본적인 것만 설정하는 것으로 진행해보았다.
코드는 필요할 때 수정하면서 구현하면 되고, 이 게시글을 정리하자면 Java 기반 어플리케이션에서 Kafka를 사용할 경우 이용할 수 있는 라이브러리는 2개이며, 그 중 Kafka Clients는 Apache Kafka와 통신하는 순수한 Kafka 클라이언트 라이브러리를 의미하고, Spring Kafka는 Spring Framework를 기반으로 Kafka를 더 쉽게 통합할 수 있도록 도와주는 라이브러리라는 것이다.