lianetm commented on code in PR #20957:
URL: https://github.com/apache/kafka/pull/20957#discussion_r2557385410
##########
tests/kafkatest/tests/client/consumer_test.py:
##########
@@ -344,16 +353,23 @@ def test_fencing_static_consumer(self,
num_conflict_consumers, fencing_stage, me
consumer.stop_all()
wait_until(lambda: len(consumer.dead_nodes()) ==
len(consumer.nodes),
timeout_sec=60,
- err_msg="Timed out waiting for the consumer to
shutdown")
+ err_msg="Timed out waiting for the consumer to
shutdown. Describe output is %s" % "
".join(self.kafka.describe_consumer_group_members(self.group_id)))
# Wait until the group becomes empty to ensure the instance ID
is released.
- # We use the 50-second timeout because the consumer session
timeout is 45 seconds.
+
+
+ # We use the 60-second timeout because the consumer session
timeout is 45 seconds adding some time for latency.
wait_until(lambda: self.group_id in
self.kafka.list_consumer_groups(state="empty"),
- timeout_sec=50,
- err_msg="Timed out waiting for the consumers to be
removed from the group.")
+ timeout_sec=60,
+ err_msg="Timed out waiting for the consumers to be
removed from the group Describe output is %s." % "
".join(self.kafka.describe_consumer_group_members(self.group_id)))
Review Comment:
```suggestion
err_msg="Timed out waiting for the consumers to
be removed from the group. Describe output is %s." % "
".join(self.kafka.describe_consumer_group_members(self.group_id)))
```
##########
tests/kafkatest/tests/client/consumer_test.py:
##########
@@ -330,10 +330,19 @@ def test_fencing_static_consumer(self,
num_conflict_consumers, fencing_stage, me
timeout_sec=10,
err_msg="Timed out waiting for the fenced consumers to
stop")
else:
- # Consumer protocol: Existing members should remain active and
new conflicting ones should not be able to join.
- self.await_consumed_messages(consumer)
- assert num_rebalances == consumer.num_rebalances(), "Static
consumers attempt to join with instance id in use should not cause a rebalance"
- assert len(consumer.joined_nodes()) == len(consumer.nodes)
+ self.logger.debug("Members status - Joined [%d/%d]: %s, Not
joined: %s",
+ len(consumer.joined_nodes()),
+ len(consumer.nodes),
+ " ".join(str(node.account) for node in
consumer.joined_nodes()),
+ " ".join(
+ set(str(node.account) for node in
consumer.nodes) - set(consumer.joined_nodes())))
Review Comment:
seems we're missing something here? it has set(account) - set(node)
##########
tests/kafkatest/tests/client/consumer_test.py:
##########
@@ -330,10 +330,19 @@ def test_fencing_static_consumer(self,
num_conflict_consumers, fencing_stage, me
timeout_sec=10,
err_msg="Timed out waiting for the fenced consumers to
stop")
else:
- # Consumer protocol: Existing members should remain active and
new conflicting ones should not be able to join.
- self.await_consumed_messages(consumer)
- assert num_rebalances == consumer.num_rebalances(), "Static
consumers attempt to join with instance id in use should not cause a rebalance"
Review Comment:
we still need these 2 calls right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]