Repository: kafka
Updated Branches:
  refs/heads/trunk 395bf46dd -> d1053915f
KAFKA-2803: Add hard bounce system test for Kafka Connect. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes #494 from ewencp/kafka-2803-connect-hard-bounce-system-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d1053915 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d1053915 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d1053915 Branch: refs/heads/trunk Commit: d1053915f64aec7ea717bbeac9570b1f75e9a2b0 Parents: 395bf46 Author: Ewen Cheslack-Postava <[email protected]> Authored: Tue Nov 24 17:39:14 2015 -0800 Committer: Gwen Shapira <[email protected]> Committed: Tue Nov 24 17:39:14 2015 -0800 ---------------------------------------------------------------------- .../kafkatest/tests/connect_distributed_test.py | 43 +++++++++++++------- .../templates/connect-distributed.properties | 7 +++- 2 files changed, 34 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d1053915/tests/kafkatest/tests/connect_distributed_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/connect_distributed_test.py b/tests/kafkatest/tests/connect_distributed_test.py index 4689f36..1f82e63 100644 --- a/tests/kafkatest/tests/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect_distributed_test.py @@ -17,7 +17,8 @@ from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink from kafkatest.services.console_consumer import ConsoleConsumer from ducktape.utils.util import wait_until -import hashlib, subprocess, json, itertools, time +from ducktape.mark import matrix +import subprocess, itertools, time from collections import Counter class ConnectDistributedTest(KafkaTest): @@ -84,7 +85,8 @@ class 
ConnectDistributedTest(KafkaTest): wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file") - def test_clean_bounce(self): + @matrix(clean=[True, False]) + def test_bounce(self, clean): """ Validates that source and sink tasks that run continuously and produce a predictable sequence of messages run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces. @@ -102,13 +104,19 @@ class ConnectDistributedTest(KafkaTest): for _ in range(3): for node in self.cc.nodes: started = time.time() - self.logger.info("Cleanly bouncing Kafka Connect on " + str(node.account)) - self.cc.stop_node(node) + self.logger.info("%s bouncing Kafka Connect on %s", clean and "Clean" or "Hard", str(node.account)) + self.cc.stop_node(node, clean_shutdown=clean) with node.account.monitor_log(self.cc.LOG_FILE) as monitor: self.cc.start_node(node) monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90, err_msg="Kafka Connect worker didn't successfully join group and start work") self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started) + # If this is a hard bounce, give additional time for the consumer groups to recover. If we don't give + # some time here, the next bounce may cause consumers to be shut down before they have any time to process + # data and we can end up with zero data making it through the test. + if not clean: + time.sleep(15) + self.source.stop() self.sink.stop() @@ -118,12 +126,14 @@ class ConnectDistributedTest(KafkaTest): # cleanly exited. Currently this only tests at least once delivery because the sink task may not have consumed # all the messages generated by the source task. This needs to be done per-task since seqnos are not unique across # tasks. 
- src_msgs = self.source.messages() - sink_msgs = self.sink.messages() success = True errors = [] + allow_dups = not clean + src_messages = self.source.messages() + sink_messages = self.sink.messages() for task in range(num_tasks): - src_seqnos = [msg['seqno'] for msg in src_msgs if msg['task'] == task] + # Validate source messages + src_seqnos = [msg['seqno'] for msg in src_messages if msg['task'] == task] # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean # bouncing should commit on rebalance. src_seqno_max = max(src_seqnos) @@ -136,12 +146,14 @@ class ConnectDistributedTest(KafkaTest): self.logger.error("Missing source sequence numbers for task " + str(task)) errors.append("Found missing source sequence numbers for task %d: %s" % (task, missing_src_seqnos)) success = False - if duplicate_src_seqnos: + if not allow_dups and duplicate_src_seqnos: self.logger.error("Duplicate source sequence numbers for task " + str(task)) errors.append("Found duplicate source sequence numbers for task %d: %s" % (task, duplicate_src_seqnos)) success = False - sink_seqnos = [msg['seqno'] for msg in sink_msgs if msg['task'] == task and 'flushed' in msg] + + # Validate sink messages + sink_seqnos = [msg['seqno'] for msg in sink_messages if msg['task'] == task and 'flushed' in msg] # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because # clean bouncing should commit on rebalance. 
sink_seqno_max = max(sink_seqnos) @@ -154,17 +166,16 @@ class ConnectDistributedTest(KafkaTest): self.logger.error("Missing sink sequence numbers for task " + str(task)) errors.append("Found missing sink sequence numbers for task %d: %s" % (task, missing_sink_seqnos)) success = False - if duplicate_sink_seqnos: - self.logger.error("Duplicate sink sequence numbers for task " + str(task)) - errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task, duplicate_sink_seqnos)) - success = False - + if not allow_dups and duplicate_sink_seqnos: + self.logger.error("Duplicate sink sequence numbers for task " + str(task)) + errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task, duplicate_sink_seqnos)) + success = False + # Validate source and sink match if sink_seqno_max > src_seqno_max: self.logger.error("Found sink sequence number greater than any generated sink sequence number for task %d: %d > %d", task, sink_seqno_max, src_seqno_max) errors.append("Found sink sequence number greater than any generated sink sequence number for task %d: %d > %d" % (task, sink_seqno_max, src_seqno_max)) success = False - if src_seqno_max < 1000 or sink_seqno_max < 1000: errors.append("Not enough messages were processed: source:%d sink:%d" % (src_seqno_max, sink_seqno_max)) success = False @@ -175,9 +186,11 @@ class ConnectDistributedTest(KafkaTest): consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True) consumer_validator.run() self.mark_for_collect(consumer_validator, "consumer_stdout") + assert success, "Found validation errors:\n" + "\n ".join(errors) + def _validate_file_output(self, input): input_set = set(input) # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/d1053915/tests/kafkatest/tests/templates/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-distributed.properties b/tests/kafkatest/tests/templates/connect-distributed.properties
index 4a61b92..8a9f6c7 100644
--- a/tests/kafkatest/tests/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/templates/connect-distributed.properties
@@ -37,4 +37,9 @@ config.storage.topic={{ CONFIG_TOPIC }}
 # Make sure data gets flushed frequently so tests don't have to wait to ensure they see data in output systems
 offset.flush.interval.ms=5000
 
-rest.advertised.host.name = {{ node.account.hostname }}
\ No newline at end of file
+rest.advertised.host.name = {{ node.account.hostname }}
+
+
+# Reduce session timeouts so tests that kill workers don't need to wait as long to recover
+session.timeout.ms=10000
+consumer.session.timeout.ms=10000
\ No newline at end of file
