This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9e9d2a23efa MINOR: fix flaky sys test for static membership (#20594)
9e9d2a23efa is described below

commit 9e9d2a23efa32deea6d930cd8a4bd07048656ac4
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Sep 25 15:29:35 2025 -0400

    MINOR: fix flaky sys test for static membership (#20594)
    
    Fixing flakiness seen in this test, where static consumers could not
    join as expected after shutting down previous consumers with the same
    instance ID, and logs showed `UnreleasedInstanceIdException`.
    
    I suspect the flakiness happens when a consumer with instanceId1 is
    closed but not actually removed from the group because its leave group
    request failed or was delayed (the leave group request is sent on a
    best-effort basis and is not retried if it fails or times out).
    
    Fix by adding a check to ensure the group is empty before attempting to
    reuse the instance ID.
    
    Reviewers: Matthias J. Sax <[email protected]>
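The race described in the commit message can be illustrated with a toy model. This is a simplified sketch, not Kafka's group coordinator: `ToyCoordinator`, `UnreleasedInstanceIdError`, and `try_join` are hypothetical names, the clock is simulated, and the only rule modeled is the rejection path described above, where an instance ID stays held until either a leave request arrives or the session timeout expires.

```python
class UnreleasedInstanceIdError(Exception):
    """Hypothetical stand-in for Kafka's UnreleasedInstanceIdException."""


class ToyCoordinator:
    """Toy model: static members keyed by instance ID, with a simulated clock."""

    def __init__(self, session_timeout_sec=45.0):
        self.session_timeout_sec = session_timeout_sec
        self.now = 0.0
        # instance ID -> simulated time of last heartbeat (closed consumers stop heartbeating)
        self.last_heartbeat = {}

    def advance(self, seconds):
        """Move the clock forward and expire members whose session has lapsed."""
        self.now += seconds
        expired = [iid for iid, ts in self.last_heartbeat.items()
                   if self.now - ts >= self.session_timeout_sec]
        for iid in expired:
            del self.last_heartbeat[iid]

    def join(self, instance_id):
        """Reject a join while another member still holds the same instance ID."""
        if instance_id in self.last_heartbeat:
            raise UnreleasedInstanceIdError(instance_id)
        self.last_heartbeat[instance_id] = self.now

    def leave(self, instance_id, delivered=True):
        """Best-effort leave: a lost request leaves the coordinator unchanged."""
        if delivered:
            self.last_heartbeat.pop(instance_id, None)

    def state(self):
        return "Empty" if not self.last_heartbeat else "Stable"


def try_join(coordinator, instance_id):
    """Return True if the join succeeds, False if the instance ID is still held."""
    try:
        coordinator.join(instance_id)
        return True
    except UnreleasedInstanceIdError:
        return False


# Case 1: the leave request is delivered, so the ID can be reused immediately.
ok = ToyCoordinator()
ok.join("instanceId1")
ok.leave("instanceId1", delivered=True)
reuse_after_clean_leave = try_join(ok, "instanceId1")  # True

# Case 2: the leave request is lost; the ID stays held until the session expires.
lost = ToyCoordinator()
lost.join("instanceId1")
lost.leave("instanceId1", delivered=False)
reuse_before_expiry = try_join(lost, "instanceId1")  # False
lost.advance(46.0)                                    # past the 45 s session timeout
state_after_expiry = lost.state()                     # "Empty"
reuse_after_expiry = try_join(lost, "instanceId1")    # True
```

The second case is the one the fix guards against: waiting for the group to report an empty state before starting the conflicting consumers corresponds to waiting past `lost.advance(46.0)` in this model.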
---
 tests/kafkatest/tests/client/consumer_test.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index c6ecfc371a6..cb964e4c303 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -313,7 +313,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
             num_rebalances = consumer.num_rebalances()
             conflict_consumer.start()
             if group_protocol == consumer_group.classic_group_protocol:
-                # Classic protocol: conflicting members should join, and the intial ones with conflicting instance id should fail.
+                # Classic protocol: conflicting members should join, and the initial ones with conflicting instance id should fail.
                 self.await_members(conflict_consumer, num_conflict_consumers)
                 self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
 
@@ -332,6 +332,11 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
                            timeout_sec=60,
                            err_msg="Timed out waiting for the consumer to shutdown")
+                # Wait until the group becomes empty to ensure the instance ID is released.
+                # We use the 50-second timeout because the consumer session timeout is 45 seconds.
+                wait_until(lambda: self.group_id in self.kafka.list_consumer_groups(state="empty"),
+                           timeout_sec=50,
+                           err_msg="Timed out waiting for the consumers to be removed from the group.")
                 conflict_consumer.start()
                 self.await_members(conflict_consumer, num_conflict_consumers)
 

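The added `wait_until` call polls until the group appears in the empty state, using a 50-second timeout chosen to exceed the 45-second session timeout. The sketch below shows that polling pattern scaled down to fractions of a second. `poll_until`, `make_group_lister`, and `test_group` are hypothetical names; `poll_until` only mimics the shape of the `wait_until(condition, timeout_sec, err_msg)` call seen in the diff and is not the test framework's implementation.

```python
import time


def poll_until(condition, timeout_sec, backoff_sec=0.1, err_msg=""):
    """Call `condition` until it returns a truthy value or `timeout_sec` elapses.

    Hypothetical helper shaped like the wait_until call in the diff.
    """
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        if condition():
            return
        time.sleep(backoff_sec)
    # Check once more so a condition that turned true right at the deadline still counts.
    if condition():
        return
    raise TimeoutError(err_msg)


def make_group_lister(session_timeout_sec, group_id="test_group"):
    """Hypothetical stand-in for listing consumer groups in the empty state.

    Models the worst case: the consumers' leave requests were lost, so the group
    only becomes empty once the session timeout has elapsed since shutdown.
    """
    shutdown_at = time.monotonic()

    def list_empty_groups():
        if time.monotonic() - shutdown_at >= session_timeout_sec:
            return {group_id}
        return set()

    return list_empty_groups


# Scaled down from the test: 0.45 s session timeout, 0.5 s wait (vs. 45 s and 50 s).
SESSION_TIMEOUT_SEC = 0.45

lister = make_group_lister(SESSION_TIMEOUT_SEC)
poll_until(lambda: "test_group" in lister(),
           timeout_sec=0.5, backoff_sec=0.02,
           err_msg="Timed out waiting for the consumers to be removed from the group.")
group_emptied = "test_group" in lister()

# A wait shorter than the session timeout gives up before the group empties.
too_short = make_group_lister(SESSION_TIMEOUT_SEC)
try:
    poll_until(lambda: "test_group" in too_short(), timeout_sec=0.1, backoff_sec=0.02)
    short_wait_timed_out = False
except TimeoutError:
    short_wait_timed_out = True
```

Keeping the wait timeout above the session timeout is what makes the check reliable: in the worst case, where the leave request was lost, the group only becomes empty once the session expires, so a shorter wait (the `too_short` case) times out even though the group would have emptied on its own.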