How do you create a Kafka consumer in Spring Boot?
Table of Contents
- Introduction
- Steps to Implement a Kafka Consumer in Spring Boot
- 1. Add Kafka Dependencies
- 2. Configure Kafka in
**application.yml**
or**application.properties**
- 3. Create Kafka Consumer Service with
**@KafkaListener**
- 4. Handle Message Consumption
- 5. Consume Kafka Messages with Offset Management
- 6. Test the Kafka Consumer
- 7. Handle Message Acknowledgments (Optional)
- 8. Scaling Kafka Consumers
- Conclusion
Introduction
Apache Kafka is widely used for handling real-time event streaming. In a Spring Boot application, integrating a Kafka consumer allows you to consume messages from Kafka topics and process them asynchronously. Spring Kafka provides an abstraction layer to simplify consuming Kafka messages, and the @KafkaListener
annotation is the key to implementing a Kafka consumer.
In this guide, we will walk you through how to set up a Kafka consumer in Spring Boot, configure it properly, and create a service to consume messages from a Kafka topic.
Steps to Implement a Kafka Consumer in Spring Boot
1. Add Kafka Dependencies
Before setting up a Kafka consumer, you need to add the necessary dependencies for Kafka in your Spring Boot project.
Maven Configuration
Add the following dependencies to your pom.xml
:
Gradle Configuration
For Gradle, add this to your build.gradle
file:
These dependencies include everything necessary to work with Kafka and Spring Boot, including the KafkaListener to consume messages.
2. Configure Kafka in **application.yml**
or **application.properties**
Next, you need to configure the Kafka properties in your application.yml
or application.properties
file. This configuration will define the Kafka broker addresses and other necessary properties.
Example application.yml
Configuration:
bootstrap-servers
: Kafka brokers that the consumer will connect to.group-id
: A unique ID for the consumer group that helps in distributing message consumption among different consumers.key-serializer
andvalue-serializer
: Set the deserializer for key and value toStringDeserializer
, as this example uses simple string messages.enable-auto-commit
: Set tofalse
if you want to manually commit the offset, allowing for more control over message consumption.auto-offset-reset
: Determines where to start reading messages if no previous offset is stored.
3. Create Kafka Consumer Service with **@KafkaListener**
In Spring Kafka, the @KafkaListener
annotation is used to define methods that will listen to Kafka topics. You can create a service class where Kafka consumer logic is implemented.
Example Kafka Consumer Service
**@KafkaListener**
: This annotation listens for messages from the specified Kafka topic (test-topic
in this case).**groupId**
: Specifies the consumer group that this listener belongs to, ensuring that multiple consumers can read from the same topic without duplicating messages.
4. Handle Message Consumption
The @KafkaListener
-annotated method will be triggered every time a new message arrives in the topic it’s subscribed to. You can define the message type and include any processing logic as required.
Example with JSON Messages:
If you're consuming JSON data, you may want to deserialize the incoming messages into Java objects. You can use @KafkaListener
with a MessageConverter
or simply a custom deserializer.
**CustomObject**
: This can be a POJO (Plain Old Java Object) representing the message structure.- Spring Kafka automatically maps the incoming JSON data to the Java object, provided you've configured the appropriate deserializer.
5. Consume Kafka Messages with Offset Management
If you've disabled auto-commit (enable-auto-commit: false
), you can manually manage offsets for each message. This ensures that you have complete control over when the offset is committed.
Example of Manual Offset Commit
6. Test the Kafka Consumer
After configuring your Kafka consumer, you can test it by sending messages to the Kafka topic (test-topic
) and observing the output.
Example CURL Command to Send Message to Kafka:
If you want to send a message to Kafka from your local machine, you can use Kafka's command line tools or a producer client.
For example, using Kafka’s command-line tool:
Then enter a message like "Hello from Kafka"
to send to the test-topic
.
The Spring Boot consumer will log the consumed message:
7. Handle Message Acknowledgments (Optional)
If you need more control over message acknowledgment, you can manually acknowledge the messages after processing. You can use the Acknowledgment
parameter in your @KafkaListener
-annotated method.
In this case, the message will be acknowledged after the consumer method completes.
8. Scaling Kafka Consumers
You can scale the Kafka consumer by increasing the number of consumer instances or by adding partitions to your Kafka topics. Each consumer in the same group consumes messages from different partitions. This allows your application to scale efficiently by distributing the load.
Conclusion
In this guide, we’ve walked through the steps of setting up a Kafka consumer in Spring Boot. By using the @KafkaListener
annotation, you can easily consume messages from Kafka topics and integrate Kafka with your Spring Boot application. Additionally, manual offset management and acknowledgments provide finer control over message processing.
With these steps, you can create a reliable and scalable Kafka consumer in Spring Boot, ideal for real-time data processing, event-driven systems, and distributed architectures.