This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new 034968b MINOR: Add all topics created check streams broker bounce test (2.2) (#6244) 034968b is described below commit 034968b1ac5edcc3e4f80bb98a2a8d20e5a82022 Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Wed Feb 20 12:47:05 2019 -0500 MINOR: Add all topics created check streams broker bounce test (2.2) (#6244) The StreamsBrokerBounceTest.test_broker_type_bounce experienced what looked like a transient failure. After looking over this test and failure, it seems like it is vulnerable to timing error that streams will start before the kafka service creates all topics. Reviews: Matthias J. Sax <mj...@apache.org>, John Roesler <j...@confluent.io> --- .../tests/streams/streams_broker_bounce_test.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py index 8d623eb..7859d69 100644 --- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py @@ -133,6 +133,22 @@ class StreamsBrokerBounceTest(Test): for num in range(0, num_failures - 1): signal_node(self, self.kafka.nodes[num], sig) + def confirm_topics_on_all_brokers(self, expected_topic_set): + for node in self.kafka.nodes: + match_count = 0 + # need to iterate over topic_list_generator as kafka.list_topics() + # returns a python generator so values are fetched lazily + # so we can't just compare directly we must iterate over what's returned + topic_list_generator = self.kafka.list_topics("placeholder", node) + for topic in topic_list_generator: + if topic in expected_topic_set: + match_count += 1 + + if len(expected_topic_set) != match_count: + return False + + return True + def setup_system(self, start_processor=True): # Setup phase @@ -141,6 +157,12 @@ class StreamsBrokerBounceTest(Test): self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=self.zk, topics=self.topics) self.kafka.start() + + # allow some time for topics to be created + wait_until(lambda: self.confirm_topics_on_all_brokers(set(self.topics.keys())), + timeout_sec=60, + err_msg="Broker did not create all topics in 60 seconds ") + # Start test harness self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)