[ https://issues.apache.org/jira/browse/KAFKA-16312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-16312:
---------------------------------

    Assignee: Lucas Brutschy  (was: Lianet Magrans)

> ConsumerRebalanceListener.onPartitionsAssigned() should be called after joining, even if empty
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16312
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16312
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Kirk True
>            Assignee: Lucas Brutschy
>            Priority: Critical
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
>
> There is a difference between the {{LegacyKafkaConsumer}} and
> {{AsyncKafkaConsumer}} with respect to when the
> {{ConsumerRebalanceListener.onPartitionsAssigned()}} method is invoked.
> For example, with {{onPartitionsAssigned()}}:
> * {{LegacyKafkaConsumer}}: the listener method is invoked when the consumer
>   joins the group, even if that consumer was not assigned any partitions. In
>   this case it's passed an empty list.
> * {{AsyncKafkaConsumer}}: the listener method is invoked after the consumer
>   joins the group only if it has been assigned partitions.
>
> This difference is affecting the system tests. The system tests use a Java
> class named {{VerifiableConsumer}} which uses a {{ConsumerRebalanceListener}}
> that logs when the callbacks are invoked. The system tests then read from
> that log to determine when the callbacks are invoked. This coordination is
> used by the system tests to determine the lifecycle and status of the
> consumers.
>
> The system tests rely heavily on the listener behavior of the
> {{LegacyKafkaConsumer}}. It invokes the {{onPartitionsAssigned()}} method
> when the consumer joins the group, and the system tests use that to determine
> when the consumer is actively a member of the group. This validation of
> membership is used as an assertion throughout the consumer-related tests.
>
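
The difference described above can be sketched without any Kafka dependency. This is a minimal illustration, not the consumers' actual code: `AssignmentListener`, `RecordingListener`, and `JoinSimulation` are hypothetical names, partitions are plain strings, and the two `join...` methods only model the behavior the description attributes to each consumer.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for ConsumerRebalanceListener; partitions are plain strings here.
interface AssignmentListener {
    void onPartitionsAssigned(Collection<String> partitions);
}

// Records every invocation so that "joined with nothing assigned" can be
// told apart from "callback never invoked".
class RecordingListener implements AssignmentListener {
    final List<List<String>> invocations = new ArrayList<>();

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        invocations.add(new ArrayList<>(partitions));
    }
}

class JoinSimulation {
    // Models the behavior described for LegacyKafkaConsumer:
    // the callback always fires after joining, possibly with an empty list.
    static void joinAlwaysNotify(AssignmentListener listener, List<String> assigned) {
        listener.onPartitionsAssigned(assigned);
    }

    // Models the behavior described for AsyncKafkaConsumer:
    // the callback is skipped when nothing was assigned.
    static void joinNotifyOnlyIfAssigned(AssignmentListener listener, List<String> assigned) {
        if (!assigned.isEmpty()) {
            listener.onPartitionsAssigned(assigned);
        }
    }

    public static void main(String[] args) {
        RecordingListener legacy = new RecordingListener();
        RecordingListener async = new RecordingListener();

        // A member that joins the group but receives no partitions.
        joinAlwaysNotify(legacy, List.of());
        joinNotifyOnlyIfAssigned(async, List.of());

        System.out.println("legacy callbacks: " + legacy.invocations.size());
        System.out.println("async callbacks: " + async.invocations.size());
    }
}
```

In this sketch a listener that treats the callback as a "joined the group" signal sees one invocation under the first policy and none under the second, which is the gap the system tests run into.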
> In the system test I'm executing from {{consumer_test.py}}, there's a test
> that creates three consumers to read from a single topic with a single
> partition. It's a bit of an oddball test, but it demonstrates the issue.
>
> Here are the logs pulled from the test run when executed using the
> {{LegacyKafkaConsumer}}:
>
> Node 1:
> {code:java}
> [2024-02-15 00:43:52,400] INFO Adding newly assigned partitions: (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
> Node 2:
> {code:java}
> [2024-02-15 00:43:52,401] INFO Adding newly assigned partitions: test_topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
> Node 3:
> {code:java}
> [2024-02-15 00:43:52,399] INFO Adding newly assigned partitions: (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
>
> Here are the logs when executing the same test using the
> {{AsyncKafkaConsumer}}:
>
> Node 1:
> {code:java}
> [2024-02-15 01:15:46,576] INFO Adding newly assigned partitions: test_topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
> Node 2:
> {code:java}n/a{code}
> Node 3:
> {code:java}n/a{code}
>
> As a result of this change, the existing system tests do not work with the
> new consumer. However, even more importantly, this change in behavior may
> adversely affect existing users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
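
To make the log comparison concrete, here is a small sketch of how a reader of these logs might distinguish "joined with an empty assignment" from "no callback at all". It is not how the actual system tests are implemented; `AssignmentLogParser` is a hypothetical helper, and it assumes that multiple partitions would be comma-separated in the log line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, for illustration only.
class AssignmentLogParser {
    private static final String MARKER = "Adding newly assigned partitions:";

    // Returns Optional.empty() when the line is not an assignment log line.
    // Otherwise returns the partitions named on the line, which may be an empty list.
    static Optional<List<String>> parse(String line) {
        int start = line.indexOf(MARKER);
        if (start < 0) {
            return Optional.empty();
        }
        String rest = line.substring(start + MARKER.length());
        // Drop the trailing "(logger name)" suffix if present.
        int paren = rest.lastIndexOf('(');
        if (paren >= 0) {
            rest = rest.substring(0, paren);
        }
        List<String> partitions = new ArrayList<>();
        // Assumption: multiple partitions are separated by commas.
        for (String token : rest.split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                partitions.add(trimmed);
            }
        }
        return Optional.of(partitions);
    }

    public static void main(String[] args) {
        String logger = "(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker)";
        String emptyAssignment = "[2024-02-15 00:43:52,400] INFO Adding newly assigned partitions: " + logger;
        String oneAssignment = "[2024-02-15 00:43:52,401] INFO Adding newly assigned partitions: test_topic-0 " + logger;

        System.out.println(parse(emptyAssignment)); // callback seen, nothing assigned
        System.out.println(parse(oneAssignment));   // callback seen, one partition
        System.out.println(parse("n/a"));           // no callback logged
    }
}
```

With the legacy consumer all three nodes produce a line that parses to a present value, so membership can be inferred for each. With the new consumer, Nodes 2 and 3 produce nothing to parse.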