Repository: kafka Updated Branches: refs/heads/trunk 2e91806db -> f154956a7
KAFKA-2845: new client old broker compatibility Author: Geoff Anderson <[email protected]> Reviewers: Ismael Juma, Guozhang Wang Closes #537 from granders/KAFKA-2845-new-client-old-broker-compatibility Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f154956a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f154956a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f154956a Branch: refs/heads/trunk Commit: f154956a76790220a9ecb84d88644076c6885683 Parents: 2e91806 Author: Geoff Anderson <[email protected]> Authored: Wed Nov 18 17:13:21 2015 -0800 Committer: Confluent <[email protected]> Committed: Wed Nov 18 17:13:21 2015 -0800 ---------------------------------------------------------------------- tests/kafkatest/services/console_consumer.py | 7 +- tests/kafkatest/tests/compatibility_test.py | 101 ++++++++++++++++++++++ tests/kafkatest/tests/upgrade_test.py | 3 - 3 files changed, 105 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f154956a/tests/kafkatest/services/console_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 84d358d..e42b20e 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -99,9 +99,9 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService): "collect_default": True} } - def __init__(self, context, num_nodes, kafka, topic, new_consumer=False, message_validator=None, - from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", - print_key=False, jmx_object_names=None, jmx_attributes=[]): + def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=False, + message_validator=None, 
from_beginning=True, consumer_timeout_ms=None, version=TRUNK, + client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=[]): """ Args: context: standard context @@ -121,6 +121,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService): BackgroundThreadService.__init__(self, context, num_nodes) self.kafka = kafka self.new_consumer = new_consumer + self.group_id = group_id self.args = { 'topic': topic, } http://git-wip-us.apache.org/repos/asf/kafka/blob/f154956a/tests/kafkatest/tests/compatibility_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/compatibility_test.py b/tests/kafkatest/tests/compatibility_test.py new file mode 100644 index 0000000..0310d2f --- /dev/null +++ b/tests/kafkatest/tests/compatibility_test.py @@ -0,0 +1,101 @@ +# Copyright 2015 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from ducktape.tests.test import Test + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.console_consumer import ConsoleConsumer, is_int + + +class ClientCompatibilityTest(Test): + + def __init__(self, test_context): + super(ClientCompatibilityTest, self).__init__(test_context=test_context) + + def setUp(self): + self.topic = "test_topic" + self.zk = ZookeeperService(self.test_context, num_nodes=1) + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: { + "partitions": 3, + "replication-factor": 3, + "min.insync.replicas": 2}}) + self.zk.start() + self.kafka.start() + + # Producer and consumer + self.producer_throughput = 10000 + self.num_producers = 1 + self.num_consumers = 1 + + def test_producer_back_compatibility(self): + """Run 0.9.X java producer against 0.8.X brokers. 
+ This test documents the fact that java producer v0.9.0.0 and later won't run against 0.8.X brokers: + the broker responds to a V1 produce request with a V0 produce response; the client then tries to parse this V0 + produce response as a V1 produce response, resulting in a BufferUnderflowException + """ + self.producer = VerifiableProducer( + self.test_context, self.num_producers, self.kafka, self.topic, max_messages=100, + throughput=self.producer_throughput, version=TRUNK) + + node = self.producer.nodes[0] + try: + self.producer.start() + self.producer.wait() + raise Exception("0.9.X java producer should not run successfully against 0.8.X broker") + except: + # Expected + pass + finally: + self.producer.kill_node(node, clean_shutdown=False) + + self.logger.info("Grepping producer log for expected error type") + node.account.ssh("egrep -m 1 %s %s" % ("\"org\.apache\.kafka\.common\.protocol\.types\.SchemaException.*throttle_time_ms.*: java\.nio\.BufferUnderflowException\"", self.producer.LOG_FILE), allow_fail=False) + + def test_consumer_back_compatibility(self): + """Run the scala 0.9.X consumer against an 0.8.X cluster. + Expect 0.9.X scala consumer to fail with buffer underflow. 
This error is the same as when an 0.9.X producer + is run against an 0.8.X broker: the broker responds to a V1 fetch request with a V0 fetch response; the + client then tries to parse this V0 fetch response as a V1 fetch response, resulting in a BufferUnderflowException + """ + num_messages = 10 + self.producer = VerifiableProducer( + self.test_context, self.num_producers, self.kafka, self.topic, max_messages=num_messages, + throughput=self.producer_throughput, version=LATEST_0_8_2) + + self.consumer = ConsoleConsumer( + self.test_context, self.num_consumers, self.kafka, self.topic, group_id="consumer-09X", + consumer_timeout_ms=10000, message_validator=is_int, version=TRUNK) + + self.old_consumer = ConsoleConsumer( + self.test_context, self.num_consumers, self.kafka, self.topic, group_id="consumer-08X", + consumer_timeout_ms=10000, message_validator=is_int, version=LATEST_0_8_2) + + self.producer.run() + self.consumer.run() + self.old_consumer.run() + + consumed = len(self.consumer.messages_consumed[1]) + old_consumed = len(self.old_consumer.messages_consumed[1]) + assert old_consumed == num_messages, "Expected 0.8.X scala consumer to consume %d, but only got %d" % (num_messages, old_consumed) + assert consumed == 0, "Expected 0.9.X scala consumer to fail to consume any messages, but got %d" % consumed + + self.logger.info("Grepping consumer log for expected error type") + node = self.consumer.nodes[0] + node.account.ssh("egrep -m 1 %s %s" % ("\"java\.nio\.BufferUnderflowException\"", self.consumer.LOG_FILE), allow_fail=False) + + + http://git-wip-us.apache.org/repos/asf/kafka/blob/f154956a/tests/kafkatest/tests/upgrade_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py index 97605cd..245129a 100644 --- a/tests/kafkatest/tests/upgrade_test.py +++ b/tests/kafkatest/tests/upgrade_test.py @@ -13,7 +13,6 @@ # See the License for the specific language 
governing permissions and # limitations under the License. - from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK @@ -77,5 +76,3 @@ class TestUpgrade(ProduceConsumeValidateTest): """ self.run_produce_consume_validate(core_test_action=self.perform_upgrade) - -
