This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new d072db1  MINOR: Add check all topics created check streams broker bounce test (2.0) (#6241)
d072db1 is described below

commit d072db1d67c59436bf65a192245f423e87399eda
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Wed Feb 20 16:40:33 2019 -0500

    MINOR: Add check all topics created check streams broker bounce test (2.0) (#6241)
    
    The StreamsBrokerBounceTest.test_broker_type_bounce experienced what looked like a transient failure. After looking over this test and failure, it seems like it is vulnerable to timing error that streams will start before the kafka service creates all topics.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <mj...@apache.org>, John Roesler <j...@confluent.io>
---
 .../tests/streams/streams_broker_bounce_test.py    | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index 8d623eb..7859d69 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -133,6 +133,22 @@ class StreamsBrokerBounceTest(Test):
         for num in range(0, num_failures - 1):
             signal_node(self, self.kafka.nodes[num], sig)
 
+    def confirm_topics_on_all_brokers(self, expected_topic_set):
+        for node in self.kafka.nodes:
+            match_count = 0
+            # need to iterate over topic_list_generator as kafka.list_topics()
+            # returns a python generator so values are fetched lazily
+            # so we can't just compare directly we must iterate over what's returned
+            topic_list_generator = self.kafka.list_topics("placeholder", node)
+            for topic in topic_list_generator:
+                if topic in expected_topic_set:
+                    match_count += 1
+
+            if len(expected_topic_set) != match_count:
+                return False
+
+        return True
+
         
     def setup_system(self, start_processor=True):
         # Setup phase
@@ -141,6 +157,12 @@ class StreamsBrokerBounceTest(Test):
 
         self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=self.zk, topics=self.topics)
         self.kafka.start()
+
+        # allow some time for topics to be created
+        wait_until(lambda: self.confirm_topics_on_all_brokers(set(self.topics.keys())),
+                   timeout_sec=60,
+                   err_msg="Broker did not create all topics in 60 seconds ")
+
         # Start test harness
         self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
         self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)

Reply via email to