Most messaging problems in distributed systems aren’t hard to solve — they’re hard to solve correctly. Publishing a message to Kafka is three lines of code. Handling failures, schema evolution, poison messages, and test isolation without a running broker is where most teams accumulate technical debt.
This guide covers the full spring-kafka integration stack: dependency setup, producer and consumer configuration, @KafkaListener, error handling with dead letter topics, Avro schema registry, and testing with EmbeddedKafka. The goal is a setup you can actually run in production without surprises.
Adding spring-kafka
The spring-kafka library is the standard integration layer. Add it alongside your Kafka client dependency.
Maven:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Spring Boot’s dependency management handles the version. If you need a specific Kafka client version, override it explicitly:
<properties>
<kafka.version>3.7.0</kafka.version>
</properties>
Gradle:
implementation 'org.springframework.kafka:spring-kafka'
No version is needed — the Spring Boot BOM resolves it to a tested, compatible version automatically.
Kafka Configuration
The minimal application.yml to connect to a local broker:
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: my-app
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest means a new consumer group reads from the beginning of the topic. latest skips existing messages. For production, pick this deliberately — getting it wrong is a common source of lost messages during first deployment.
For production, you’ll also want:
spring:
kafka:
producer:
acks: all
retries: 3
properties:
enable.idempotence: true
max.in.flight.requests.per.connection: 5
consumer:
enable-auto-commit: false
listener:
ack-mode: MANUAL_IMMEDIATE
acks: all with enable.idempotence: true gives you idempotent writes on the producer side — broker-level deduplication means retries can't create duplicates within a partition. (This is not end-to-end exactly-once; consumers can still see redeliveries.) enable-auto-commit: false puts you in control of offset commits — the listener container handles this when you use MANUAL_IMMEDIATE ack mode.
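Idempotence works because the broker tracks a per-producer sequence number for each partition and silently drops retried sends it has already accepted. A toy model of that bookkeeping (not broker code — the real protocol also scopes sequences per partition and handles producer epochs):

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentProducerModel {
    // Toy model of broker-side duplicate suppression: remember the last
    // sequence number accepted from each producer and ignore retried
    // sends that reuse an already-accepted sequence.
    private final Map<Long, Integer> lastSeqByProducer = new HashMap<>();

    boolean append(long producerId, int sequence) {
        int last = lastSeqByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // duplicate from a retry — dropped, no double write
        }
        lastSeqByProducer.put(producerId, sequence);
        return true;
    }

    public static void main(String[] args) {
        IdempotentProducerModel broker = new IdempotentProducerModel();
        System.out.println(broker.append(1L, 0)); // true  — first delivery
        System.out.println(broker.append(1L, 0)); // false — retry, deduplicated
        System.out.println(broker.append(1L, 1)); // true  — next record
    }
}
```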
Producing Messages with KafkaTemplate
KafkaTemplate is your main entry point for publishing. Spring Boot auto-configures it from the spring.kafka.producer properties.
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderEventPublisher {
private final KafkaTemplate<String, String> kafkaTemplate;
public void publishOrderCreated(String orderId, String payload) {
kafkaTemplate.send("orders.created", orderId, payload)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to publish order event for {}", orderId, ex);
} else {
log.debug("Published order event for {} to partition {} offset {}",
orderId,
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
}
A few things worth noting here. The message key — orderId in this case — determines which partition the message lands in. All messages with the same key go to the same partition, which gives you ordering guarantees per key. If ordering doesn’t matter, pass null.
send() returns a CompletableFuture<SendResult>. Don’t ignore it. Swallowing send failures silently is how you lose messages in production. The whenComplete callback runs on a Kafka producer thread, not your application thread, so don’t do blocking work there.
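The key-to-partition mapping is deterministic: Kafka's default partitioner hashes the serialized key bytes (murmur2) and takes the result modulo the partition count. The sketch below illustrates the idea only — it uses Java's String.hashCode(), not Kafka's actual hash:

```java
import java.util.List;

public class PartitionDemo {
    // Illustration only: Kafka's default partitioner uses murmur2 over the
    // serialized key bytes, not String.hashCode(). The modulo step is the
    // same idea: a stable hash maps every key to a fixed partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6;
        // The same key always maps to the same partition...
        System.out.println(partitionFor("order-123", partitions)
                == partitionFor("order-123", partitions)); // true
        // ...which is what gives you per-key ordering.
        for (String key : List.of("order-1", "order-2", "order-3")) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```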
For synchronous sends (blocking until the broker acknowledges):
SendResult<String, String> result = kafkaTemplate.send("orders.created", orderId, payload).get();
Use this sparingly. It blocks your thread and tanks throughput, but it’s useful in batch jobs or when you genuinely need send confirmation before proceeding.
Consuming Messages with @KafkaListener
@KafkaListener turns a method into a Kafka consumer with minimal configuration:
@Component
@Slf4j
public class OrderEventConsumer {
@KafkaListener(topics = "orders.created", groupId = "inventory-service")
public void handleOrderCreated(
@Payload String payload,
@Header(KafkaHeaders.RECEIVED_KEY) String orderId,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
log.info("Processing order {} from partition {} offset {}", orderId, partition, offset);
// process the order
}
}
The groupId in @KafkaListener overrides the global spring.kafka.consumer.group-id. Useful when you have multiple consumers in the same application that need different group IDs.
Concurrency and Partitions
By default, each @KafkaListener gets one consumer thread. To increase parallelism:
@KafkaListener(topics = "orders.created", concurrency = "3")
public void handleOrderCreated(String payload) {
// This runs on 3 consumer threads
}
Set concurrency to match (or be less than) the number of partitions. A consumer count higher than the partition count means idle threads — Kafka won’t assign more than one consumer per partition within a group.
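The arithmetic is easy to sketch. This is not Kafka's actual assignor (range and cooperative-sticky strategies differ in detail), just an illustration of why extra threads go idle:

```java
import java.util.ArrayList;
import java.util.List;

public class AssignmentDemo {
    // Rough sketch of partition assignment within one consumer group:
    // partitions are spread over at most min(concurrency, partitionCount)
    // threads, and no partition is ever shared between two group members.
    static List<List<Integer>> assign(int partitionCount, int concurrency) {
        int active = Math.min(partitionCount, concurrency);
        List<List<Integer>> byConsumer = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            byConsumer.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            byConsumer.get(p % active).add(p); // round-robin over active threads
        }
        return byConsumer;
    }

    public static void main(String[] args) {
        // 6 partitions, concurrency 3 -> every thread gets 2 partitions
        System.out.println(assign(6, 3));
        // 6 partitions, concurrency 8 -> threads 6 and 7 get nothing (idle)
        System.out.println(assign(6, 8));
    }
}
```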
Batch Listeners
For high-throughput scenarios, consume a batch of records in one call:
@KafkaListener(topics = "orders.created", batch = "true")
public void handleOrderBatch(List<String> payloads, Acknowledgment ack) {
payloads.forEach(this::processOrder);
ack.acknowledge();
}
Batch listeners significantly improve throughput at the cost of more complex error handling — if one message in the batch fails, you need to decide how to handle the others.
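spring-kafka's answer to the partial-failure problem is BatchListenerFailedException: throw it with the index of the failed record, and DefaultErrorHandler commits the records before it and retries only from the failure onward. The decision logic, sketched in plain Java with a hypothetical processOrder:

```java
import java.util.List;

public class BatchFailureDemo {
    // Hypothetical per-record processor: fails on a specific payload.
    static void processOrder(String payload) {
        if (payload.startsWith("bad")) {
            throw new IllegalStateException("cannot process " + payload);
        }
    }

    // The decision a batch listener has to make: process records in order
    // and report the index of the first failure, or -1 if the whole batch
    // succeeded. In a real listener you would rethrow the failure as
    // BatchListenerFailedException("failed", cause, index) so the error
    // handler knows where to resume.
    static int firstFailureIndex(List<String> payloads) {
        for (int i = 0; i < payloads.size(); i++) {
            try {
                processOrder(payloads.get(i));
            } catch (RuntimeException ex) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstFailureIndex(List.of("ok-1", "bad-2", "ok-3"))); // 1
        System.out.println(firstFailureIndex(List.of("ok-1", "ok-2")));          // -1
    }
}
```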
Error Handling
Out of the box, spring-kafka's DefaultErrorHandler retries a failed record a few times with no backoff, then logs the error and moves on. That's still silent message loss for most use cases. You need an explicit error handler with a recoverer.
DefaultErrorHandler with Retry
@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {
// Exponential backoff: 1s, 2s, 4s — then hand off to the recoverer
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
backOff.setInitialInterval(1000L);
backOff.setMultiplier(2.0);
DefaultErrorHandler handler = new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),
backOff
);
// Don't retry on deserialization errors — they won't succeed
handler.addNotRetryableExceptions(DeserializationException.class);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
}
DefaultErrorHandler replaced the older SeekToCurrentErrorHandler in spring-kafka 2.8. It retries failed records in-place (without re-queuing), then hands off to the recoverer when retries are exhausted.
Wire it into your listener container factory:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory,
DefaultErrorHandler errorHandler) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
Dead Letter Topics
DeadLetterPublishingRecoverer routes failed messages to a dead letter topic after retries are exhausted. By default it publishes to {original-topic}.DLT.
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
KafkaTemplate<String, String> kafkaTemplate) {
return new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> {
// Route to specific DLT based on the original topic
return new TopicPartition(record.topic() + ".dlt", record.partition());
});
}
The DLT message includes the original message plus exception headers — you can inspect why a message failed without losing it. Build a separate consumer for your DLT topics to alert, re-process, or archive failed messages.
A minimal DLT consumer:
@KafkaListener(topics = "orders.created.dlt", groupId = "dlt-processor")
public void handleDeadLetter(
@Payload String payload,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
@Header(KafkaHeaders.RECEIVED_KEY) String key) {
log.error("Dead letter received for key={}: exception={}, payload={}",
key, exceptionMessage, payload);
// alert, store in database, etc.
}
Schema Registry with Avro
JSON is convenient until two teams need to evolve a message format independently. Schema registries solve this: producers register a schema, consumers validate against it, and both sides can evolve independently within compatibility rules.
Avro Dependencies
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
Confluent’s artifacts aren’t on Maven Central. Add their repository:
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
Define Your Schema
Create src/main/avro/OrderCreatedEvent.avsc:
{
"type": "record",
"name": "OrderCreatedEvent",
"namespace": "com.katyella.events",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "totalAmount", "type": "double"},
{"name": "createdAt", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]
}
Add the Avro Maven plugin to generate Java classes from the schema:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.3</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals><goal>schema</goal></goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
<outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
Configure Avro Serialization
spring:
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
properties:
schema.registry.url: http://localhost:8081
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: http://localhost:8081
specific.avro.reader: true
specific.avro.reader: true tells the deserializer to return the generated OrderCreatedEvent class rather than a generic GenericRecord.
Now your producer and consumer use typed objects:
// Producer
public void publishOrderCreated(OrderCreatedEvent event) {
kafkaTemplate.send("orders.created", event.getOrderId().toString(), event);
}
// Consumer
@KafkaListener(topics = "orders.created")
public void handleOrderCreated(OrderCreatedEvent event) {
log.info("Processing order {} for customer {}", event.getOrderId(), event.getCustomerId());
}
Schema evolution works within Confluent’s compatibility rules. Adding a field with a default value is backward-compatible. Removing a field with a default is forward-compatible. Both together is fully compatible. Configure compatibility level per subject in the registry to enforce this automatically.
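For example, a backward-compatible evolution of the schema above adds a field with a default — old messages without it still deserialize, the new field filled in from the default (the `currency` field here is hypothetical):

```json
{
  "type": "record",
  "name": "OrderCreatedEvent",
  "namespace": "com.katyella.events",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "totalAmount", "type": "double"},
    {"name": "createdAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "currency", "type": "string", "default": "USD"}
  ]
}
```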
Testing with EmbeddedKafka
The @EmbeddedKafka annotation spins up an in-process Kafka broker for tests — no Docker, no external broker, no test infrastructure to manage. It’s fast enough for unit and integration tests.
Add the test dependency:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
Testing a Producer
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"orders.created"})
@TestPropertySource(properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"
})
class OrderEventPublisherTest {
@Autowired
private OrderEventPublisher publisher;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
void publishOrderCreated_sendsMessageToTopic() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
"test-consumer", "true", embeddedKafka);
try (Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(
consumerProps,
new StringDeserializer(),
new StringDeserializer()).createConsumer()) {
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "orders.created");
publisher.publishOrderCreated("order-123", "{\"amount\": 99.99}");
ConsumerRecord<String, String> record =
KafkaTestUtils.getSingleRecord(consumer, "orders.created", Duration.ofSeconds(5));
assertThat(record.key()).isEqualTo("order-123");
assertThat(record.value()).contains("99.99");
}
}
}
${spring.embedded.kafka.brokers} is a Spring property that resolves to the embedded broker’s address. The @TestPropertySource overrides your application’s bootstrap-servers with the embedded one.
Testing a Consumer
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"orders.created"})
@TestPropertySource(properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest"
})
class OrderEventConsumerTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@MockBean
private InventoryService inventoryService;
@Test
void handleOrderCreated_callsInventoryService() throws Exception {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(
producerProps,
new StringSerializer(),
new StringSerializer()).createProducer();
producer.send(new ProducerRecord<>("orders.created", "order-456", "{\"amount\": 50.00}"));
producer.flush();
// Wait for the listener to process
verify(inventoryService, timeout(5000)).reserveStock("order-456");
}
}
verify(..., timeout(5000)) from Mockito lets you assert that a method was eventually called without manually polling. This is cleaner than Thread.sleep() and fails fast when the consumer is broken.
Testing Error Handling and DLTs
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"orders.created", "orders.created.dlt"})
@TestPropertySource(properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"
})
class DeadLetterTopicTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@MockBean
private OrderProcessor orderProcessor;
@Test
void failedMessage_routedToDeadLetterTopic() throws Exception {
// Make processing always fail
doThrow(new RuntimeException("processing failed"))
.when(orderProcessor).process(any());
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(
producerProps, new StringSerializer(), new StringSerializer()).createProducer();
producer.send(new ProducerRecord<>("orders.created", "order-789", "bad-payload"));
producer.flush();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
"dlt-test-consumer", "true", embeddedKafka);
try (Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(
consumerProps, new StringDeserializer(), new StringDeserializer())
.createConsumer()) {
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "orders.created.dlt");
ConsumerRecord<String, String> dltRecord =
KafkaTestUtils.getSingleRecord(consumer, "orders.created.dlt", Duration.ofSeconds(10));
assertThat(dltRecord.key()).isEqualTo("order-789");
}
}
}
The 10-second timeout accounts for retry backoff time. If your ExponentialBackOff has 3 retries with a 1-second initial interval, the message takes at least 7 seconds to reach the DLT (1s + 2s + 4s of backoff). Size your timeout accordingly.
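The minimum retry budget is just the sum of the exponential backoff intervals (record processing time excluded). A small helper to size test timeouts from your backoff parameters:

```java
public class BackoffBudget {
    // Minimum time a failing record spends in retries before it can reach
    // the DLT: the sum of the exponential backoff intervals. Processing
    // time for each attempt comes on top of this.
    static long totalBackoffMillis(long initialInterval, double multiplier, int retries) {
        long total = 0;
        long interval = initialInterval;
        for (int i = 0; i < retries; i++) {
            total += interval;
            interval = (long) (interval * multiplier);
        }
        return total;
    }

    public static void main(String[] args) {
        // 3 retries, 1s initial interval, multiplier 2.0 -> 1s + 2s + 4s
        System.out.println(totalBackoffMillis(1000L, 2.0, 3)); // 7000
    }
}
```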
For testing with real Docker containers and better isolation, see Spring Boot Testcontainers Integration Testing.
Production Checklist
A few things that consistently bite teams going to production with Kafka:
Monitoring. Consumer lag is your primary health signal. kafka.consumer:type=consumer-fetch-manager-metrics,client-id=* via JMX, or use Micrometer’s Kafka metrics binding — it auto-registers if micrometer-core is on the classpath.
Topic creation. Don’t rely on auto-topic creation in production. Create topics explicitly with the right partition count, replication factor, and retention settings. Spring’s NewTopic bean handles this at startup:
@Bean
public NewTopic ordersTopic() {
return TopicBuilder.name("orders.created")
.partitions(6)
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Duration.ofDays(7).toMillis()))
.build();
}
Idempotent consumers. Kafka guarantees at-least-once delivery by default. Your consumer should handle duplicate messages gracefully — check if the work was already done before doing it again. The message key or a unique ID in the payload is your deduplication handle.
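The shape of an idempotent handler, sketched with an in-memory set — a real service would keep processed IDs in a database or cache that survives restarts and rebalances:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DeduplicatingHandler {
    // In-memory sketch only: in production the processed-ID store must be
    // durable and shared across instances, or the dedup check and the work
    // should happen in one database transaction.
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    // Returns true if the message was handled now, false if it was a duplicate.
    boolean handleOnce(String orderId, Runnable work) {
        if (!processed.add(orderId)) {
            return false; // already done — redelivery is safe to ignore
        }
        work.run();
        return true;
    }

    public static void main(String[] args) {
        DeduplicatingHandler handler = new DeduplicatingHandler();
        System.out.println(handler.handleOnce("order-123", () -> {})); // true
        System.out.println(handler.handleOnce("order-123", () -> {})); // false — duplicate
    }
}
```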
Graceful shutdown. Spring handles this if you use a standard @KafkaListener — the container waits for in-flight messages to complete before shutting down. Don’t call System.exit() or kill the JVM abruptly without testing what happens to in-flight messages.
This fits naturally into event-driven microservices architecture — for the broader patterns around service decomposition and event flows, see Spring Boot Microservices Architecture Patterns. For error handling beyond Kafka — REST APIs, database transactions, and global exception handling — Spring Boot Error Handling covers the complementary patterns.