Repository: kafka
Updated Branches:
  refs/heads/trunk f1b37eec7 -> f5df13627


KAFKA-3782: Fix transient failure in connect distributed bounce test

Author: Jason Gustafson <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes #1650 from hachikuji/KAFKA-3782


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f5df1362
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f5df1362
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f5df1362

Branch: refs/heads/trunk
Commit: f5df13627aaa6052a19e4cd7896e94730dac7f64
Parents: f1b37ee
Author: Jason Gustafson <[email protected]>
Authored: Thu Jul 21 20:09:03 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Thu Jul 21 20:09:03 2016 -0700

----------------------------------------------------------------------
 .../tests/connect/connect_distributed_test.py        | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f5df1362/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index d4c4225..b9757ba 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -329,7 +329,7 @@ class ConnectDistributedTest(Test):
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 
-        self.source = VerifiableSource(self.cc, tasks=num_tasks)
+        self.source = VerifiableSource(self.cc, tasks=num_tasks, throughput=100)
         self.source.start()
         self.sink = VerifiableSink(self.cc, tasks=num_tasks)
         self.sink.start()
@@ -344,11 +344,14 @@ class ConnectDistributedTest(Test):
                     monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
                                        err_msg="Kafka Connect worker didn't successfully join group and start work")
                 self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
-                # If this is a hard bounce, give additional time for the consumer groups to recover. If we don't give
-                # some time here, the next bounce may cause consumers to be shut down before they have any time to process
-                # data and we can end up with zero data making it through the test.
-                if not clean:
-                    time.sleep(15)
+
+                # Give additional time for the consumer groups to recover. Even if it is not a hard bounce, there are
+                # some cases where a restart can cause a rebalance to take the full length of the session timeout
+                # (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup).
+                # If we don't give enough time for the group to stabilize, the next bounce may cause consumers to
+                # be shut down before they have any time to process data and we can end up with zero data making it
+                # through the test.
+                time.sleep(15)
 
 
         self.source.stop()

Reply via email to