[
https://issues.apache.org/jira/browse/KAFKA-18160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chia-Ping Tsai updated KAFKA-18160:
-----------------------------------
Issue Type: Bug (was: Improvement)
> Interrupting or waking up onPartitionsAssigned in AsyncConsumer can cause the
> ConsumerRebalanceListenerCallbackCompletedEvent to be skipped, potentially
> leading to corrupted added partitions
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-18160
> URL: https://issues.apache.org/jira/browse/KAFKA-18160
> Project: Kafka
> Issue Type: Bug
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Blocker
>
> I noticed this issue when testing KAFKA-17962. It includes two bugs listed
> below.
> *ConsumerRebalanceListenerCallbackCompletedEvent is skipped*
> `invokeRebalanceCallbacks` can throw WakeupException or InterruptException [0],
> and neither is handled. As a result, the
> `ConsumerRebalanceListenerCallbackCompletedEvent` is never sent to the
> background thread.
> *Solution*: We should use try-catch blocks to catch both InterruptException
> and WakeupException and propagate them to the background thread.
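
A minimal sketch of the try-catch idea above, not the actual AsyncKafkaConsumer code. Every name here is hypothetical: `WakeupLikeException` stands in for Kafka's unchecked WakeupException/InterruptException, `CompletedEvent` for ConsumerRebalanceListenerCallbackCompletedEvent, and `backgroundQueue` for the channel to the background thread. The sketch assumes the completed event can carry the failure, and it rethrows to the caller afterwards; that last step is a design choice, not something the issue specifies.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CallbackCompletionSketch {
    // Hypothetical stand-in for an unchecked wakeup/interrupt exception.
    static class WakeupLikeException extends RuntimeException {
        WakeupLikeException(String message) { super(message); }
    }

    // Hypothetical stand-in for the "callback completed" event.
    static class CompletedEvent {
        final Optional<RuntimeException> error;
        CompletedEvent(Optional<RuntimeException> error) { this.error = error; }
    }

    // Stands in for the application-to-background event channel.
    static final BlockingQueue<CompletedEvent> backgroundQueue = new LinkedBlockingQueue<>();

    // Runs the callback and always enqueues a completion event,
    // even when the callback throws.
    static void invokeAndReport(Runnable rebalanceCallback) {
        RuntimeException failure = null;
        try {
            rebalanceCallback.run();
        } catch (RuntimeException e) {
            failure = e; // remember the failure instead of skipping the event
        }
        backgroundQueue.add(new CompletedEvent(Optional.ofNullable(failure)));
        if (failure != null) {
            throw failure; // assumption: the caller should still see the exception
        }
    }

    public static void main(String[] args) {
        try {
            invokeAndReport(() -> {
                throw new WakeupLikeException("woken up in onPartitionsAssigned");
            });
        } catch (WakeupLikeException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
        CompletedEvent event = backgroundQueue.poll();
        System.out.println("event sent: " + (event != null));
    }
}
```

The point of the sketch is only the ordering: the event is enqueued before the exception leaves `invokeAndReport`, so the background side is never left waiting for a completion that will not arrive.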
> *Corrupted added partitions*
> In the next iteration of invokeRebalanceCallbacks, assigned partitions that
> are not yet fetchable are treated as owned partitions [1]. As a result, an
> empty set of added partitions is passed to the listener, so the listener
> never receives the partitions that were actually added once the first
> execution has failed. This makes test_pause_and_resume_sink (KAFKA-17962)
> unstable when using AsyncConsumer.
> *Solution*: We should add only partitions where pendingOnAssignedCallback is
> false to the owned partitions.
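
The filtering rule above can be sketched as follows. This is an illustration under stated assumptions, not the AbstractMembershipManager code: `PartitionState`, `ownedPartitions`, and `addedPartitions` are hypothetical names, partitions are modeled as plain strings, and only the `pendingOnAssignedCallback` flag named in the issue is represented.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class OwnedPartitionsSketch {
    // Hypothetical per-partition state; models only the flag named in the issue.
    static class PartitionState {
        final boolean pendingOnAssignedCallback;
        PartitionState(boolean pendingOnAssignedCallback) {
            this.pendingOnAssignedCallback = pendingOnAssignedCallback;
        }
    }

    // Owned = assigned partitions whose onPartitionsAssigned callback has completed.
    static Set<String> ownedPartitions(Map<String, PartitionState> assigned) {
        Set<String> owned = new TreeSet<>();
        for (Map.Entry<String, PartitionState> entry : assigned.entrySet()) {
            if (!entry.getValue().pendingOnAssignedCallback) {
                owned.add(entry.getKey());
            }
        }
        return owned;
    }

    // Added = target assignment minus what is already owned.
    static Set<String> addedPartitions(Set<String> target, Set<String> owned) {
        Set<String> added = new TreeSet<>(target);
        added.removeAll(owned);
        return added;
    }

    public static void main(String[] args) {
        Map<String, PartitionState> assigned = new LinkedHashMap<>();
        assigned.put("topic-0", new PartitionState(false)); // callback already completed
        assigned.put("topic-1", new PartitionState(true));  // callback failed earlier, still pending
        Set<String> owned = ownedPartitions(assigned);
        Set<String> added = addedPartitions(assigned.keySet(), owned);
        System.out.println("owned=" + owned + " added=" + added);
        // prints: owned=[topic-0] added=[topic-1]
    }
}
```

With the filter in place, a partition whose callback was interrupted still shows up as added on the retry; without it, the pending partition would count as owned and the listener would be handed an empty set.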
> [0]
> https://github.com/apache/kafka/blob/2d39d5be64d4f5b6446f4b9ec3f32b039707d9d1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L2046
> [1]
> https://github.com/apache/kafka/blob/2d39d5be64d4f5b6446f4b9ec3f32b039707d9d1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java#L828