Repository: kafka Updated Branches: refs/heads/trunk 30e78fa00 -> a1eb12d7c
KAFKA-3188: Compatibility test for old and new clients with 0.10 broker apovzner becketqin please have a look if you can. Thanks. Author: Eno Thereska <[email protected]> Reviewers: Anna Povzner, Gwen Shapira Closes #1059 from enothereska/kafka-3188-compatibility Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1eb12d7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1eb12d7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1eb12d7 Branch: refs/heads/trunk Commit: a1eb12d7c6ad9422b9cf24b670d1b4c11227b03e Parents: 30e78fa Author: Eno Thereska <[email protected]> Authored: Thu Mar 17 13:17:01 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Thu Mar 17 13:17:01 2016 -0700 ---------------------------------------------------------------------- .../kafkatest/services/kafka/config_property.py | 2 +- .../tests/compatibility_test_new_broker.py | 78 ++++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a1eb12d7/tests/kafkatest/services/kafka/config_property.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py index 8f30f13..e1801ef 100644 --- a/tests/kafkatest/services/kafka/config_property.py +++ b/tests/kafkatest/services/kafka/config_property.py @@ -41,7 +41,7 @@ ZOOKEEPER_CONNECT = "zookeeper.connect" ZOOKEEPER_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms" INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version" MESSAGE_FORMAT_VERSION = "log.message.format.version" - +MESSAGE_TIMESTAMP_TYPE = "message.timestamp.type" """ http://git-wip-us.apache.org/repos/asf/kafka/blob/a1eb12d7/tests/kafkatest/tests/compatibility_test_new_broker.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/compatibility_test_new_broker.py b/tests/kafkatest/tests/compatibility_test_new_broker.py new file mode 100644 index 0000000..2c261df --- /dev/null +++ b/tests/kafkatest/tests/compatibility_test_new_broker.py @@ -0,0 +1,78 @@ +# Copyright 2015 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.tests.test import Test +from ducktape.mark import parametrize +from ducktape.utils.util import wait_until +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.utils import is_int +from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest +from kafkatest.services.kafka import config_property + +# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x) +class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest): + + def __init__(self, test_context): + super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context) + + def setUp(self): + self.topic = "test_topic" + self.zk = ZookeeperService(self.test_context, num_nodes=1) + + self.zk.start() + + # Producer and consumer + self.producer_throughput = 10000 + self.num_producers = 1 + self.num_consumers = 1 + self.messages_per_producer = 1000 + + @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], timestamp_type=None) + @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None) + @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=None) + @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None) + @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=None) + @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("CreateTime")) + @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime")) + @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime")) + @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=str("LogAppendTime")) + def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False, timestamp_type=None): + + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: { + "partitions": 3, + "replication-factor": 3, + 'configs': {"min.insync.replicas": 2}}}) + for node in self.kafka.nodes: + if timestamp_type is not None: + node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type + self.kafka.start() + + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, + self.topic, throughput=self.producer_throughput, + message_validator=is_int, + compression_types=compression_types, + version=KafkaVersion(producer_version)) + + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, + self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer, + message_validator=is_int, version=KafkaVersion(consumer_version)) + + self.run_produce_consume_validate(lambda: wait_until( + lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True, + timeout_sec=120, backoff_sec=1, + err_msg="Producer did not produce all messages in reasonable amount of time"))
