Repository: kafka Updated Branches: refs/heads/0.10.0 8a6ddf4c4 -> 71a598a17
KAFKA-3782: Fix transient failure in connect distributed bounce test Author: Jason Gustafson <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1650 from hachikuji/KAFKA-3782 (cherry picked from commit f5df13627aaa6052a19e4cd7896e94730dac7f64) Signed-off-by: Ewen Cheslack-Postava <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71a598a1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71a598a1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71a598a1 Branch: refs/heads/0.10.0 Commit: 71a598a179ea9f1ee897d755b6daf9fb99860b50 Parents: 8a6ddf4 Author: Jason Gustafson <[email protected]> Authored: Thu Jul 21 20:09:03 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu Jul 21 20:09:19 2016 -0700 ---------------------------------------------------------------------- .../tests/connect/connect_distributed_test.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/71a598a1/tests/kafkatest/tests/connect/connect_distributed_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index d4c4225..b9757ba 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -329,7 +329,7 @@ class ConnectDistributedTest(Test): self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) self.cc.start() - self.source = VerifiableSource(self.cc, tasks=num_tasks) + self.source = VerifiableSource(self.cc, tasks=num_tasks, throughput=100) self.source.start() self.sink = VerifiableSink(self.cc, tasks=num_tasks) self.sink.start() @@ -344,11 +344,14 @@ class ConnectDistributedTest(Test): monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90, err_msg="Kafka Connect worker didn't successfully join group and start work") self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started) - # If this is a hard bounce, give additional time for the consumer groups to recover. If we don't give - # some time here, the next bounce may cause consumers to be shut down before they have any time to process - # data and we can end up with zero data making it through the test. - if not clean: - time.sleep(15) + + # Give additional time for the consumer groups to recover. Even if it is not a hard bounce, there are + # some cases where a restart can cause a rebalance to take the full length of the session timeout + # (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup). + # If we don't give enough time for the group to stabilize, the next bounce may cause consumers to + # be shut down before they have any time to process data and we can end up with zero data making it + # through the test. + time.sleep(15) self.source.stop()
