mvanhorn opened a new pull request, #21715:
URL: https://github.com/apache/kafka/pull/21715

   When an idempotent (non-transactional) KafkaProducer encounters an SSL 
handshake failure during the initial connection, the `InitProducerId` request 
is dequeued from the pending requests queue but never sent. The 
`AuthenticationException` is caught in `Sender.runOnce()`, which calls 
`transactionManager.authenticationFailed()`. However, since the request was 
already dequeued, `authenticationFailed()` iterates over an empty queue and 
does nothing.
   
   The `TransactionManager` then remains stuck in the `INITIALIZING` state:
   - `bumpIdempotentEpochAndResetIdIfNeeded()` skips re-enqueueing because 
`currentState == INITIALIZING`
   - `nextRequest()` returns null because the queue is empty
   - The producer becomes permanently unable to send messages, even after the 
SSL configuration is corrected
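
The stuck state can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: `StuckInitializingSketch`, its string-based queue, and the `failedRequests` counter are hypothetical stand-ins, not Kafka's actual `TransactionManager`. It models the pre-fix behavior described above, where the request is dequeued, the handshake fails, and nothing puts the request back.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model for illustration; NOT Kafka's actual TransactionManager.
class StuckInitializingSketch {
    enum State { UNINITIALIZED, INITIALIZING, READY }

    State currentState = State.UNINITIALIZED;
    final Deque<String> pendingRequests = new ArrayDeque<>();
    int failedRequests = 0; // counts requests that authenticationFailed() actually failed

    // Pre-fix behavior as described: nothing is enqueued once the state is INITIALIZING.
    void bumpIdempotentEpochAndResetIdIfNeeded() {
        if (currentState != State.UNINITIALIZED) {
            return;
        }
        currentState = State.INITIALIZING;
        pendingRequests.add("InitProducerId");
    }

    // Removes and returns the next pending request, or null if the queue is empty.
    String nextRequest() {
        return pendingRequests.poll();
    }

    // Fails every request still in the queue; with an empty queue this is a no-op.
    void authenticationFailed() {
        while (pendingRequests.poll() != null) {
            failedRequests++;
        }
    }

    public static void main(String[] args) {
        StuckInitializingSketch tm = new StuckInitializingSketch();
        tm.bumpIdempotentEpochAndResetIdIfNeeded(); // enqueue InitProducerId
        tm.nextRequest();                           // the sender dequeues the request
        tm.authenticationFailed();                  // handshake fails; queue is already empty
        tm.bumpIdempotentEpochAndResetIdIfNeeded(); // skipped: state is still INITIALIZING
        System.out.println("state=" + tm.currentState
                + " pending=" + tm.pendingRequests.size());
    }
}
```

In this model, every later call to `nextRequest()` returns null while the state never leaves `INITIALIZING`, which matches the two bullets above.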
   
   ## Changes
   
   Added a recovery path in `bumpIdempotentEpochAndResetIdIfNeeded()` that 
detects when the state is `INITIALIZING` but the pending queue is empty and no 
request is in flight. In this case, the `InitProducerId` request is 
re-enqueued, allowing the producer to recover without requiring a restart.
   
   Also extracted the `InitProducerId` request creation into a helper method 
(`enqueueInitProducerIdRequest()`) to avoid duplication.
   
   ## Testing
   
   Added `testIdempotentProducerRecoversFromLostInitProducerIdRequest()` that 
simulates:
   1. Transitioning to `INITIALIZING` and enqueuing the request
   2. Dequeuing the request (as `Sender.maybeSendAndPollTransactionalRequest()` 
would)
   3. Triggering `authenticationFailed()` (which does nothing since the queue 
is empty)
   4. Verifying that the next call to `bumpIdempotentEpochAndResetIdIfNeeded()` 
re-enqueues the request
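
The four steps can be walked through against a simplified model of the recovery path. This is a sketch, not the code in this PR: `RecoverySketch`, the string-based queue, and the `requestInFlight` flag are assumptions made for illustration, and only the method names are taken from the description above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of the recovery path; NOT Kafka's actual TransactionManager.
class RecoverySketch {
    enum State { UNINITIALIZED, INITIALIZING, READY }

    State currentState = State.UNINITIALIZED;
    final Deque<String> pendingRequests = new ArrayDeque<>();
    boolean requestInFlight = false; // assumed flag, set only once a request is actually sent

    // Stand-in for the extracted helper; the body here is an assumption.
    void enqueueInitProducerIdRequest() {
        pendingRequests.add("InitProducerId");
    }

    void bumpIdempotentEpochAndResetIdIfNeeded() {
        if (currentState == State.UNINITIALIZED) {
            currentState = State.INITIALIZING;
            enqueueInitProducerIdRequest();
        } else if (currentState == State.INITIALIZING
                && pendingRequests.isEmpty()
                && !requestInFlight) {
            // Recovery path: the request was dequeued but lost, so enqueue it again.
            enqueueInitProducerIdRequest();
        }
    }

    // Removes and returns the next pending request, or null if the queue is empty.
    String nextRequest() {
        return pendingRequests.poll();
    }

    // Drops whatever is still pending; with an empty queue this is a no-op.
    void authenticationFailed() {
        pendingRequests.clear();
    }

    public static void main(String[] args) {
        RecoverySketch tm = new RecoverySketch();
        tm.bumpIdempotentEpochAndResetIdIfNeeded(); // step 1: INITIALIZING, request enqueued
        tm.nextRequest();                           // step 2: request dequeued, never sent
        tm.authenticationFailed();                  // step 3: no-op on the empty queue
        tm.bumpIdempotentEpochAndResetIdIfNeeded(); // step 4: recovery path re-enqueues
        System.out.println("re-enqueued: " + tm.pendingRequests.peek());
    }
}
```

The in-flight check keeps the model from enqueuing a duplicate while a request that was actually sent is still awaiting its response.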
   
   ## Impact
   
   This enables self-recovery for idempotent producers in cloud-native 
environments where certificate rotation or temporary auth server unavailability 
can cause transient SSL failures. Previously, the only workaround was to close 
and recreate the KafkaProducer.
   
   This contribution was developed with AI assistance (Claude Code).

