Repository: kafka Updated Branches: refs/heads/trunk 89ba0c152 -> dd6347a5d
KAFKA-5888; System test to check ordering of messages with transactions and max.in.flight > 1. To check ordering, we augment the existing transactions test to read and write from topics with one partition. Since we are writing monotonically increasing numbers, the topics should always be sorted, making it very easy to check for out-of-order messages. Author: Apurva Mehta <apu...@confluent.io> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #3969 from apurvam/KAFKA-5888-system-test-which-check-ordering Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd6347a5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd6347a5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd6347a5 Branch: refs/heads/trunk Commit: dd6347a5df5299efeeae27d5b196182058027502 Parents: 89ba0c1 Author: Apurva Mehta <apu...@confluent.io> Authored: Thu Sep 28 13:03:07 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Sep 28 13:03:07 2017 -0700 ---------------------------------------------------------------------- tests/kafkatest/tests/core/transactions_test.py | 124 +++++++++++-------- 1 file changed, 74 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/dd6347a5/tests/kafkatest/tests/core/transactions_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index d8f9e5c..0914844 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -47,53 +47,35 @@ class TransactionsTest(Test): self.num_output_partitions = 3 self.num_seed_messages = 100000 self.transaction_size = 750 - self.first_transactional_id = "my-first-transactional-id" - self.second_transactional_id = "my-second-transactional-id" 
self.consumer_group = "transactions-test-consumer-group" self.zk = ZookeeperService(test_context, num_nodes=1) self.kafka = KafkaService(test_context, num_nodes=self.num_brokers, - zk=self.zk, - topics = { - self.input_topic: { - "partitions": self.num_input_partitions, - "replication-factor": 3, - "configs": { - "min.insync.replicas": 2 - } - }, - self.output_topic: { - "partitions": self.num_output_partitions, - "replication-factor": 3, - "configs": { - "min.insync.replicas": 2 - } - } - }) + zk=self.zk) def setUp(self): self.zk.start() - def seed_messages(self): + def seed_messages(self, topic, num_seed_messages): seed_timeout_sec = 10000 seed_producer = VerifiableProducer(context=self.test_context, num_nodes=1, kafka=self.kafka, - topic=self.input_topic, + topic=topic, message_validator=is_int, - max_messages=self.num_seed_messages, + max_messages=num_seed_messages, enable_idempotence=True) seed_producer.start() - wait_until(lambda: seed_producer.num_acked >= self.num_seed_messages, + wait_until(lambda: seed_producer.num_acked >= num_seed_messages, timeout_sec=seed_timeout_sec, err_msg="Producer failed to produce messages %d in %ds." 
%\ (self.num_seed_messages, seed_timeout_sec)) return seed_producer.acked - def get_messages_from_output_topic(self): - consumer = self.start_consumer(self.output_topic, group_id="verifying_consumer") - return self.drain_consumer(consumer) + def get_messages_from_topic(self, topic, num_messages): + consumer = self.start_consumer(topic, group_id="verifying_consumer") + return self.drain_consumer(consumer, num_messages) def bounce_brokers(self, clean_shutdown): for node in self.kafka.nodes: @@ -107,16 +89,16 @@ class TransactionsTest(Test): hard-killed broker %s" % str(node.account)) self.kafka.start_node(node) - def create_and_start_message_copier(self, input_partition, transactional_id): + def create_and_start_message_copier(self, input_topic, input_partition, output_topic, transactional_id): message_copier = TransactionalMessageCopier( context=self.test_context, num_nodes=1, kafka=self.kafka, transactional_id=transactional_id, consumer_group=self.consumer_group, - input_topic=self.input_topic, + input_topic=input_topic, input_partition=input_partition, - output_topic=self.output_topic, + output_topic=output_topic, max_messages=-1, transaction_size=self.transaction_size ) @@ -137,16 +119,15 @@ class TransactionsTest(Test): str(copier.progress_percent()))) copier.restart(clean_shutdown) - def create_and_start_copiers(self): + def create_and_start_copiers(self, input_topic, output_topic, num_copiers): copiers = [] - copiers.append(self.create_and_start_message_copier( - input_partition=0, - transactional_id=self.first_transactional_id - )) - copiers.append(self.create_and_start_message_copier( - input_partition=1, - transactional_id=self.second_transactional_id - )) + for i in range(0, num_copiers): + copiers.append(self.create_and_start_message_copier( + input_topic=input_topic, + output_topic=output_topic, + input_partition=i, + transactional_id="copier-" + str(i) + )) return copiers def start_consumer(self, topic_to_read, group_id): @@ -167,7 +148,7 @@ class 
TransactionsTest(Test): 60) return consumer - def drain_consumer(self, consumer): + def drain_consumer(self, consumer, num_messages): # wait until we read at least the expected number of messages. # This is a safe check because both failure modes will be caught: # 1. If we have 'num_seed_messages' but there are duplicates, then @@ -175,14 +156,16 @@ class TransactionsTest(Test): # # 2. If we never reach 'num_seed_messages', then this will cause the # test to fail. - wait_until(lambda: len(consumer.messages_consumed[1]) >= self.num_seed_messages, + wait_until(lambda: len(consumer.messages_consumed[1]) >= num_messages, timeout_sec=90, err_msg="Consumer consumed only %d out of %d messages in %ds" %\ - (len(consumer.messages_consumed[1]), self.num_seed_messages, 90)) + (len(consumer.messages_consumed[1]), num_messages, 90)) consumer.stop() return consumer.messages_consumed[1] - def copy_messages_transactionally(self, failure_mode, bounce_target): + def copy_messages_transactionally(self, failure_mode, bounce_target, + input_topic, output_topic, + num_copiers, num_messages_to_copy): """Copies messages transactionally from the seeded input topic to the output topic, either bouncing brokers or clients in a hard and soft way as it goes. @@ -192,8 +175,10 @@ class TransactionsTest(Test): It returns the concurrently consumed messages. """ - copiers = self.create_and_start_copiers() - concurrent_consumer = self.start_consumer(self.output_topic, + copiers = self.create_and_start_copiers(input_topic=input_topic, + output_topic=output_topic, + num_copiers=num_copiers) + concurrent_consumer = self.start_consumer(output_topic, group_id="concurrent_consumer") clean_shutdown = False if failure_mode == "clean_bounce": @@ -210,22 +195,57 @@ class TransactionsTest(Test): err_msg="%s - Failed to copy all messages in %ds." 
%\ (copier.transactional_id, 120)) self.logger.info("finished copying messages") - return self.drain_consumer(concurrent_consumer) + + return self.drain_consumer(concurrent_consumer, num_messages_to_copy) + + def setup_topics(self): + self.kafka.topics = { + self.input_topic: { + "partitions": self.num_input_partitions, + "replication-factor": 3, + "configs": { + "min.insync.replicas": 2 + } + }, + self.output_topic: { + "partitions": self.num_output_partitions, + "replication-factor": 3, + "configs": { + "min.insync.replicas": 2 + } + } + } @cluster(num_nodes=9) @matrix(failure_mode=["hard_bounce", "clean_bounce"], - bounce_target=["brokers", "clients"]) - def test_transactions(self, failure_mode, bounce_target): + bounce_target=["brokers", "clients"], + check_order=[True, False]) + def test_transactions(self, failure_mode, bounce_target, check_order): security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = security_protocol self.kafka.logs["kafka_data_1"]["collect_default"] = True self.kafka.logs["kafka_data_2"]["collect_default"] = True self.kafka.logs["kafka_operational_logs_debug"]["collect_default"] = True + if check_order: + # To check ordering, we simply create input and output topics + # with a single partition. + # We reduce the number of seed messages to copy to account for the fewer output + # partitions, and thus lower parallelism. This helps keep the test + # time shorter. 
+ self.num_seed_messages = self.num_seed_messages / 3 + self.num_input_partitions = 1 + self.num_output_partitions = 1 + + self.setup_topics() self.kafka.start() - input_messages = self.seed_messages() - concurrently_consumed_messages = self.copy_messages_transactionally(failure_mode, bounce_target) - output_messages = self.get_messages_from_output_topic() + + input_messages = self.seed_messages(self.input_topic, self.num_seed_messages) + concurrently_consumed_messages = self.copy_messages_transactionally( + failure_mode, bounce_target, input_topic=self.input_topic, + output_topic=self.output_topic, num_copiers=self.num_input_partitions, + num_messages_to_copy=self.num_seed_messages) + output_messages = self.get_messages_from_topic(self.output_topic, self.num_seed_messages) concurrently_consumed_message_set = set(concurrently_consumed_messages) output_message_set = set(output_messages) @@ -242,3 +262,7 @@ class TransactionsTest(Test): assert input_message_set == concurrently_consumed_message_set, \ "Input and concurrently consumed output message sets are not equal. Num input messages: %d. Num concurrently_consumed_messages: %d" %\ (len(input_message_set), len(concurrently_consumed_message_set)) + if check_order: + assert input_messages == sorted(input_messages), "The seed messages themselves were not in order" + assert output_messages == input_messages, "Output messages are not in order" + assert concurrently_consumed_messages == output_messages, "Concurrently consumed messages are not in order"