Kafka consumers look simple in tutorials. Listen to a topic, process a message, acknowledge it. Ship it.

Then production happens.

Suddenly you’re staring at consumer lag climbing into the thousands. Messages are getting reprocessed for no obvious reason. A single vendor API starts timing out and, somehow, your entire consumer group comes to a halt. Nothing is technically “down”, but nothing is moving either.

This article is about an inventory sync consumer I built recently. It isn’t textbook, but it’s been running in production for a few months now, handling the entire inventory system of Headout without falling over — and that counts for something.

Some context: This service is built using Spring Boot and Kotlin. Each listener is responsible for fetching inventory (price and availability for different dates) for tours from third-party vendors through API calls.

Async All The Way Down

This consumer returns a CompletableFuture<Void> and immediately hands off work to a thread pool.

Spring Kafka’s listener container runs on its own threads. Block one of those threads with a 15-minute vendor API call and the consumer stops polling. Once max.poll.interval.ms expires, Kafka assumes the consumer is dead, triggers a rebalance, and your entire consumer group starts shuffling partitions while legitimate work is still in flight.

We return a future immediately and do the actual work in a separate thread pool. The listener container thread stays free to poll and send heartbeats. The future completes when work finishes (or fails), which tells Spring when to commit the offset.

fun startInventorySync(
    record: ConsumerRecord<String, ByteArray>,
    @Header(name = RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, required = false) retryAttempts: Int?
): CompletableFuture<Void> {
    val resultFuture = CompletableFuture<Void>()
    val query = parsePayload(record.value())
 
    // Capture MDC before switching threads
    val contextMap = MDCUtil.mdcContextMap
 
    // Hand off to thread pool immediately
    inventoryThreadPool.execute {
        MDCUtil.setMDC(contextMap)
        try {
            val inventoryResult = fetchInventory(query)
            handleSuccessfulInventoryFetch(query, inventoryResult, ...)
            resultFuture.complete(null)  // Signal success
        } catch (e: Exception) {
            resultFuture.completeExceptionally(e)  // Signal failure
            throw e  // Ensure retries trigger
        }
    }
 
    return resultFuture  // Return immediately
}

This keeps the Kafka listener thread responsive and prevents rebalances during long-running operations.

Thread Pool Backpressure

The thread pool uses a SynchronousQueue with CallerRunsPolicy.

private val inventoryThreadPool = ThreadPoolExecutor(
    CORE_INVENTORY_THREAD_POOL_SIZE,        // 25 core threads
    MAXIMUM_INVENTORY_THREAD_POOL_SIZE,     // 50 max threads
    THREAD_POOL_IDLE_THREAD_TIMEOUT_MILLIS, // 3 minutes idle timeout
    TimeUnit.MILLISECONDS,
    SynchronousQueue(),                      // No queuing
    ThreadPoolExecutor.CallerRunsPolicy()    // Backpressure mechanism
)

SynchronousQueue doesn’t queue anything. It’s a handoff point. Thread available? Work gets picked up. No idle thread? The pool spins up a new one, up to the maximum. At the maximum, the submission is rejected, and CallerRunsPolicy means the rejected task runs on the caller: the Kafka listener thread itself.

This creates natural backpressure. When the system is overwhelmed, we slow the rate at which we accept new messages. The Kafka consumer throttles itself organically, with no manual rate limiting and no elaborate throttling machinery.

The tradeoff: under extreme load, you’re back to blocking the consumer thread. But that’s the escape valve. Better to slow down gracefully than OOM from queuing ten thousand vendor API calls in memory.
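The saturation behavior is easy to see with a deliberately tiny pool. This is a self-contained JDK sketch (the class name and timings are made up for illustration), not the service's actual configuration:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1 core thread, 1 max thread, no queueing: saturates immediately.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.MINUTES,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // First task occupies the only worker thread.
        pool.execute(() -> sleep(500));

        // Pool is saturated: CallerRunsPolicy makes the submitting (caller)
        // thread run this task itself, synchronously, inside execute().
        String[] ranOn = new String[1];
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());

        System.out.println("second task ran on: " + ranOn[0]); // the main thread
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the real consumer the "caller" is the Kafka listener thread, which is exactly why saturation slows polling down.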

The 15-Minute Timeout

900 seconds. Fifteen minutes.

private const val INVENTORY_SYNC_TIMEOUT_MILLIS = 900_000L  // 15 minutes
 
private fun fetchInventory(query: EnduranceInventoryQuery): List<InventorySlotInfo> {
    return runBlocking {
        withTimeout(INVENTORY_SYNC_TIMEOUT_MILLIS) {
            inventoryService.getInventoryForVendorTour(
                startDate = query.startDate.toJavaLocalDate(),
                endDate = query.endDate.toJavaLocalDate(),
                request = InventoryRequest(...)
            )
        }
    }
}

Vendor APIs are unpredictable. Some respond in 200ms. Others take three minutes on a good day. A few have weird timeout issues on weekends when their infrastructure runs differently. Set a 30-second timeout and you’ll fail legitimate requests that were just slow, trigger retries, make everything worse.

15 minutes is generous, but it’s a buffer against third-party chaos. The tradeoff is explicit: we chose completeness over speed. In inventory sync, getting all the data matters more than getting it fast.
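In the Kotlin code above, withTimeout throws a TimeoutCancellationException that flows into the catch block and on to the retry machinery. The same shape can be sketched with plain JDK futures (a hedged illustration with a stand-in "vendor call", not the service's code; orTimeout requires Java 9+):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class TimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a vendor call that never responds.
        CompletableFuture<String> vendorCall = new CompletableFuture<>();

        // Complete the future exceptionally with TimeoutException
        // if no result arrives within the deadline.
        CompletableFuture<String> bounded =
                vendorCall.orTimeout(200, TimeUnit.MILLISECONDS);

        try {
            bounded.get();
        } catch (ExecutionException e) {
            // The timeout surfaces as an ordinary exception, which is what
            // lets the failure path (and retries) kick in downstream.
            System.out.println("failed with: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

The important property in both versions: a timeout isn't a hang, it's an exception, so it feeds the same error-handling path as any other failure.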

Retries That Don’t Make Things Worse

Spring’s @RetryableTopic handles retries with a separate topic and exponential backoff. Three attempts total (the original delivery plus two retries), with a 10-second delay that doubles each time.

@KafkaListener(topics = [KafkaTopics.URUK_START_INVENTORY_SYNC])
@RetryableTopic(
    attempts = "3",
    numPartitions = "5",
    replicationFactor = "2",
    backoff = Backoff(delay = 10000, multiplier = 2.0),  // 10s, then 20s
    autoCreateTopics = "true",
    listenerContainerFactory = "retryAndDltFactory"      // Concurrency = 1
)
fun startInventorySync(...): CompletableFuture<Void> {
    // ... processing logic ...
}

Key detail: retry topics have concurrency set to 1 (via retryAndDltFactory). When messages start failing, parallel retries would hammer the already-failing system. Instead, we back off, give it time to recover, process retries slowly.
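The backoff schedule is worth sanity-checking, because attempts counts total deliveries, not retries. A quick sketch (values copied from the annotation above):

```java
public class BackoffSchedule {
    public static void main(String[] args) {
        long delayMs = 10_000L;   // initial delay from @RetryableTopic
        double multiplier = 2.0;  // doubling each attempt
        int attempts = 3;         // total deliveries configured

        // Attempt 1 is the original delivery; only the remaining
        // attempts are retries, each with a growing delay before it.
        for (int retry = 1; retry < attempts; retry++) {
            System.out.println("retry " + retry + " after " + delayMs / 1000 + "s");
            delayMs = (long) (delayMs * multiplier);
        }
    }
}
```

So a message that keeps failing spends roughly 30 seconds in the retry topics before heading to the DLT.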

The error handling does something specific:

inventoryThreadPool.execute {
    try {
        val inventoryResult = fetchInventory(query)
        handleSuccessfulInventoryFetch(query, inventoryResult, ...)
        resultFuture.complete(null)
    } catch (e: Exception) {
        log.error("Failed to get inventory for RMS: $rmsName", e)
        sendFailureAnalyticsEvent(query, e, startTime, eventTracker)
        resultFuture.completeExceptionally(e)  // Tell Spring this failed
        throw e  // Propagate to retry framework
    }
}

Both completeExceptionally(e) AND throw e. The future completion signals to Spring that this message failed. The throw ensures the exception propagates through the retry framework. Without both, retries don’t trigger reliably. Learned this by watching messages disappear into the void.

MDC Propagation

We capture MDC (Mapped Diagnostic Context) before submitting to the thread pool, then restore it in the worker thread.

fun startInventorySync(...): CompletableFuture<Void> {
    // Capture MDC in the Kafka listener thread
    val contextMap = MDCUtil.mdcContextMap
 
    inventoryThreadPool.execute {
        // Restore MDC in the worker thread
        if (contextMap != null) {
            MDCUtil.setMDC(contextMap)
        }
        try {
            // All logs from here have proper trace context
            log.info("[INVENTORY_SYNC] Starting inventory fetch...")
            // ...
        } finally {
            MDC.clear()  // org.slf4j.MDC: clear so context doesn't leak to the next task on this pooled thread
        }
    }
}

Distributed tracing and logging need MDC to correlate requests. Without propagating context across thread boundaries, logs become a jumbled mess. You lose the thread (literally) of what happened during a message’s lifecycle.

This is the kind of detail that’s invisible when present but immediately painful when absent.
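The capture-and-restore dance can be factored into a reusable wrapper so every submission gets it for free. Here is a self-contained sketch using a plain ThreadLocal map as a stand-in for SLF4J's MDC (so it runs without any dependencies; the real thing would call MDC.getCopyOfContextMap / MDC.setContextMap / MDC.clear):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MdcPropagationDemo {
    // Stand-in for org.slf4j.MDC: a per-thread map of diagnostic context.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    // Wrap a task so it carries the submitter's context onto the worker thread.
    static Runnable withContext(Runnable task) {
        Map<String, String> captured = new HashMap<>(CONTEXT.get()); // capture on caller thread
        return () -> {
            CONTEXT.set(captured);  // restore on worker thread
            try {
                task.run();
            } finally {
                CONTEXT.remove();   // avoid leaking context to the next task
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        CONTEXT.get().put("traceId", "abc-123");

        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(withContext(() ->
                System.out.println("worker sees traceId=" + CONTEXT.get().get("traceId"))));

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Without the wrapper, the worker thread's context would be empty (or worse, left over from a previous message).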

The Compression Journey

Inventory responses get massive: hundreds of time slots with pricing and availability. I started off by sending raw JSON to the topic because simple is good, then watched the Kafka cluster's storage balloon.

To counter this, we tried storing payloads in Redis and sending only keys through Kafka. It worked fine but Redis became a bottleneck. Now we had two systems to worry about instead of one.

Considered gzip compression next. Better, not great. Landed on Zstd, which cut payload sizes by over 70% without much CPU overhead. The major drawback: messages in the Kafka UI suddenly became unreadable, which wasn't fun while debugging.
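To get a feel for why compression was tempting at all, here is a standalone sketch with a made-up, repetitive inventory payload. It uses the JDK's built-in gzip only, since Zstd needs a third-party library such as zstd-jni:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class CompressionDemo {
    public static void main(String[] args) throws IOException {
        // Fake inventory payload: repetitive JSON compresses very well.
        StringBuilder json = new StringBuilder("[");
        for (int slot = 0; slot < 500; slot++) {
            json.append("{\"slot\":").append(slot)
                .append(",\"price\":42.0,\"available\":true},");
        }
        json.append("]");
        byte[] raw = json.toString().getBytes(StandardCharsets.UTF_8);

        // Compress with gzip; the stream must be closed before reading bytes.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        }
        byte[] compressed = out.toByteArray();

        System.out.println("raw: " + raw.length + " bytes, gzip: " + compressed.length
                + " bytes (" + (100 - 100 * compressed.length / raw.length) + "% saved)");
    }
}
```

Real payloads are less uniform than this toy one, so the ratio is an upper bound, but the shape of the tradeoff is the same: big savings on the wire, opaque bytes in every debugging tool.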

For now, we’ve landed back on raw JSON, and we’re considering Avro to strike a balance between size and readability.

This consumer keeps the Kafka thread free, applies backpressure when overwhelmed, retries without hammering failing systems, and maintains trace context across thread boundaries. It ships inventory updates every day without falling over: reliable, debuggable code that does the job.