Repository: kafka Updated Branches: refs/heads/trunk e89a9ce1a -> 0d8cbbcb2
HOTFIX: Renamed tests to match expected suffix ewencp gwenshap granders could you have a look please? Thanks. Author: Eno Thereska <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1096 from enothereska/systest-hotfix-name Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0d8cbbcb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0d8cbbcb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0d8cbbcb Branch: refs/heads/trunk Commit: 0d8cbbcb208ccaf1cb84df0440331d4cef064391 Parents: e89a9ce Author: Eno Thereska <[email protected]> Authored: Fri Mar 18 12:01:56 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Fri Mar 18 12:01:56 2016 -0700 ---------------------------------------------------------------------- .../tests/compatibility_test_new_broker.py | 78 ----------------- .../tests/compatibility_test_new_broker_test.py | 78 +++++++++++++++++ .../kafkatest/tests/consumer_rolling_upgrade.py | 82 ----------------- .../tests/consumer_rolling_upgrade_test.py | 82 +++++++++++++++++ tests/kafkatest/tests/message_format_change.py | 92 -------------------- .../tests/message_format_change_test.py | 92 ++++++++++++++++++++ 6 files changed, 252 insertions(+), 252 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/compatibility_test_new_broker.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/compatibility_test_new_broker.py b/tests/kafkatest/tests/compatibility_test_new_broker.py deleted file mode 100644 index 2c261df..0000000 --- a/tests/kafkatest/tests/compatibility_test_new_broker.py +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright 2015 Confluent Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.tests.test import Test -from ducktape.mark import parametrize -from ducktape.utils.util import wait_until -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService -from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.utils import is_int -from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest -from kafkatest.services.kafka import config_property - -# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x) -class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest): - - def __init__(self, test_context): - super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context) - - def setUp(self): - self.topic = "test_topic" - self.zk = ZookeeperService(self.test_context, num_nodes=1) - - self.zk.start() - - # Producer and consumer - self.producer_throughput = 10000 - self.num_producers = 1 - self.num_consumers = 1 - self.messages_per_producer = 1000 - - @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], timestamp_type=None) - @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=None) - @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=None) - @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime")) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime")) - @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=str("LogAppendTime")) - def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False, timestamp_type=None): - - self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: { - "partitions": 3, - "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}}}) - for node in self.kafka.nodes: - if timestamp_type is not None: - node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type - self.kafka.start() - - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, - self.topic, throughput=self.producer_throughput, - message_validator=is_int, - compression_types=compression_types, - version=KafkaVersion(producer_version)) - - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, - self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer, - message_validator=is_int, version=KafkaVersion(consumer_version)) - - self.run_produce_consume_validate(lambda: wait_until( - lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True, - timeout_sec=120, backoff_sec=1, - err_msg="Producer did not produce all messages in reasonable amount of time")) http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/compatibility_test_new_broker_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/compatibility_test_new_broker_test.py new file mode 100644 index 0000000..2c261df --- /dev/null +++ b/tests/kafkatest/tests/compatibility_test_new_broker_test.py @@ -0,0 +1,78 @@ +# Copyright 2015 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.tests.test import Test +from ducktape.mark import parametrize +from ducktape.utils.util import wait_until +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.utils import is_int +from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest +from kafkatest.services.kafka import config_property + +# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x) +class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest): + + def __init__(self, test_context): + super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context) + + def setUp(self): + self.topic = "test_topic" + self.zk = ZookeeperService(self.test_context, num_nodes=1) + + self.zk.start() + + # Producer and consumer + self.producer_throughput = 10000 + self.num_producers = 1 + self.num_consumers = 1 + self.messages_per_producer = 1000 + + @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], timestamp_type=None) + @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None) + @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=None) + @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None) + @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=None) + @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("CreateTime")) + @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime")) + @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime")) + @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=str("LogAppendTime")) + def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False, timestamp_type=None): + + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: { + "partitions": 3, + "replication-factor": 3, + 'configs': {"min.insync.replicas": 2}}}) + for node in self.kafka.nodes: + if timestamp_type is not None: + node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type + self.kafka.start() + + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, + self.topic, throughput=self.producer_throughput, + message_validator=is_int, + compression_types=compression_types, + version=KafkaVersion(producer_version)) + + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, + self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer, + message_validator=is_int, version=KafkaVersion(consumer_version)) + + self.run_produce_consume_validate(lambda: wait_until( + lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True, + timeout_sec=120, backoff_sec=1, + err_msg="Producer did not produce all messages in reasonable amount of time")) http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/consumer_rolling_upgrade.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/consumer_rolling_upgrade.py b/tests/kafkatest/tests/consumer_rolling_upgrade.py deleted file mode 100644 index 3cd3c7c..0000000 --- a/tests/kafkatest/tests/consumer_rolling_upgrade.py +++ /dev/null @@ -1,82 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.utils.util import wait_until - -from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest -from kafkatest.services.kafka import TopicPartition - -class ConsumerRollingUpgradeTest(VerifiableConsumerTest): - TOPIC = "test_topic" - NUM_PARTITIONS = 4 - RANGE = "org.apache.kafka.clients.consumer.RangeAssignor" - ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor" - - def __init__(self, test_context): - super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0, - num_zk=1, num_brokers=1, topics={ - self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 } - }) - - def _verify_range_assignment(self, consumer): - # range assignment should give us two partition sets: (0, 1) and (2, 3) - assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()]) - assert assignment == set([ - frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]), - frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])]) - - def _verify_roundrobin_assignment(self, consumer): - assignment = set([frozenset(x) for x in consumer.current_assignment().values()]) - assert assignment == set([ - frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]), - frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])]) - - def rolling_update_test(self): - """ - Verify rolling updates of partition assignment strategies works correctly. In this - test, we use a rolling restart to change the group's assignment strategy from "range" - to "roundrobin." We verify after every restart that all members are still in the group - and that the correct assignment strategy was used. - """ - - # initialize the consumer using range assignment - consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE) - - consumer.start() - self.await_all_members(consumer) - self._verify_range_assignment(consumer) - - # change consumer configuration to prefer round-robin assignment, but still support range assignment - consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE - - # restart one of the nodes and verify that we are still using range assignment - consumer.stop_node(consumer.nodes[0]) - consumer.start_node(consumer.nodes[0]) - self.await_all_members(consumer) - self._verify_range_assignment(consumer) - - # now restart the other node and verify that we have switched to round-robin - consumer.stop_node(consumer.nodes[1]) - consumer.start_node(consumer.nodes[1]) - self.await_all_members(consumer) - self._verify_roundrobin_assignment(consumer) - - # if we want, we can now drop support for range assignment - consumer.assignment_strategy = self.ROUND_ROBIN - for node in consumer.nodes: - consumer.stop_node(node) - consumer.start_node(node) - self.await_all_members(consumer) - self._verify_roundrobin_assignment(consumer) http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/consumer_rolling_upgrade_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/consumer_rolling_upgrade_test.py new file mode 100644 index 0000000..3cd3c7c --- /dev/null +++ b/tests/kafkatest/tests/consumer_rolling_upgrade_test.py @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.utils.util import wait_until + +from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest +from kafkatest.services.kafka import TopicPartition + +class ConsumerRollingUpgradeTest(VerifiableConsumerTest): + TOPIC = "test_topic" + NUM_PARTITIONS = 4 + RANGE = "org.apache.kafka.clients.consumer.RangeAssignor" + ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor" + + def __init__(self, test_context): + super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0, + num_zk=1, num_brokers=1, topics={ + self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 } + }) + + def _verify_range_assignment(self, consumer): + # range assignment should give us two partition sets: (0, 1) and (2, 3) + assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()]) + assert assignment == set([ + frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]), + frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])]) + + def _verify_roundrobin_assignment(self, consumer): + assignment = set([frozenset(x) for x in consumer.current_assignment().values()]) + assert assignment == set([ + frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]), + frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])]) + + def rolling_update_test(self): + """ + Verify rolling updates of partition assignment strategies works correctly. In this + test, we use a rolling restart to change the group's assignment strategy from "range" + to "roundrobin." We verify after every restart that all members are still in the group + and that the correct assignment strategy was used. + """ + + # initialize the consumer using range assignment + consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE) + + consumer.start() + self.await_all_members(consumer) + self._verify_range_assignment(consumer) + + # change consumer configuration to prefer round-robin assignment, but still support range assignment + consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE + + # restart one of the nodes and verify that we are still using range assignment + consumer.stop_node(consumer.nodes[0]) + consumer.start_node(consumer.nodes[0]) + self.await_all_members(consumer) + self._verify_range_assignment(consumer) + + # now restart the other node and verify that we have switched to round-robin + consumer.stop_node(consumer.nodes[1]) + consumer.start_node(consumer.nodes[1]) + self.await_all_members(consumer) + self._verify_roundrobin_assignment(consumer) + + # if we want, we can now drop support for range assignment + consumer.assignment_strategy = self.ROUND_ROBIN + for node in consumer.nodes: + consumer.stop_node(node) + consumer.start_node(node) + self.await_all_members(consumer) + self._verify_roundrobin_assignment(consumer) http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/message_format_change.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/message_format_change.py b/tests/kafkatest/tests/message_format_change.py deleted file mode 100644 index 357fd17..0000000 --- a/tests/kafkatest/tests/message_format_change.py +++ /dev/null @@ -1,92 +0,0 @@ -# Copyright 2015 Confluent Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.tests.test import Test -from ducktape.mark import parametrize -from ducktape.utils.util import wait_until -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService -from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.utils import is_int -from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest -from kafkatest.services.kafka import config_property -import time - - -class MessageFormatChangeTest(ProduceConsumeValidateTest): - - def __init__(self, test_context): - super(MessageFormatChangeTest, self).__init__(test_context=test_context) - - def setUp(self): - self.topic = "test_topic" - self.zk = ZookeeperService(self.test_context, num_nodes=1) - - self.zk.start() - - # Producer and consumer - self.producer_throughput = 10000 - self.num_producers = 1 - self.num_consumers = 1 - self.messages_per_producer = 100 - - def produce_and_consume(self, producer_version, consumer_version, group): - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, - self.topic, - throughput=self.producer_throughput, - message_validator=is_int, - version=KafkaVersion(producer_version)) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, - self.topic, consumer_timeout_ms=30000, - message_validator=is_int, version=KafkaVersion(consumer_version)) - self.consumer.group_id = group - self.run_produce_consume_validate(lambda: wait_until( - lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True, - timeout_sec=120, backoff_sec=1, - err_msg="Producer did not produce all messages in reasonable amount of time")) - - @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK)) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9)) - def test_compatibility(self, producer_version, consumer_version): - """ This tests performs the following checks: - The workload is a mix of 0.9.x and 0.10.x producers and consumers - that produce to and consume from a 0.10.x cluster - 1. initially the topic is using message format 0.9.0 - 2. change the message format version for topic to 0.10.0 on the fly. - 3. change the message format version for topic back to 0.9.0 on the fly. - - The producers and consumers should not have any issue. - - Note that for 0.9.x consumers/producers we only do steps 1 and 2 - """ - self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: { - "partitions": 3, - "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}}}) - - self.kafka.start() - self.logger.info("First format change to 0.9.0") - self.kafka.alter_message_format(self.topic, str(LATEST_0_9)) - self.produce_and_consume(producer_version, consumer_version, "group1") - - self.logger.info("Second format change to 0.10.0") - self.kafka.alter_message_format(self.topic, str(LATEST_0_10)) - self.produce_and_consume(producer_version, consumer_version, "group2") - - if producer_version == str(TRUNK) and consumer_version == str(TRUNK): - self.logger.info("Third format change back to 0.9.0") - self.kafka.alter_message_format(self.topic, str(LATEST_0_9)) - self.produce_and_consume(producer_version, consumer_version, "group3") - - http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/message_format_change_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/message_format_change_test.py b/tests/kafkatest/tests/message_format_change_test.py new file mode 100644 index 0000000..357fd17 --- /dev/null +++ b/tests/kafkatest/tests/message_format_change_test.py @@ -0,0 +1,92 @@ +# Copyright 2015 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.tests.test import Test +from ducktape.mark import parametrize +from ducktape.utils.util import wait_until +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.utils import is_int +from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest +from kafkatest.services.kafka import config_property +import time + + +class MessageFormatChangeTest(ProduceConsumeValidateTest): + + def __init__(self, test_context): + super(MessageFormatChangeTest, self).__init__(test_context=test_context) + + def setUp(self): + self.topic = "test_topic" + self.zk = ZookeeperService(self.test_context, num_nodes=1) + + self.zk.start() + + # Producer and consumer + self.producer_throughput = 10000 + self.num_producers = 1 + self.num_consumers = 1 + self.messages_per_producer = 100 + + def produce_and_consume(self, producer_version, consumer_version, group): + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, + self.topic, + throughput=self.producer_throughput, + message_validator=is_int, + version=KafkaVersion(producer_version)) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, + self.topic, consumer_timeout_ms=30000, + message_validator=is_int, version=KafkaVersion(consumer_version)) + self.consumer.group_id = group + self.run_produce_consume_validate(lambda: wait_until( + lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True, + timeout_sec=120, backoff_sec=1, + err_msg="Producer did not produce all messages in reasonable amount of time")) + + @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK)) + @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9)) + def test_compatibility(self, producer_version, consumer_version): + """ This tests performs the following checks: + The workload is a mix of 0.9.x and 0.10.x producers and consumers + that produce to and consume from a 0.10.x cluster + 1. initially the topic is using message format 0.9.0 + 2. change the message format version for topic to 0.10.0 on the fly. + 3. change the message format version for topic back to 0.9.0 on the fly. + - The producers and consumers should not have any issue. + - Note that for 0.9.x consumers/producers we only do steps 1 and 2 + """ + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: { + "partitions": 3, + "replication-factor": 3, + 'configs': {"min.insync.replicas": 2}}}) + + self.kafka.start() + self.logger.info("First format change to 0.9.0") + self.kafka.alter_message_format(self.topic, str(LATEST_0_9)) + self.produce_and_consume(producer_version, consumer_version, "group1") + + self.logger.info("Second format change to 0.10.0") + self.kafka.alter_message_format(self.topic, str(LATEST_0_10)) + self.produce_and_consume(producer_version, consumer_version, "group2") + + if producer_version == str(TRUNK) and consumer_version == str(TRUNK): + self.logger.info("Third format change back to 0.9.0") + self.kafka.alter_message_format(self.topic, str(LATEST_0_9)) + self.produce_and_consume(producer_version, consumer_version, "group3") + +
