This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 034968b  MINOR: Add all topics created check streams broker bounce 
test (2.2) (#6244)
034968b is described below

commit 034968b1ac5edcc3e4f80bb98a2a8d20e5a82022
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Wed Feb 20 12:47:05 2019 -0500

    MINOR: Add all topics created check streams broker bounce test (2.2) (#6244)
    
    The StreamsBrokerBounceTest.test_broker_type_bounce experienced what looked 
like a transient failure. After looking over this test and failure, it seems 
like it is vulnerable to timing error that streams will start before the kafka 
service creates all topics.
    
    Reviews:  Matthias J. Sax <mj...@apache.org>, John Roesler 
<j...@confluent.io>
---
 .../tests/streams/streams_broker_bounce_test.py    | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py 
b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index 8d623eb..7859d69 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -133,6 +133,22 @@ class StreamsBrokerBounceTest(Test):
         for num in range(0, num_failures - 1):
             signal_node(self, self.kafka.nodes[num], sig)
 
+    def confirm_topics_on_all_brokers(self, expected_topic_set):
+        for node in self.kafka.nodes:
+            match_count = 0
+            # need to iterate over topic_list_generator as kafka.list_topics()
+            # returns a python generator so values are fetched lazily
+            # so we can't just compare directly we must iterate over what's 
returned
+            topic_list_generator = self.kafka.list_topics("placeholder", node)
+            for topic in topic_list_generator:
+                if topic in expected_topic_set:
+                    match_count += 1
+
+            if len(expected_topic_set) != match_count:
+                return False
+
+        return True
+
         
     def setup_system(self, start_processor=True):
         # Setup phase
@@ -141,6 +157,12 @@ class StreamsBrokerBounceTest(Test):
 
         self.kafka = KafkaService(self.test_context, 
num_nodes=self.replication, zk=self.zk, topics=self.topics)
         self.kafka.start()
+
+        # allow some time for topics to be created
+        wait_until(lambda: 
self.confirm_topics_on_all_brokers(set(self.topics.keys())),
+                   timeout_sec=60,
+                   err_msg="Broker did not create all topics in 60 seconds ")
+
         # Start test harness
         self.driver = StreamsSmokeTestDriverService(self.test_context, 
self.kafka)
         self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, 
self.kafka)

Reply via email to