http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core/upgrade_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py new file mode 100644 index 0000000..15a9696 --- /dev/null +++ b/tests/kafkatest/tests/core/upgrade_test.py @@ -0,0 +1,128 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.mark import parametrize + +import json + +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import config_property +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest +from kafkatest.utils import is_int +from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, TRUNK, KafkaVersion + +class TestUpgrade(ProduceConsumeValidateTest): + + def __init__(self, test_context): + super(TestUpgrade, self).__init__(test_context=test_context) + + def setUp(self): + self.topic = "test_topic" + self.zk = ZookeeperService(self.test_context, num_nodes=1) + self.zk.start() + + # Producer and consumer + self.producer_throughput = 10000 + self.num_producers = 1 + self.num_consumers = 1 + + def perform_upgrade(self, from_kafka_version, to_message_format_version=None): + self.logger.info("First pass bounce - rolling upgrade") + for node in self.kafka.nodes: + self.kafka.stop_node(node) + node.version = TRUNK + node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version + node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version + self.kafka.start_node(node) + + self.logger.info("Second pass bounce - remove inter.broker.protocol.version config") + for node in self.kafka.nodes: + self.kafka.stop_node(node) + del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] + if to_message_format_version is None: + del node.config[config_property.MESSAGE_FORMAT_VERSION] + else: + node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version + self.kafka.start_node(node) + + @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=False) + @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"]) + @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=False) + @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL") + @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"]) + @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=False) + @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"]) + @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False) + @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"]) + @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=False) + @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"]) + @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"], new_consumer=False) + @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"], new_consumer=False) + def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types, + new_consumer=True, security_protocol="PLAINTEXT"): + """Test upgrade of Kafka broker cluster from 0.8.2, 0.9.0 or 0.10.0 to the current version + + from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X, 0.9.0.x or 0.10.0.x + + If to_message_format_version is None, it means that we will upgrade to default (latest) + message format version. It is possible to upgrade to 0.10 brokers but still use message + format version 0.9 + + - Start 3 node broker cluster on version 'from_kafka_version' + - Start producer and consumer in the background + - Perform two-phase rolling upgrade + - First phase: upgrade brokers to 0.10 with inter.broker.protocol.version set to + from_kafka_version and log.message.format.version set to from_kafka_version + - Second phase: remove inter.broker.protocol.version config with rolling bounce; if + to_message_format_version is set to 0.9, set log.message.format.version to + to_message_format_version, otherwise remove log.message.format.version config + - Finally, validate that every message acked by the producer was consumed by the consumer + """ + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, + version=KafkaVersion(from_kafka_version), + topics={self.topic: {"partitions": 3, "replication-factor": 3, + 'configs': {"min.insync.replicas": 2}}}) + self.kafka.security_protocol = security_protocol + self.kafka.interbroker_security_protocol = security_protocol + self.kafka.start() + + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, + self.topic, throughput=self.producer_throughput, + message_validator=is_int, + compression_types=compression_types, + version=KafkaVersion(from_kafka_version)) + + assert self.zk.query("/cluster/id") is None + + # TODO - reduce the timeout + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, + self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer, + message_validator=is_int, version=KafkaVersion(from_kafka_version)) + + self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version, + to_message_format_version)) + + cluster_id_json = self.zk.query("/cluster/id") + assert cluster_id_json is not None + try: + cluster_id = json.loads(cluster_id_json) + except : + self.logger.debug("Data in /cluster/id znode could not be parsed. Data = %s" % cluster_id_json) + + self.logger.debug("Cluster id [%s]", cluster_id) + assert len(cluster_id["id"]) == 22
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py new file mode 100644 index 0000000..0cfdf16 --- /dev/null +++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py @@ -0,0 +1,115 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.mark import matrix + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest +from kafkatest.services.security.kafka_acls import ACLs +from kafkatest.utils import is_int + +class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest): + """Tests a rolling upgrade for zookeeper. + """ + + def __init__(self, test_context): + super(ZooKeeperSecurityUpgradeTest, self).__init__(test_context=test_context) + + def setUp(self): + self.topic = "test_topic" + self.group = "group" + self.producer_throughput = 100 + self.num_producers = 1 + self.num_consumers = 1 + self.acls = ACLs(self.test_context) + + self.zk = ZookeeperService(self.test_context, num_nodes=3) + + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: { + "partitions": 3, + "replication-factor": 3, + 'configs': {"min.insync.replicas": 2}}}) + + def create_producer_and_consumer(self): + self.producer = VerifiableProducer( + self.test_context, self.num_producers, self.kafka, self.topic, + throughput=self.producer_throughput) + + self.consumer = ConsoleConsumer( + self.test_context, self.num_consumers, self.kafka, self.topic, + consumer_timeout_ms=60000, message_validator=is_int) + + self.consumer.group_id = self.group + + @property + def no_sasl(self): + return self.kafka.security_protocol == "PLAINTEXT" or self.kafka.security_protocol == "SSL" + + @property + def is_secure(self): + return self.kafka.security_protocol == "SASL_PLAINTEXT" \ + or self.kafka.security_protocol == "SSL" \ + or self.kafka.security_protocol == "SASL_SSL" + + def run_zk_migration(self): + # change zk config (auth provider + jaas login) + self.zk.kafka_opts = self.zk.security_system_properties + self.zk.zk_sasl = True + if self.no_sasl: + self.kafka.start_minikdc(self.zk.zk_principals) + # restart zk + for node in self.zk.nodes: + self.zk.stop_node(node) + self.zk.start_node(node) + + # restart broker with jaas login + for node in self.kafka.nodes: + self.kafka.stop_node(node) + self.kafka.start_node(node) + + # run migration tool + for node in self.zk.nodes: + self.zk.zookeeper_migration(node, "secure") + + # restart broker with zookeeper.set.acl=true and acls + self.kafka.zk_set_acl = True + for node in self.kafka.nodes: + self.kafka.stop_node(node) + self.kafka.start_node(node) + + @matrix(security_protocol=["PLAINTEXT","SSL","SASL_SSL","SASL_PLAINTEXT"]) + def test_zk_security_upgrade(self, security_protocol): + self.zk.start() + self.kafka.security_protocol = security_protocol + self.kafka.interbroker_security_protocol = security_protocol + + # set acls + if self.is_secure: + self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER + self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group) + + if(self.no_sasl): + self.kafka.start() + else: + self.kafka.start(self.zk.zk_principals) + + #Create Producer and Consumer + self.create_producer_and_consumer() + + #Run upgrade + self.run_produce_consume_validate(self.run_zk_migration) http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core1/__init__.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core1/__init__.py b/tests/kafkatest/tests/core1/__init__.py deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core1/consumer_group_command_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core1/consumer_group_command_test.py b/tests/kafkatest/tests/core1/consumer_group_command_test.py deleted file mode 100644 index c3f59d9..0000000 --- a/tests/kafkatest/tests/core1/consumer_group_command_test.py +++ /dev/null @@ -1,106 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ducktape.utils.util import wait_until -from ducktape.tests.test import Test -from ducktape.mark import matrix - -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.security.security_config import SecurityConfig - -import os -import re - -TOPIC = "topic-consumer-group-command" - -class ConsumerGroupCommandTest(Test): - """ - Tests ConsumerGroupCommand - """ - # Root directory for persistent output - PERSISTENT_ROOT = "/mnt/consumer_group_command" - COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties") - - def __init__(self, test_context): - super(ConsumerGroupCommandTest, self).__init__(test_context) - self.num_zk = 1 - self.num_brokers = 1 - self.topics = { - TOPIC: {'partitions': 1, 'replication-factor': 1} - } - self.zk = ZookeeperService(test_context, self.num_zk) - - def setUp(self): - self.zk.start() - - def start_kafka(self, security_protocol, interbroker_security_protocol): - self.kafka = KafkaService( - self.test_context, self.num_brokers, - self.zk, security_protocol=security_protocol, - interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) - self.kafka.start() - - def start_consumer(self, security_protocol): - enable_new_consumer = security_protocol == SecurityConfig.SSL - self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC, - consumer_timeout_ms=None, new_consumer=enable_new_consumer) - self.consumer.start() - - def setup_and_verify(self, security_protocol, group=None): - self.start_kafka(security_protocol, security_protocol) - self.start_consumer(security_protocol) - consumer_node = self.consumer.nodes[0] - wait_until(lambda: self.consumer.alive(consumer_node), - timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") - kafka_node = self.kafka.nodes[0] - if security_protocol is not SecurityConfig.PLAINTEXT: - prop_file = str(self.kafka.security_config.client_config()) - self.logger.debug(prop_file) - kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) - kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file) - - # Verify ConsumerGroupCommand lists expected consumer groups - enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT - command_config_file = None - if enable_new_consumer: - command_config_file = self.COMMAND_CONFIG_FILE - - if group: - wait_until(lambda: re.search("topic-consumer-group-command",self.kafka.describe_consumer_group(group=group, node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file)), timeout_sec=10, - err_msg="Timed out waiting to list expected consumer groups.") - else: - wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10, - err_msg="Timed out waiting to list expected consumer groups.") - - self.consumer.stop() - - @matrix(security_protocol=['PLAINTEXT', 'SSL']) - def test_list_consumer_groups(self, security_protocol='PLAINTEXT'): - """ - Tests if ConsumerGroupCommand is listing correct consumer groups - :return: None - """ - self.setup_and_verify(security_protocol) - - @matrix(security_protocol=['PLAINTEXT', 'SSL']) - def test_describe_consumer_group(self, security_protocol='PLAINTEXT'): - """ - Tests if ConsumerGroupCommand is describing a consumer group correctly - :return: None - """ - self.setup_and_verify(security_protocol, group="test-consumer-group") http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core1/get_offset_shell_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core1/get_offset_shell_test.py b/tests/kafkatest/tests/core1/get_offset_shell_test.py deleted file mode 100644 index 38bd9dc..0000000 --- a/tests/kafkatest/tests/core1/get_offset_shell_test.py +++ /dev/null @@ -1,91 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ducktape.utils.util import wait_until -from ducktape.tests.test import Test -from kafkatest.services.verifiable_producer import VerifiableProducer - -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.security.security_config import SecurityConfig - -TOPIC = "topic-get-offset-shell" -MAX_MESSAGES = 100 -NUM_PARTITIONS = 1 -REPLICATION_FACTOR = 1 - -class GetOffsetShellTest(Test): - """ - Tests GetOffsetShell tool - """ - def __init__(self, test_context): - super(GetOffsetShellTest, self).__init__(test_context) - self.num_zk = 1 - self.num_brokers = 1 - self.messages_received_count = 0 - self.topics = { - TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR} - } - - self.zk = ZookeeperService(test_context, self.num_zk) - - - - def setUp(self): - self.zk.start() - - def start_kafka(self, security_protocol, interbroker_security_protocol): - self.kafka = KafkaService( - self.test_context, self.num_brokers, - self.zk, security_protocol=security_protocol, - interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) - self.kafka.start() - - def start_producer(self): - # This will produce to kafka cluster - self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES) - self.producer.start() - current_acked = self.producer.num_acked - wait_until(lambda: self.producer.num_acked >= current_acked + MAX_MESSAGES, timeout_sec=10, - err_msg="Timeout awaiting messages to be produced and acked") - - def start_consumer(self, security_protocol): - enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT - self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC, - consumer_timeout_ms=1000, new_consumer=enable_new_consumer) - self.consumer.start() - - def test_get_offset_shell(self, security_protocol='PLAINTEXT'): - """ - Tests if GetOffsetShell is getting offsets correctly - :return: None - """ - self.start_kafka(security_protocol, security_protocol) - self.start_producer() - - # Assert that offset fetched without any consumers consuming is 0 - assert self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, 0) - - self.start_consumer(security_protocol) - - node = self.consumer.nodes[0] - - wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") - - # Assert that offset is correctly indicated by GetOffsetShell tool - wait_until(lambda: "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, MAX_MESSAGES) in self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), timeout_sec=10, - err_msg="Timed out waiting to reach expected offset.") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core1/reassign_partitions_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core1/reassign_partitions_test.py b/tests/kafkatest/tests/core1/reassign_partitions_test.py deleted file mode 100644 index 850e2aa..0000000 --- a/tests/kafkatest/tests/core1/reassign_partitions_test.py +++ /dev/null @@ -1,110 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.mark import parametrize -from ducktape.utils.util import wait_until - -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest -from kafkatest.utils import is_int -import random - -class ReassignPartitionsTest(ProduceConsumeValidateTest): - """ - These tests validate partition reassignment. - Create a topic with few partitions, load some data, trigger partition re-assignment with and without broker failure, - check that partition re-assignment can complete and there is no data loss. - """ - - def __init__(self, test_context): - """:type test_context: ducktape.tests.test.TestContext""" - super(ReassignPartitionsTest, self).__init__(test_context=test_context) - - self.topic = "test_topic" - self.zk = ZookeeperService(test_context, num_nodes=1) - self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk, topics={self.topic: { - "partitions": 20, - "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}} - }) - self.num_partitions = 20 - self.timeout_sec = 60 - self.producer_throughput = 1000 - self.num_producers = 1 - self.num_consumers = 1 - - def setUp(self): - self.zk.start() - - def min_cluster_size(self): - # Override this since we're adding services outside of the constructor - return super(ReassignPartitionsTest, self).min_cluster_size() + self.num_producers + self.num_consumers - - def clean_bounce_some_brokers(self): - """Bounce every other broker""" - for node in self.kafka.nodes[::2]: - self.kafka.restart_node(node, clean_shutdown=True) - - def reassign_partitions(self, bounce_brokers): - partition_info = self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic)) - self.logger.debug("Partitions before reassignment:" + str(partition_info)) - - # jumble partition assignment in dictionary - seed = random.randint(0, 2 ** 31 - 1) - self.logger.debug("Jumble partition assignment with seed " + str(seed)) - random.seed(seed) - # The list may still be in order, but that's ok - shuffled_list = range(0, self.num_partitions) - random.shuffle(shuffled_list) - - for i in range(0, self.num_partitions): - partition_info["partitions"][i]["partition"] = shuffled_list[i] - self.logger.debug("Jumbled partitions: " + str(partition_info)) - - # send reassign partitions command - self.kafka.execute_reassign_partitions(partition_info) - - if bounce_brokers: - # bounce a few brokers at the same time - self.clean_bounce_some_brokers() - - # Wait until finished or timeout - wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5) - - @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True) - @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False) - def test_reassign_partitions(self, bounce_brokers, security_protocol): - """Reassign partitions tests. - Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2 - - - Produce messages in the background - - Consume messages in the background - - Reassign partitions - - If bounce_brokers is True, also bounce a few brokers while partition re-assignment is in progress - - When done reassigning partitions and bouncing brokers, stop producing, and finish consuming - - Validate that every acked message was consumed - """ - - self.kafka.security_protocol = security_protocol - self.kafka.interbroker_security_protocol = security_protocol - new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int) - self.kafka.start() - - self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers)) http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core1/simple_consumer_shell_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core1/simple_consumer_shell_test.py b/tests/kafkatest/tests/core1/simple_consumer_shell_test.py deleted file mode 100644 index 74a7eeb..0000000 --- a/tests/kafkatest/tests/core1/simple_consumer_shell_test.py +++ /dev/null @@ -1,75 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ducktape.utils.util import wait_until -from ducktape.tests.test import Test -from kafkatest.services.simple_consumer_shell import SimpleConsumerShell -from kafkatest.services.verifiable_producer import VerifiableProducer - -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService -TOPIC = "topic-simple-consumer-shell" -MAX_MESSAGES = 100 -NUM_PARTITIONS = 1 -REPLICATION_FACTOR = 1 - -class SimpleConsumerShellTest(Test): - """ - Tests SimpleConsumerShell tool - """ - def __init__(self, test_context): - super(SimpleConsumerShellTest, self).__init__(test_context) - self.num_zk = 1 - self.num_brokers = 1 - self.messages_received_count = 0 - self.topics = { - TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR} - } - - self.zk = ZookeeperService(test_context, self.num_zk) - - def setUp(self): - self.zk.start() - - def start_kafka(self): - self.kafka = KafkaService( - self.test_context, self.num_brokers, - self.zk, topics=self.topics) - self.kafka.start() - - def run_producer(self): - # This will produce to kafka cluster - self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES) - self.producer.start() - wait_until(lambda: self.producer.num_acked == MAX_MESSAGES, timeout_sec=10, - err_msg="Timeout awaiting messages to be produced and acked") - - def start_simple_consumer_shell(self): - self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka, TOPIC) - self.simple_consumer_shell.start() - - def test_simple_consumer_shell(self): - """ - Tests if SimpleConsumerShell is fetching expected records - :return: None - """ - self.start_kafka() - self.run_producer() - self.start_simple_consumer_shell() - - # Assert that SimpleConsumerShell is fetching expected number of messages - wait_until(lambda: self.simple_consumer_shell.get_output().count("\n") == (MAX_MESSAGES + 1), timeout_sec=10, - err_msg="Timed out waiting to receive expected number of messages.") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core1/throttling_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core1/throttling_test.py b/tests/kafkatest/tests/core1/throttling_test.py deleted file mode 100644 index 2e21322..0000000 --- a/tests/kafkatest/tests/core1/throttling_test.py +++ /dev/null @@ -1,173 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import time -import math -from ducktape.mark import parametrize -from ducktape.utils.util import wait_until - -from kafkatest.services.performance import ProducerPerformanceService -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.utils import is_int - - -class ThrottlingTest(ProduceConsumeValidateTest): - """Tests throttled partition reassignment. This is essentially similar - to the reassign_partitions_test, except that we throttle the reassignment - and verify that it takes a sensible amount of time given the throttle - and the amount of data being moved. - - Since the correctness is time dependent, this test also simplifies the - cluster topology. In particular, we fix the number of brokers, the - replication-factor, the number of partitions, the partition size, and - the number of partitions being moved so that we can accurately predict - the time throttled reassignment should take. - """ - - def __init__(self, test_context): - """:type test_context: ducktape.tests.test.TestContext""" - super(ThrottlingTest, self).__init__(test_context=test_context) - - self.topic = "test_topic" - self.zk = ZookeeperService(test_context, num_nodes=1) - # Because we are starting the producer/consumer/validate cycle _after_ - # seeding the cluster with big data (to test throttling), we need to - # Start the consumer from the end of the stream. further, we need to - # ensure that the consumer is fully started before the producer starts - # so that we don't miss any messages. This timeout ensures the sufficient - # condition. - self.consumer_init_timeout_sec = 10 - self.num_brokers = 6 - self.num_partitions = 3 - self.kafka = KafkaService(test_context, - num_nodes=self.num_brokers, - zk=self.zk, - topics={ - self.topic: { - "partitions": self.num_partitions, - "replication-factor": 2, - "configs": { - "segment.bytes": 64 * 1024 * 1024 - } - } - }) - self.producer_throughput = 1000 - self.timeout_sec = 400 - self.num_records = 2000 - self.record_size = 4096 * 100 # 400 KB - # 1 MB per partition on average. - self.partition_size = (self.num_records * self.record_size) / self.num_partitions - self.num_producers = 2 - self.num_consumers = 1 - self.throttle = 4 * 1024 * 1024 # 4 MB/s - - def setUp(self): - self.zk.start() - - def min_cluster_size(self): - # Override this since we're adding services outside of the constructor - return super(ThrottlingTest, self).min_cluster_size() +\ - self.num_producers + self.num_consumers - - def clean_bounce_some_brokers(self): - """Bounce every other broker""" - for node in self.kafka.nodes[::2]: - self.kafka.restart_node(node, clean_shutdown=True) - - def reassign_partitions(self, bounce_brokers, throttle): - """This method reassigns partitions using a throttle. It makes an - assertion about the minimum amount of time the reassignment should take - given the value of the throttle, the number of partitions being moved, - and the size of each partition. - """ - partition_info = self.kafka.parse_describe_topic( - self.kafka.describe_topic(self.topic)) - self.logger.debug("Partitions before reassignment:" + - str(partition_info)) - max_num_moves = 0 - for i in range(0, self.num_partitions): - old_replicas = set(partition_info["partitions"][i]["replicas"]) - new_part = (i+1) % self.num_partitions - new_replicas = set(partition_info["partitions"][new_part]["replicas"]) - max_num_moves = max(len(new_replicas - old_replicas), max_num_moves) - partition_info["partitions"][i]["partition"] = new_part - self.logger.debug("Jumbled partitions: " + str(partition_info)) - - self.kafka.execute_reassign_partitions(partition_info, - throttle=throttle) - start = time.time() - if bounce_brokers: - # bounce a few brokers at the same time - self.clean_bounce_some_brokers() - - # Wait until finished or timeout - size_per_broker = max_num_moves * self.partition_size - self.logger.debug("Max amount of data transfer per broker: %fb", - size_per_broker) - estimated_throttled_time = math.ceil(float(size_per_broker) / - self.throttle) - estimated_time_with_buffer = estimated_throttled_time * 2 - self.logger.debug("Waiting %ds for the reassignment to complete", - estimated_time_with_buffer) - wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), - timeout_sec=estimated_time_with_buffer, backoff_sec=.5) - stop = time.time() - time_taken = stop - start - self.logger.debug("Transfer took %d second. Estimated time : %ds", - time_taken, - estimated_throttled_time) - assert time_taken >= estimated_throttled_time, \ - ("Expected rebalance to take at least %ds, but it took %ds" % ( - estimated_throttled_time, - time_taken)) - - @parametrize(bounce_brokers=False) - @parametrize(bounce_brokers=True) - def test_throttled_reassignment(self, bounce_brokers): - security_protocol = 'PLAINTEXT' - self.kafka.security_protocol = security_protocol - self.kafka.interbroker_security_protocol = security_protocol - - producer_id = 'bulk_producer' - bulk_producer = ProducerPerformanceService( - context=self.test_context, num_nodes=1, kafka=self.kafka, - topic=self.topic, num_records=self.num_records, - record_size=self.record_size, throughput=-1, client_id=producer_id, - jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id], - jmx_attributes=['outgoing-byte-rate']) - - - self.producer = VerifiableProducer(context=self.test_context, - num_nodes=1, - kafka=self.kafka, topic=self.topic, - message_validator=is_int, - throughput=self.producer_throughput) - - self.consumer = ConsoleConsumer(self.test_context, - self.num_consumers, - self.kafka, - self.topic, - consumer_timeout_ms=60000, - message_validator=is_int, - from_beginning=False) - - self.kafka.start() - bulk_producer.run() - self.run_produce_consume_validate(core_test_action= - lambda: self.reassign_partitions(bounce_brokers, self.throttle)) http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core2/__init__.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core2/__init__.py b/tests/kafkatest/tests/core2/__init__.py deleted file mode 100644 index ec20143..0000000 --- a/tests/kafkatest/tests/core2/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core2/compatibility_test_new_broker_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core2/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core2/compatibility_test_new_broker_test.py deleted file mode 100644 index d6a0a12..0000000 --- a/tests/kafkatest/tests/core2/compatibility_test_new_broker_test.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright 2015 Confluent Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.mark import parametrize -from ducktape.utils.util import wait_until - -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka import KafkaService -from kafkatest.services.kafka import config_property -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest -from kafkatest.utils import is_int -from kafkatest.version import LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion - - -# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x) -class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest): - - def __init__(self, test_context): - super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context) - - def setUp(self): - self.topic = "test_topic" - self.zk = ZookeeperService(self.test_context, num_nodes=1) - - self.zk.start() - - # Producer and consumer - self.producer_throughput = 10000 - self.num_producers = 1 - self.num_consumers = 1 - self.messages_per_producer = 1000 - - @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None) - @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None) - @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=None) - @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=str("LogAppendTime")) - @parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime")) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime")) - @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime")) - def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None): - - self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: { - "partitions": 3, - "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}}}) - for node in self.kafka.nodes: - if timestamp_type is not None: - node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type - self.kafka.start() - - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, - self.topic, throughput=self.producer_throughput, - message_validator=is_int, - compression_types=compression_types, - version=KafkaVersion(producer_version)) - - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, - self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer, - message_validator=is_int, version=KafkaVersion(consumer_version)) - - self.run_produce_consume_validate(lambda: wait_until( - lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True, - timeout_sec=120, backoff_sec=1, - err_msg="Producer did not produce all messages in reasonable amount of time")) http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/mirror_maker/__init__.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/mirror_maker/__init__.py b/tests/kafkatest/tests/mirror_maker/__init__.py deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/mirror_maker/mirror_maker_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/mirror_maker/mirror_maker_test.py b/tests/kafkatest/tests/mirror_maker/mirror_maker_test.py deleted file mode 100644 index afb1972..0000000 --- a/tests/kafkatest/tests/mirror_maker/mirror_maker_test.py +++ /dev/null @@ -1,179 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.utils.util import wait_until -from ducktape.mark import parametrize, matrix, ignore - -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.mirror_maker import MirrorMaker -from kafkatest.services.security.minikdc import MiniKdc -from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest -from kafkatest.utils import is_int - -import time - - -class TestMirrorMakerService(ProduceConsumeValidateTest): - """Sanity checks on mirror maker service class.""" - def __init__(self, test_context): - super(TestMirrorMakerService, self).__init__(test_context) - - self.topic = "topic" - self.source_zk = ZookeeperService(test_context, num_nodes=1) - self.target_zk = ZookeeperService(test_context, num_nodes=1) - - self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk, - topics={self.topic: {"partitions": 1, "replication-factor": 1}}) - self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk, - topics={self.topic: {"partitions": 1, "replication-factor": 1}}) - # This will produce to source kafka cluster - self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic, - throughput=1000) - self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka, - whitelist=self.topic, offset_commit_interval_ms=1000) - # This will consume from target kafka cluster - self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic, - message_validator=is_int, consumer_timeout_ms=60000) - - def setUp(self): - # Source cluster - self.source_zk.start() - - # Target cluster - self.target_zk.start() - - def start_kafka(self, security_protocol): - self.source_kafka.security_protocol = security_protocol - self.source_kafka.interbroker_security_protocol = security_protocol - self.target_kafka.security_protocol = security_protocol - self.target_kafka.interbroker_security_protocol = security_protocol - if self.source_kafka.security_config.has_sasl_kerberos: - minikdc = MiniKdc(self.source_kafka.context, self.source_kafka.nodes + self.target_kafka.nodes) - self.source_kafka.minikdc = minikdc - self.target_kafka.minikdc = minikdc - minikdc.start() - self.source_kafka.start() - self.target_kafka.start() - - def bounce(self, clean_shutdown=True): - """Bounce mirror maker with a clean (kill -15) or hard (kill -9) shutdown""" - - # Wait until messages start appearing in the target cluster - wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=15) - - # Wait for at least one offset to be committed. - # - # This step is necessary to prevent data loss with default mirror maker settings: - # currently, if we don't have at least one committed offset, - # and we bounce mirror maker, the consumer internals will throw OffsetOutOfRangeException, and the default - # auto.offset.reset policy ("largest") will kick in, causing mirrormaker to start consuming from the largest - # offset. As a result, any messages produced to the source cluster while mirrormaker was dead won't get - # mirrored to the target cluster. - # (see https://issues.apache.org/jira/browse/KAFKA-2759) - # - # This isn't necessary with kill -15 because mirror maker commits its offsets during graceful - # shutdown. - if not clean_shutdown: - time.sleep(self.mirror_maker.offset_commit_interval_ms / 1000.0 + .5) - - for i in range(3): - self.logger.info("Bringing mirror maker nodes down...") - for node in self.mirror_maker.nodes: - self.mirror_maker.stop_node(node, clean_shutdown=clean_shutdown) - - num_consumed = len(self.consumer.messages_consumed[1]) - self.logger.info("Bringing mirror maker nodes back up...") - for node in self.mirror_maker.nodes: - self.mirror_maker.start_node(node) - - # Ensure new messages are once again showing up on the target cluster - # new consumer requires higher timeout here - wait_until(lambda: len(self.consumer.messages_consumed[1]) > num_consumed + 100, timeout_sec=60) - - def wait_for_n_messages(self, n_messages=100): - """Wait for a minimum number of messages to be successfully produced.""" - wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10, - err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages) - - @parametrize(security_protocol='PLAINTEXT', new_consumer=False) - @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True]) - def test_simple_end_to_end(self, security_protocol, new_consumer): - """ - Test end-to-end behavior under non-failure conditions. - - Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster. - One is source, and the other is target. Single-node mirror maker mirrors from source to target. - - - Start mirror maker. - - Produce a small number of messages to the source cluster. - - Consume messages from target. - - Verify that number of consumed messages matches the number produced. - """ - self.start_kafka(security_protocol) - self.consumer.new_consumer = new_consumer - - self.mirror_maker.new_consumer = new_consumer - self.mirror_maker.start() - - mm_node = self.mirror_maker.nodes[0] - with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor: - if new_consumer: - monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.") - else: - monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.") - - self.run_produce_consume_validate(core_test_action=self.wait_for_n_messages) - self.mirror_maker.stop() - - @matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False]) - @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL']) - def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'): - """ - Test end-to-end behavior under failure conditions. - - Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster. - One is source, and the other is target. Single-node mirror maker mirrors from source to target. - - - Start mirror maker. - - Produce to source cluster, and consume from target cluster in the background. - - Bounce MM process - - Verify every message acknowledged by the source producer is consumed by the target consumer - """ - if new_consumer and not clean_shutdown: - # Increase timeout on downstream console consumer; mirror maker with new consumer takes extra time - # during hard bounce. This is because the restarted mirror maker consumer won't be able to rejoin - # the group until the previous session times out - self.consumer.consumer_timeout_ms = 60000 - - self.start_kafka(security_protocol) - self.consumer.new_consumer = new_consumer - - self.mirror_maker.offsets_storage = offsets_storage - self.mirror_maker.new_consumer = new_consumer - self.mirror_maker.start() - - # Wait until mirror maker has reset fetch offset at least once before continuing with the rest of the test - mm_node = self.mirror_maker.nodes[0] - with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor: - if new_consumer: - monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.") - else: - monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.") - - self.run_produce_consume_validate(core_test_action=lambda: self.bounce(clean_shutdown=clean_shutdown)) - self.mirror_maker.stop() http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/produce_consume_validate.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py index 3b54ad7..801ccde 100644 --- a/tests/kafkatest/tests/produce_consume_validate.py +++ b/tests/kafkatest/tests/produce_consume_validate.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import traceback from ducktape.tests.test import Test from ducktape.utils.util import wait_until @@ -103,7 +102,7 @@ class ProduceConsumeValidateTest(Test): except BaseException as e: for s in self.test_context.services: self.mark_for_collect(s) - raise Exception(traceback.format_exc(e)) + raise @staticmethod def annotate_missing_msgs(missing, acked, consumed, msg): http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/replication/__init__.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/replication/__init__.py b/tests/kafkatest/tests/replication/__init__.py deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/replication/replication_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/replication/replication_test.py b/tests/kafkatest/tests/replication/replication_test.py deleted file mode 100644 index f815034..0000000 --- a/tests/kafkatest/tests/replication/replication_test.py +++ /dev/null @@ -1,154 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.utils.util import wait_until - -from ducktape.mark import matrix - -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest -from kafkatest.utils import is_int - -import signal - -def broker_node(test, broker_type): - """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0 - """ - if broker_type == "leader": - node = test.kafka.leader(test.topic, partition=0) - elif broker_type == "controller": - node = test.kafka.controller() - else: - raise Exception("Unexpected broker type %s." % (broker_type)) - - return node - -def clean_shutdown(test, broker_type): - """Discover broker node of requested type and shut it down cleanly. - """ - node = broker_node(test, broker_type) - test.kafka.signal_node(node, sig=signal.SIGTERM) - - -def hard_shutdown(test, broker_type): - """Discover broker node of requested type and shut it down with a hard kill.""" - node = broker_node(test, broker_type) - test.kafka.signal_node(node, sig=signal.SIGKILL) - - -def clean_bounce(test, broker_type): - """Chase the leader of one partition and restart it cleanly.""" - for i in range(5): - prev_broker_node = broker_node(test, broker_type) - test.kafka.restart_node(prev_broker_node, clean_shutdown=True) - - -def hard_bounce(test, broker_type): - """Chase the leader and restart it with a hard kill.""" - for i in range(5): - prev_broker_node = broker_node(test, broker_type) - test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL) - - # Since this is a hard kill, we need to make sure the process is down and that - # zookeeper has registered the loss by expiring the broker's session timeout. - - wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node), - timeout_sec=test.kafka.zk_session_timeout + 5, - err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account)) - - test.kafka.start_node(prev_broker_node) - -failures = { - "clean_shutdown": clean_shutdown, - "hard_shutdown": hard_shutdown, - "clean_bounce": clean_bounce, - "hard_bounce": hard_bounce -} - - -class ReplicationTest(ProduceConsumeValidateTest): - """ - Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages - (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop - too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose - ordering guarantees. - - Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked, - we might exit early if some messages are duplicated (though not an issue here since producer retries==0) - - Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively - consumed messages. Since we run the producer to completion before running the consumer, this is a reliable - indicator that nothing is left to consume. - """ - - def __init__(self, test_context): - """:type test_context: ducktape.tests.test.TestContext""" - super(ReplicationTest, self).__init__(test_context=test_context) - - self.topic = "test_topic" - self.zk = ZookeeperService(test_context, num_nodes=1) - self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: { - "partitions": 3, - "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}} - }) - self.producer_throughput = 1000 - self.num_producers = 1 - self.num_consumers = 1 - - def setUp(self): - self.zk.start() - - def min_cluster_size(self): - """Override this since we're adding services outside of the constructor""" - return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers - - - @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], - broker_type=["leader"], - security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]) - @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], - broker_type=["controller"], - security_protocol=["PLAINTEXT", "SASL_SSL"]) - @matrix(failure_mode=["hard_bounce"], - broker_type=["leader"], - security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"]) - def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"): - """Replication tests. - These tests verify that replication provides simple durability guarantees by checking that data acked by - brokers is still available for consumption in the face of various failure scenarios. - - Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2 - - - Produce messages in the background - - Consume messages in the background - - Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9) - - When done driving failures, stop producing, and finish consuming - - Validate that every acked message was consumed - """ - - self.kafka.security_protocol = security_protocol - self.kafka.interbroker_security_protocol = security_protocol - self.kafka.client_sasl_mechanism = client_sasl_mechanism - self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism - new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int) - self.kafka.start() - - self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type)) http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/security1/__init__.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/security1/__init__.py b/tests/kafkatest/tests/security1/__init__.py deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/security1/security_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/security1/security_test.py b/tests/kafkatest/tests/security1/security_test.py deleted file mode 100644 index b6bc656..0000000 --- a/tests/kafkatest/tests/security1/security_test.py +++ /dev/null @@ -1,106 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.mark import parametrize - -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.security.security_config import SecurityConfig -from kafkatest.services.security.security_config import SslStores -from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest -from kafkatest.utils import is_int -import time - -class TestSslStores(SslStores): - def __init__(self): - super(TestSslStores, self).__init__() - self.invalid_hostname = False - self.generate_ca() - self.generate_truststore() - - def hostname(self, node): - if (self.invalid_hostname): - return "invalidhost" - else: - return super(TestSslStores, self).hostname(node) - -class SecurityTest(ProduceConsumeValidateTest): - """ - These tests validate security features. - """ - - def __init__(self, test_context): - """:type test_context: ducktape.tests.test.TestContext""" - super(SecurityTest, self).__init__(test_context=test_context) - - self.topic = "test_topic" - self.zk = ZookeeperService(test_context, num_nodes=1) - self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: { - "partitions": 2, - "replication-factor": 1} - }) - self.num_partitions = 2 - self.timeout_sec = 10000 - self.producer_throughput = 1000 - self.num_producers = 1 - self.num_consumers = 1 - - def setUp(self): - self.zk.start() - - @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL') - @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') - def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol): - """ - Test that invalid hostname in certificate results in connection failures. - When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure. - When security_protocol=PLAINTEXT and interbroker_security_protocol=SSL, controller connections fail - with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE. - """ - - self.kafka.security_protocol = security_protocol - self.kafka.interbroker_security_protocol = interbroker_security_protocol - SecurityConfig.ssl_stores = TestSslStores() - - SecurityConfig.ssl_stores.invalid_hostname = True - self.kafka.start() - self.create_producer_and_consumer() - self.producer.log_level = "TRACE" - self.producer.start() - self.consumer.start() - time.sleep(10) - assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation did not fail with invalid hostname" - error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE' - for node in self.producer.nodes: - node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE)) - for node in self.consumer.nodes: - node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE)) - - self.producer.stop() - self.consumer.stop() - self.producer.log_level = "INFO" - - SecurityConfig.ssl_stores.invalid_hostname = False - for node in self.kafka.nodes: - self.kafka.restart_node(node, clean_shutdown=True) - self.create_producer_and_consumer() - self.run_produce_consume_validate() - - def create_producer_and_consumer(self): - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=10000, message_validator=is_int) - http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/security1/zookeeper_security_upgrade_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/security1/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/security1/zookeeper_security_upgrade_test.py deleted file mode 100644 index 0cfdf16..0000000 --- a/tests/kafkatest/tests/security1/zookeeper_security_upgrade_test.py +++ /dev/null @@ -1,115 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.mark import matrix - -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest -from kafkatest.services.security.kafka_acls import ACLs -from kafkatest.utils import is_int - -class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest): - """Tests a rolling upgrade for zookeeper. - """ - - def __init__(self, test_context): - super(ZooKeeperSecurityUpgradeTest, self).__init__(test_context=test_context) - - def setUp(self): - self.topic = "test_topic" - self.group = "group" - self.producer_throughput = 100 - self.num_producers = 1 - self.num_consumers = 1 - self.acls = ACLs(self.test_context) - - self.zk = ZookeeperService(self.test_context, num_nodes=3) - - self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: { - "partitions": 3, - "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}}}) - - def create_producer_and_consumer(self): - self.producer = VerifiableProducer( - self.test_context, self.num_producers, self.kafka, self.topic, - throughput=self.producer_throughput) - - self.consumer = ConsoleConsumer( - self.test_context, self.num_consumers, self.kafka, self.topic, - consumer_timeout_ms=60000, message_validator=is_int) - - self.consumer.group_id = self.group - - @property - def no_sasl(self): - return self.kafka.security_protocol == "PLAINTEXT" or self.kafka.security_protocol == "SSL" - - @property - def is_secure(self): - return self.kafka.security_protocol == "SASL_PLAINTEXT" \ - or self.kafka.security_protocol == "SSL" \ - or self.kafka.security_protocol == "SASL_SSL" - - def run_zk_migration(self): - # change zk config (auth provider + jaas login) - self.zk.kafka_opts = self.zk.security_system_properties - self.zk.zk_sasl = True - if self.no_sasl: - self.kafka.start_minikdc(self.zk.zk_principals) - # restart zk - for node in self.zk.nodes: - self.zk.stop_node(node) - self.zk.start_node(node) - - # restart broker with jaas login - for node in self.kafka.nodes: - self.kafka.stop_node(node) - self.kafka.start_node(node) - - # run migration tool - for node in self.zk.nodes: - self.zk.zookeeper_migration(node, "secure") - - # restart broker with zookeeper.set.acl=true and acls - self.kafka.zk_set_acl = True - for node in self.kafka.nodes: - self.kafka.stop_node(node) - self.kafka.start_node(node) - - @matrix(security_protocol=["PLAINTEXT","SSL","SASL_SSL","SASL_PLAINTEXT"]) - def test_zk_security_upgrade(self, security_protocol): - self.zk.start() - self.kafka.security_protocol = security_protocol - self.kafka.interbroker_security_protocol = security_protocol - - # set acls - if self.is_secure: - self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER - self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group) - - if(self.no_sasl): - self.kafka.start() - else: - self.kafka.start(self.zk.zk_principals) - - #Create Producer and Consumer - self.create_producer_and_consumer() - - #Run upgrade - self.run_produce_consume_validate(self.run_zk_migration) http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/security2/__init__.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/security2/__init__.py b/tests/kafkatest/tests/security2/__init__.py deleted file mode 100644 index e69de29..0000000