[ 
https://issues.apache.org/jira/browse/KAFKA-20237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18062332#comment-18062332
 ] 

Yin Lei commented on KAFKA-20237:
---------------------------------

Thanks for the quick investigation! Your analysis that the initProducerId 
request is dequeued without being re-enqueued perfectly explains the "deadlock" 
state I observed.

Regarding the recovery behavior, I understand that SSL failures are typically 
long-lasting. However, in containerized or cloud-native environments, 
certificates may be rotated or fixed via ConfigMaps without restarting the 
pod. In my view, a producer that can self-recover after such a fix would 
greatly improve system resilience.

To address the broker stability concern you mentioned, perhaps we could 
implement a capped exponential backoff for the retry logic. This would prevent 
the broker from being overwhelmed while still allowing the producer to 
eventually recover once the environment is fixed.
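For concreteness, the capped exponential backoff I have in mind would look roughly like the sketch below. The class and method names are hypothetical illustrations, not actual Kafka client internals; the real change would live in the Sender/TransactionManager retry path:

```java
// Hypothetical sketch of a capped exponential backoff policy for
// retrying InitProducerId. Names and constants are illustrative only,
// not actual Kafka client internals.
public class InitProducerIdBackoff {
    private final long initialBackoffMs;
    private final long maxBackoffMs;

    public InitProducerIdBackoff(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    // The delay doubles with each failed attempt but is capped, so a
    // broker that stays unreachable is probed at a bounded, low rate
    // while a recovered broker is reached within at most maxBackoffMs.
    public long backoffMs(int failedAttempts) {
        double exponential = initialBackoffMs * Math.pow(2, failedAttempts);
        return (long) Math.min(exponential, maxBackoffMs);
    }
}
```

In practice we would probably also add jitter to the computed delay to avoid synchronized reconnect storms from many producers at once.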

I'd be glad to take on this challenge and attempt a fix for this issue! Since 
I'm relatively new to the Kafka source code, I'll need some time to dive into 
the Sender and TransactionManager internals to ensure the fix is robust and 
handles the backoff logic correctly.

Once I have a potential patch ready, I would greatly appreciate it if you could 
review the code and share your expertise.

Thanks again for your support!

>  TransactionManager stuck in `INITIALIZING` state after initial SSL handshake 
> failure
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-20237
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20237
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer 
>    Affects Versions: 3.9.0
>         Environment: - Operating System: Linux aarch64;
> - Kafka Version (Both Client and Server): 3.9.0;
> - security.protocol: SSL;
> - Some producer configurations: retries=2, reconnect.backoff.ms=30000, 
> transactional.id not set, enable.idempotence not set;
>            Reporter: Yin Lei
>            Priority: Major
>
> I encountered a scenario where the `KafkaProducer` fails to recover if the 
> initial SSL handshake with the broker fails, even after the underlying SSL 
> configuration is corrected.
>  
> *Steps to Reproduce:*
> 1. Configure a `KafkaProducer` with SSL enabled, but use an 
> incorrect/untrusted certificate on the server side to trigger an 
> `SSLHandshakeException`.
> 2. Start the Producer and attempt to send a message.
> 3. The Producer logs show recurring SSL handshake errors. At this point, 
> `TransactionManager` enters the `INITIALIZING` state.
> 4. Correct the SSL certificate configuration on the *server side,* so that 
> the broker is now reachable and the handshake can succeed.
> 5. Observe the Producer's behavior: messages still cannot be sent to the broker.
>  
> *Expected Behavior:*
> The Producer should successfully complete the SSL handshake, and the `Sender` 
> thread should retry the `InitProducerId` request, allowing the 
> `TransactionManager` to transition from `INITIALIZING` to `READY`.
>  
> *Actual Behavior:*
> Even though the network/SSL layer is recovered, the `KafkaProducer` remains 
> unable to send messages. The `TransactionManager` stays stuck in 
> *INITIALIZING* because the initial failure to obtain a `ProducerId` isn't 
> properly re-triggered, or the state machine doesn't recover from the specific 
> handshake exception during the transition.
> h3. *Potential Impact:*
> In long-running microservices, if the initial connection to Kafka fails due 
> to temporary infrastructure or certificate issues, the Producer becomes 
> permanently "broken" and requires a full application restart to recover, 
> which is not ideal for high-availability systems.
> h3. *PS: Log Snippet*
> > The producer thread repeatedly prints the following log, and no message 
> > sending record was found.
> ```
> 02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
> producer-4][Sender 444] [Producer clientId=producer-4] Nodes with data ready 
> to send: [192.168.0.10:9812 (id: 0 rack: null)]  
> 02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
> producer-4][ProducerBatch 121] For 
> ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), 
> leader wasn't updated, currentLeaderEpoch: OptionalInt[25], 
> attemptsWhenLeaderLastChanged:0, latestLeaderEpoch: OptionalInt[25], current 
> attempt: 0  
> 02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
> producer-4][RecordAccumulator 823] [Producer clientId=producer-4] For 
> ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), 
> will not backoff, shouldWaitMore false, hasLeaderChanged false  
> 02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
> producer-4][BuiltInPartitioner 258] [Producer clientId=producer-4] The number 
> of partitions is too small: available=1, all=1, not using adaptive for topic 
> dte_nb_federation_receive  
> 02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
> producer-4][ProducerBatch 121] For 
> ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), 
> leader wasn't updated, currentLeaderEpoch: OptionalInt[25], 
> attemptsWhenLeaderLastChanged:0, latestLeaderEpoch: OptionalInt[25], current 
> attempt: 0  
> 02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
> producer-4][RecordAccumulator 823] [Producer clientId=producer-4] For 
> ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), 
> will not backoff, shouldWaitMore false, hasLeaderChanged false  
> 02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
> producer-4][Sender 444] [Producer clientId=producer-4] Nodes with data ready 
> to send: [192.168.0.10:9812 (id: 0 rack: null)]  
> 02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
> producer-4][ProducerBatch 121] For 
> ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), 
> leader wasn't updated, currentLeaderEpoch: OptionalInt[25], 
> attemptsWhenLeaderLastChanged:0, latestLeaderEpoch: OptionalInt[25], current 
> attempt: 0  
> 02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
> producer-4][RecordAccumulator 823] [Producer clientId=producer-4] For 
> ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), 
> will not backoff, shouldWaitMore false, hasLeaderChanged false  
> 02-25 21:19:33.717+0800[TRACE][kafka-producer-network-thread | 
> producer-4][BuiltInPartitioner 258] [Producer clientId=producer-4] The number 
> of partitions is too small: available=1, all=1, not using adaptive for topic 
> dte_nb_federation_receive  
> 02-25 21:19:33.717+0800[TRACE][kafka-producer-network-thread | 
> producer-4][ProducerBatch 121] For 
> ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), 
> leader wasn't updated, currentLeaderEpoch: OptionalInt[25], 
> attemptsWhenLeaderLastChanged:0, latestLeaderEpoch: OptionalInt[25], current 
> attempt: 0  
> 02-25 21:19:33.717+0800[TRACE][kafka-producer-network-thread | 
> producer-4][RecordAccumulator 823] [Producer clientId=producer-4] For 
> ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), 
> will not backoff, shouldWaitMore false, hasLeaderChanged false 
> ```
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
