[ 
https://issues.apache.org/jira/browse/KAFKA-16312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16312:
---------------------------------

    Assignee: Lucas Brutschy  (was: Lianet Magrans)

> ConsumerRebalanceListener.onPartitionsAssigned() should be called after 
> joining, even if empty
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16312
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16312
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Kirk True
>            Assignee: Lucas Brutschy
>            Priority: Critical
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
>
> There is a difference between the {{LegacyKafkaConsumer}} and 
> {{AsyncKafkaConsumer}} respecting when the 
> {{ConsumerRebalanceListener.onPartitionsAssigned()}} method is invoked.
> For example, with {{onPartitionsAssigned()}}:
> * {{LegacyKafkaConsumer}}: the listener method is invoked when the consumer 
> joins the group, even if that consumer was not assigned any partitions. In 
> this case it's passed an empty list.
> * {{AsyncKafkaConsumer}}: the listener method is only invoked after the 
> consumer joins the group iff it has assigned partitions
> This difference is affecting the system tests. The system tests use a Java 
> class named {{VerifiableConsumer}} which uses a {{ConsumerRebalanceListener}} 
> that logs when the callbacks are invoked. The system tests then read from 
> that log to determine when the callbacks are invoked. This coordination is 
> used by the system tests to determine the lifecycle and status of the 
> consumers.
> The system tests rely heavily on the listener behavior of the 
> {{LegacyKafkaConsumer}}. It invokes the {{onPartitionsAssigned()}} method 
> when the consumer joins the group, and the system tests use that to determine 
> when the consumer is actively a member of the group. This validation of 
> membership is used as an assertion throughout the consumer-related tests.
> In the system test I'm executing from {{consumer_test.py}}, there's a test 
> that creates three consumers to read from a single topic with a single 
> partition. It's a bit of an oddball test, but it demonstrates the issue.
> Here are the logs pulled from the test run when executed using the 
> {{LegacyKafkaConsumer}}:
> Node 1:
> {code:java}
> [2024-02-15 00:43:52,400] INFO Adding newly assigned partitions:  
> (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
> Node 2:
> {code:java}
> [2024-02-15 00:43:52,401] INFO Adding newly assigned partitions: test_topic-0 
> (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
> Node 3:
> {code:java}
> [2024-02-15 00:43:52,399] INFO Adding newly assigned partitions:  
> (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
> Here are the logs when executing the same test using the 
> {{AsyncKafkaConsumer}}:
> Node 1:
> {code:java}
> [2024-02-15 01:15:46,576] INFO Adding newly assigned partitions: test_topic-0 
> (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
> Node 2:
> {code:java}n/a{code}
> Node 3:
> {code:java}n/a{code}
> As a result of this change, the existing system tests do not work with the 
> new consumer. However, even more importantly, this change in behavior may 
> adversely affect existing users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to