How do you create a Kafka consumer in Spring Boot?


Introduction

Apache Kafka is widely used for handling real-time event streaming. In a Spring Boot application, integrating a Kafka consumer allows you to consume messages from Kafka topics and process them asynchronously. Spring Kafka provides an abstraction layer to simplify consuming Kafka messages, and the @KafkaListener annotation is the key to implementing a Kafka consumer.

In this guide, we will walk you through how to set up a Kafka consumer in Spring Boot, configure it properly, and create a service to consume messages from a Kafka topic.

Steps to Implement a Kafka Consumer in Spring Boot

1. Add Kafka Dependencies

Before setting up a Kafka consumer, you need to add the necessary dependencies for Kafka in your Spring Boot project.

Maven Configuration

Add the following dependencies to your pom.xml:
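The original snippet is not reproduced here, so the following is only a sketch of what the dependency section might look like. It assumes the project inherits from the Spring Boot parent POM (or imports the Spring Boot BOM), which is why no versions are listed.

```xml
<dependencies>
    <!-- Core Spring Boot starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka; version assumed to be managed by Spring Boot -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
```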

Gradle Configuration

For Gradle, add this to your build.gradle file:
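A rough equivalent in the Groovy DSL might look like this. It assumes the Spring Boot and dependency-management Gradle plugins are already applied, so that versions are resolved automatically.

```groovy
dependencies {
    // Core Spring Boot starter
    implementation 'org.springframework.boot:spring-boot-starter'
    // Spring for Apache Kafka; version assumed to be managed by the Spring Boot plugin
    implementation 'org.springframework.kafka:spring-kafka'
}
```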

The spring-kafka dependency provides what you need to work with Kafka in Spring Boot, including the @KafkaListener annotation used to consume messages.

2. Configure Kafka in **application.yml** or **application.properties**

Next, you need to configure the Kafka properties in your application.yml or application.properties file. This configuration will define the Kafka broker addresses and other necessary properties.

Example application.yml Configuration:
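A minimal configuration sketch covering the properties described in this step could look like the following. The broker address localhost:9092 and the group name my-group are placeholder assumptions, not values from the original article.

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092   # assumed local broker address
    consumer:
      group-id: my-group                # hypothetical consumer group name
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      auto-offset-reset: earliest
```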

  • bootstrap-servers: Kafka brokers that the consumer will connect to.
  • group-id: A unique ID for the consumer group that helps in distributing message consumption among different consumers.
  • key-deserializer and value-deserializer: Set the deserializer for key and value to StringDeserializer, as this example uses simple string messages. (Serializers are configured on the producer side; a consumer uses deserializers.)
  • enable-auto-commit: Set to false if you want to manually commit the offset, allowing for more control over message consumption.
  • auto-offset-reset: Determines where to start reading messages if no previous offset is stored for the group, for example earliest (start of the partition) or latest (only new messages).

3. Create Kafka Consumer Service with **@KafkaListener**

In Spring Kafka, the @KafkaListener annotation is used to define methods that will listen to Kafka topics. You can create a service class where Kafka consumer logic is implemented.

Example Kafka Consumer Service
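The service below is a minimal sketch of such a class rather than the article's original code. It assumes spring-kafka is on the classpath, string deserializers are configured, and the group name my-group is a placeholder; the class name KafkaConsumerService is also illustrative.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
class KafkaConsumerService {

    // Called by the listener container for each record received from test-topic.
    // "my-group" is a hypothetical group name used only for this sketch.
    @KafkaListener(topics = "test-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Consumed message: " + message);
    }
}
```

Because the listener is an ordinary method, it can also be called directly in a unit test without a running broker.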

  • **@KafkaListener**: This annotation listens for messages from the specified Kafka topic (test-topic in this case).
  • **groupId**: Specifies the consumer group that this listener belongs to. Consumers that share a group ID split the topic's partitions between them, so each message is delivered to only one consumer in the group.

4. Handle Message Consumption

The @KafkaListener-annotated method will be triggered every time a new message arrives in the topic it’s subscribed to. You can define the message type and include any processing logic as required.

Example with JSON Messages:

If you're consuming JSON data, you may want to deserialize the incoming messages into Java objects. You can use @KafkaListener with a MessageConverter or simply a custom deserializer.
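As a sketch of the deserializer route, the listener below receives an already-deserialized object. It assumes the consumer's value-deserializer is set to org.springframework.kafka.support.serializer.JsonDeserializer and that the spring.json.value.default.type and spring.json.trusted.packages consumer properties point at the payload class. The fields of CustomObject (id and name) are hypothetical.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Hypothetical payload type; the fields are illustrative only.
class CustomObject {
    private String id;
    private String name;

    public CustomObject() { }  // no-arg constructor used during JSON deserialization

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

@Service
class JsonKafkaConsumerService {

    // Assumes the value deserializer has already turned the JSON payload into a CustomObject.
    @KafkaListener(topics = "test-topic", groupId = "my-group")
    public void listen(CustomObject payload) {
        System.out.println("Consumed object: id=" + payload.getId() + ", name=" + payload.getName());
    }
}
```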

  • **CustomObject**: This can be a POJO (Plain Old Java Object) representing the message structure.
  • Spring Kafka automatically maps the incoming JSON data to the Java object, provided you've configured the appropriate deserializer.

5. Consume Kafka Messages with Offset Management

If you've disabled auto-commit (enable-auto-commit: false), you can manually manage offsets for each message. This ensures that you have complete control over when the offset is committed.

Example of Manual Offset Commit
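In Spring Kafka the listener container performs the actual commit, so manual offset management largely comes down to choosing the container's acknowledgment mode. The fragment below is one possible setup, assuming Spring Boot's spring.kafka.listener.ack-mode property is used instead of a hand-written container factory.

```yaml
spring:
  kafka:
    consumer:
      enable-auto-commit: false     # the Kafka client does not commit on its own
    listener:
      ack-mode: manual_immediate    # commit when the listener calls acknowledge()
```

With this mode, an offset is committed only after the listener method calls acknowledge() on its Acknowledgment parameter.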

6. Test the Kafka Consumer

After configuring your Kafka consumer, you can test it by sending messages to the Kafka topic (test-topic) and observing the output.

Example Command to Send a Message to Kafka:

If you want to send a message to Kafka from your local machine, you can use Kafka's command line tools or a producer client.

For example, using Kafka’s command-line tool:
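The command below is a sketch that assumes it is run from the Kafka installation directory against a broker at localhost:9092. Recent Kafka versions accept --bootstrap-server; older releases used --broker-list instead.

```shell
# Opens an interactive prompt; each line typed is sent as one message to test-topic
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
```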

Then type a message such as "Hello from Kafka" and press Enter to send it to test-topic.

The Spring Boot consumer will then log the consumed message.

7. Handle Message Acknowledgments (Optional)

If you need more control over message acknowledgment, you can manually acknowledge the messages after processing. You can use the Acknowledgment parameter in your @KafkaListener-annotated method.
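A minimal sketch of such a listener follows. It assumes the listener container is running in a manual ack mode (for example, spring.kafka.listener.ack-mode set to manual or manual_immediate); the class name, the my-group group name, and the process helper are hypothetical.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
class AcknowledgingConsumerService {

    // Requires a manual ack mode on the listener container (assumption of this sketch).
    @KafkaListener(topics = "test-topic", groupId = "my-group")
    public void listen(String message, Acknowledgment ack) {
        // If process() throws, acknowledge() is never reached and this offset is not committed here.
        process(message);
        ack.acknowledge();
    }

    // Hypothetical processing step; rejects empty payloads to illustrate the failure path.
    private void process(String message) {
        if (message == null || message.isEmpty()) {
            throw new IllegalArgumentException("Empty message");
        }
        System.out.println("Processed message: " + message);
    }
}
```

Acknowledging as the last step means a failure during processing leaves the record unacknowledged, and the exception is handed to the container's error handling.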

With this approach, the message is acknowledged only when the listener calls acknowledge(), typically as the last step once processing has succeeded.

8. Scaling Kafka Consumers

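You can scale consumption by running more consumer instances in the same group and, where needed, adding partitions to the topic. Within a group, each partition is assigned to exactly one consumer, so consumers share the load by reading from different partitions. The partition count therefore caps the useful parallelism: any consumers beyond it stay idle until partitions are added or another consumer leaves the group.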
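To make the relationship between partitions and consumers concrete, here is a toy model in plain Java. It is not Kafka's actual assignment algorithm; it is a simplified round-robin sketch, and the class and consumer names are hypothetical. It only illustrates that each partition has a single owner within a group and that surplus consumers receive nothing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionAssignmentSketch {

    // Spreads partition numbers 0..partitions-1 across the given consumers in round-robin order.
    static Map<String, List<Integer>> assign(int partitions, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;  // nobody to assign partitions to
        }
        for (int p = 0; p < partitions; p++) {
            String owner = consumers.get(p % consumers.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }
}
```

With three partitions and two consumers, the first consumer gets partitions 0 and 2 and the second gets partition 1. With three partitions and four consumers, the fourth consumer ends up with an empty list, which corresponds to an idle consumer.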

Conclusion

In this guide, we’ve walked through the steps of setting up a Kafka consumer in Spring Boot. By using the @KafkaListener annotation, you can easily consume messages from Kafka topics and integrate Kafka with your Spring Boot application. Additionally, manual offset management and acknowledgments provide finer control over message processing.

With these steps, you can create a reliable and scalable Kafka consumer in Spring Boot, ideal for real-time data processing, event-driven systems, and distributed architectures.
