Kafka Example: Messaging with Spring Boot

In our previous articles, About RabbitMQ and RabbitMQ Example, we explored what RabbitMQ is and how it works. This article turns to Apache Kafka. After a short introduction to Kafka and its key concepts, we’ll implement messaging in Spring Boot by creating two projects with Spring Initializr: a producer and a consumer. Both projects share the same dependencies, so their final pom.xml files are nearly identical.

1. What We Are Doing

We will build a simple monitoring solution where:

  • The Producer sends messages to Kafka topics (e.g., monitoring.laptop, monitoring.phone, and monitoring.tablet) using a scheduled task.
  • The Consumer listens for messages from these topics and processes them accordingly.

This architecture demonstrates how to use Apache Kafka as a distributed streaming platform in a Spring Boot environment.

2. Introduction to Apache Kafka

Apache Kafka is a high-throughput distributed messaging system designed for building real-time data pipelines and streaming applications. Its main capabilities include:

  • Publishing and subscribing to streams of records: Kafka allows producers to send messages to topics, and consumers to subscribe to these topics.
  • Storing streams of records: Messages are persisted on disk and can be replayed as needed.
  • Processing streams of records: Kafka supports real-time processing of data, making it ideal for handling large-scale, continuous data flows.

Kafka’s scalability, durability, and fault-tolerance make it a popular choice for modern data-intensive applications.
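
To make the "store and replay" capability concrete, here is a toy in-memory model. It is only a sketch of the idea, not how Kafka is implemented: the class name ToyTopicLog and its methods are hypothetical, and a real topic is partitioned, replicated, and persisted on disk.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single-partition topic: an append-only list in which each
// record's position is its offset. Hypothetical, for illustration only.
public class ToyTopicLog {

    private final List<String> records = new ArrayList<>();

    // Appends a record and returns the offset it was stored at.
    public long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    // Returns every record from the given offset onward. Reading does not
    // remove anything, so the same offset can be read again later (replay).
    public List<String> readFrom(long offset) {
        if (offset < 0 || offset > records.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        return List.copyOf(records.subList((int) offset, records.size()));
    }

    public static void main(String[] args) {
        ToyTopicLog log = new ToyTopicLog();
        log.append("cpu=40");
        log.append("cpu=55");
        log.append("cpu=61");
        System.out.println(log.readFrom(1)); // prints [cpu=55, cpu=61]
        System.out.println(log.readFrom(0)); // replay from the start: [cpu=40, cpu=55, cpu=61]
    }
}
```

The point of the model is that consuming is just reading from a position: two consumers can read the same records independently, and either can go back to an earlier offset.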

3. Project Setup

Both the producer and consumer projects are created using Spring Initializr. When setting up your projects, be sure to include the Spring Kafka dependency. The final pom.xml for both projects is very similar, as they share the same core dependencies:

  • spring-boot-starter
  • spring-kafka
  • spring-boot-starter-test
  • spring-kafka-test

Final pom.xml (shown here for the consumer project):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.4.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>tech.devblueprint</groupId>
    <artifactId>kafka-consumer-spring-boot-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-consumer-spring-boot-example</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

In addition, we use a docker-compose.yml file to quickly set up a Kafka broker, the ZooKeeper service it requires, and Kafdrop. The configuration is shown below; to run it, execute docker-compose up -d:

version: '3'
services:
  devblueprint-zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: devblueprint-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  devblueprint-kafka:
    image: confluentinc/cp-kafka:latest
    container_name: devblueprint-kafka
    depends_on:
      - devblueprint-zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: devblueprint-zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://devblueprint-kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1


  devblueprint-kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: devblueprint-kafdrop
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: devblueprint-kafka:9092
      SERVER_SERVLET_CONTEXTPATH: /
    depends_on:
      - devblueprint-kafka

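This configuration starts ZooKeeper, Kafka, and Kafdrop in Docker, making it easy to test and develop your messaging applications locally. The broker advertises two listeners: devblueprint-kafka:9092 for containers on the Compose network (Kafdrop uses this one) and localhost:29092 for clients running on the host machine, which is why the Spring Boot applications below connect to localhost:29092.

ZooKeeper manages cluster configuration and coordination for Kafka, while Kafdrop provides a web-based interface for visualizing and monitoring Kafka topics, partitions, and consumer groups. Because the images are tagged latest and Apache Kafka 4.0 removed ZooKeeper mode, you may need to pin the image tags to a release that still supports ZooKeeper.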
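
The KAFKA_ADVERTISED_LISTENERS value determines which address each kind of client should use. As a rough illustration, the sketch below parses that value into a map from listener name to address. The AdvertisedListeners class is a hypothetical helper written for this article, not part of Kafka or Spring.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: parses a KAFKA_ADVERTISED_LISTENERS-style string.
public class AdvertisedListeners {

    // Splits "NAME://host:port,NAME2://host2:port2" into name -> "host:port".
    public static Map<String, String> parse(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("malformed listener: " + entry);
            }
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> listeners = parse(
            "PLAINTEXT://devblueprint-kafka:9092,PLAINTEXT_HOST://localhost:29092");
        // Address for containers on the Compose network (e.g. Kafdrop)
        System.out.println(listeners.get("PLAINTEXT"));      // prints devblueprint-kafka:9092
        // Address for applications running on the host machine
        System.out.println(listeners.get("PLAINTEXT_HOST")); // prints localhost:29092
    }
}
```

A client started on the host that pointed at devblueprint-kafka:9092 would typically fail to resolve that hostname, since it only exists inside the Compose network.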

4. Project Implementation

Producer Project

Key Components:

KafkaProducerConfig.class (Producer Configuration):

package tech.devblueprint.kafka_producer_spring_boot_example.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    // Configure producer properties
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        // Kafka broker address (the host listener exposed by docker-compose on localhost:29092)
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    // KafkaTemplate for sending messages
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

This class sets up the Kafka producer properties. It defines the broker address (e.g., localhost:29092) and configures the key and value serializers using StringSerializer. The configuration creates a ProducerFactory and a KafkaTemplate for sending messages.

MessageProducer.class (Message Sending Service):

package tech.devblueprint.kafka_producer_spring_boot_example.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Sends a message to a specified topic
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

This service wraps the KafkaTemplate: sendMessage takes a topic and a message and publishes the message to that topic. The send is asynchronous, and this simple example does not inspect its result.
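
In Spring Kafka 3.x, KafkaTemplate.send returns a CompletableFuture, so a failed send goes unnoticed unless the future is inspected. The sketch below shows the callback pattern using only the JDK. The fakeSend method is a hypothetical stand-in for the real send (it completes with a made-up offset), and describeOutcome is likewise an illustrative name.

```java
import java.util.concurrent.CompletableFuture;

public class SendCallbackSketch {

    // Hypothetical stand-in for an asynchronous send: completes with the
    // record's offset, or fails if the topic name is blank.
    public static CompletableFuture<Long> fakeSend(String topic, String message) {
        if (topic == null || topic.isBlank()) {
            return CompletableFuture.failedFuture(
                new IllegalArgumentException("topic must not be blank"));
        }
        return CompletableFuture.completedFuture(0L);
    }

    // Turns the future's outcome into a log line instead of ignoring it.
    public static CompletableFuture<String> describeOutcome(String topic, String message) {
        return fakeSend(topic, message).handle((offset, ex) ->
            ex == null
                ? "sent to " + topic + " at offset " + offset
                : "send failed: " + ex.getMessage());
    }

    public static void main(String[] args) {
        // prints: sent to monitoring.laptop at offset 0
        System.out.println(describeOutcome("monitoring.laptop", "hello").join());
        // prints: send failed: topic must not be blank
        System.out.println(describeOutcome("", "hello").join());
    }
}
```

With the real template, the same shape would be a whenComplete callback attached to the future returned by kafkaTemplate.send(topic, message), logging either the record metadata or the exception.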

ScheduledMessageSender.class (Scheduled Message Sender):

package tech.devblueprint.kafka_producer_spring_boot_example.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ScheduledMessageSender {

    private static final Logger logger = LoggerFactory.getLogger(ScheduledMessageSender.class);

    private final MessageProducer messageProducer;
    private int counter = 0;

    public ScheduledMessageSender(MessageProducer messageProducer) {
        this.messageProducer = messageProducer;
    }

    // Sends a monitoring message every 5 seconds.
    // Cycles through topics representing different device types.
    @Scheduled(fixedDelay = 5000)
    public void sendMonitoringMessage() {
        // Define topics for different device types
        String[] topics = {"monitoring.laptop", "monitoring.phone", "monitoring.tablet"};
        String topic = topics[counter % topics.length];
        counter++;
        String message = String.format("Monitoring data at %d for topic %s", System.currentTimeMillis(), topic);
        logger.info(message);
        messageProducer.sendMessage(topic, message);
    }
}

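Using the @Scheduled annotation, this component sends a monitoring message every 5 seconds; fixedDelay = 5000 measures the delay from the end of one run to the start of the next. It cycles through an array of topics (monitoring.laptop, monitoring.phone, and monitoring.tablet) and builds a message containing the current timestamp and the topic name, which is then sent via the MessageProducer. Note that @Scheduled methods only run when scheduling is enabled, for example by adding @EnableScheduling to the application class or a configuration class (not shown here).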
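
The topic rotation can be isolated into a small, dependency-free sketch. The TopicRotation class and topicFor method are hypothetical names. Math.floorMod is used here in place of the % operator from the listing above, as a small hardening so the index stays valid even for a negative counter.

```java
public class TopicRotation {

    private static final String[] TOPICS = {
        "monitoring.laptop", "monitoring.phone", "monitoring.tablet"
    };

    // Maps a run counter to a topic. Math.floorMod keeps the index
    // non-negative even if the counter ever wraps past Integer.MAX_VALUE.
    public static String topicFor(int counter) {
        return TOPICS[Math.floorMod(counter, TOPICS.length)];
    }

    public static void main(String[] args) {
        // prints laptop, phone, tablet, then laptop again
        for (int run = 0; run < 4; run++) {
            System.out.println("run " + run + " -> " + topicFor(run));
        }
    }
}
```

At one message every 5 seconds the counter would take centuries to overflow, so the original % is fine in practice; the floorMod variant simply removes the edge case.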

Consumer Project

Key Components:

KafkaConsumerConfig.class (Consumer Configuration):

package tech.devblueprint.kafka_consumer_spring_boot_example.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    // Configure consumer properties
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Kafka broker address
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        // Consumer group ID for monitoring consumers
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "monitoringGroupDev");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Kafka listener container factory
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

This configuration class defines Kafka consumer properties. It sets the broker address, consumer group ID (e.g., monitoringGroupDev), and specifies the key and value deserializers using StringDeserializer. The configuration also creates a ConcurrentKafkaListenerContainerFactory for managing consumer listeners.

MessageConsumer.class (Message Consumption Service):

package tech.devblueprint.kafka_consumer_spring_boot_example.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {

    private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    // Listener for messages from the "monitoring.laptop" topic
    @KafkaListener(topics = "monitoring.laptop", groupId = "monitoringGroupDev", id = "laptopListener")
    public void listenLaptop(String message) {
        logger.info("Received message from monitoring.laptop: {}", message);
    }

    // Listener for messages from the "monitoring.phone" topic
    @KafkaListener(topics = "monitoring.phone", groupId = "monitoringGroupDev", id = "phoneListener")
    public void listenPhone(String message) {
        logger.info("Received message from monitoring.phone: {}", message);
    }

    // Listener for messages from the "monitoring.tablet" topic
    @KafkaListener(topics = "monitoring.tablet", groupId = "monitoringGroupDev", id = "tabletListener")
    public void listenTablet(String message) {
        logger.info("Received message from monitoring.tablet: {}", message);
    }
}


This service uses the @KafkaListener annotation to listen to messages from different topics. For each topic (monitoring.laptop, monitoring.phone, and monitoring.tablet), there is a dedicated listener method that logs the received messages. This setup demonstrates how to process messages from multiple Kafka topics concurrently.
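
The listeners above only log the raw string. If a listener needed the individual fields, one option is to parse the producer's message format, as in the sketch below. MonitoringMessageParser and the MonitoringMessage record are hypothetical names, and the pattern assumes the exact format produced by ScheduledMessageSender.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MonitoringMessageParser {

    // Matches the producer's format: "Monitoring data at <millis> for topic <topic>"
    private static final Pattern FORMAT =
        Pattern.compile("Monitoring data at (\\d+) for topic (\\S+)");

    // Parsed form of one monitoring message.
    public record MonitoringMessage(long timestampMillis, String topic) {}

    // Extracts the timestamp and topic, rejecting anything in another format.
    public static MonitoringMessage parse(String raw) {
        Matcher m = FORMAT.matcher(raw);
        if (!m.matches()) {
            throw new IllegalArgumentException("unexpected message format: " + raw);
        }
        return new MonitoringMessage(Long.parseLong(m.group(1)), m.group(2));
    }

    public static void main(String[] args) {
        MonitoringMessage msg =
            parse("Monitoring data at 1700000000000 for topic monitoring.phone");
        // prints: 1700000000000 monitoring.phone
        System.out.println(msg.timestampMillis() + " " + msg.topic());
    }
}
```

For anything beyond a demo, a structured payload such as JSON with a matching serializer would usually be a better fit than parsing formatted strings.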

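5. Verification

To verify the setup, start the containers with docker-compose up -d, then run the producer and consumer applications. The producer should log a line such as "Monitoring data at <timestamp> for topic monitoring.laptop" every 5 seconds, and the consumer should log the matching "Received message from monitoring.laptop: ..." line. Kafdrop, mapped to port 9000 in the docker-compose file, can be opened at http://localhost:9000 to inspect the three topics and their messages.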

6. Conclusion

In this article, we demonstrated how to integrate Apache Kafka with Spring Boot to build a simple messaging solution. Apache Kafka is a robust distributed streaming platform that enables real-time data processing and high-throughput messaging. By following the setup described here and using Docker to run Kafka locally, you can quickly build and test scalable messaging applications with Spring Boot.