This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9e9d2a23efa MINOR: fix flaky sys test for static membership (#20594)
9e9d2a23efa is described below
commit 9e9d2a23efa32deea6d930cd8a4bd07048656ac4
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Sep 25 15:29:35 2025 -0400
MINOR: fix flaky sys test for static membership (#20594)
Fix the flakiness seen in this test, where static consumers could not
join as expected after the previous consumers with the same instance ID
were shut down, and the logs showed `UnreleasedInstanceIdException`.
I expect the flakiness could happen if a consumer with instanceId1 is
closed but not effectively removed from the group because the leave
group request failed or was delayed (the leave group request is sent on
a best-effort basis and is not retried if it fails or times out).
Fix by adding a check to ensure the group is empty before attempting to
reuse the instance ID.
Reviewers: Matthias J. Sax <[email protected]>
---
tests/kafkatest/tests/client/consumer_test.py | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/tests/kafkatest/tests/client/consumer_test.py
b/tests/kafkatest/tests/client/consumer_test.py
index c6ecfc371a6..cb964e4c303 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -313,7 +313,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
num_rebalances = consumer.num_rebalances()
conflict_consumer.start()
if group_protocol == consumer_group.classic_group_protocol:
- # Classic protocol: conflicting members should join, and the
intial ones with conflicting instance id should fail.
+ # Classic protocol: conflicting members should join, and the
initial ones with conflicting instance id should fail.
self.await_members(conflict_consumer, num_conflict_consumers)
self.await_members(consumer, len(consumer.nodes) -
num_conflict_consumers)
@@ -332,6 +332,11 @@ class OffsetValidationTest(VerifiableConsumerTest):
wait_until(lambda: len(consumer.dead_nodes()) ==
len(consumer.nodes),
timeout_sec=60,
err_msg="Timed out waiting for the consumer to
shutdown")
+ # Wait until the group becomes empty to ensure the instance ID
is released.
+ # We use the 50-second timeout because the consumer session
timeout is 45 seconds.
+ wait_until(lambda: self.group_id in
self.kafka.list_consumer_groups(state="empty"),
+ timeout_sec=50,
+ err_msg="Timed out waiting for the consumers to be
removed from the group.")
conflict_consumer.start()
self.await_members(conflict_consumer, num_conflict_consumers)