Repository: kafka
Updated Branches:
  refs/heads/trunk 9ef2b3ce8 -> 949577ca7

KAFKA-5768; Upgrade to ducktape 0.7.1

Author: Colin P. Mccabe <cmcc...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3721 from cmccabe/KAFKA-5768

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/949577ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/949577ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/949577ca

Branch: refs/heads/trunk
Commit: 949577ca77d5c143774d4879b19505b4d6284de3
Parents: 9ef2b3c
Author: Colin P. Mccabe <cmcc...@confluent.io>
Authored: Tue Aug 29 16:41:19 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Aug 29 16:41:19 2017 -0700

----------------------------------------------------------------------
 tests/docker/Dockerfile                         |  2 +-
 tests/kafkatest/services/kafka/kafka.py         |  9 ++++---
 .../kafkatest/services/kafka_log4j_appender.py  | 11 +++++---
 tests/kafkatest/services/mirror_maker.py        | 19 +++++++-------
 tests/kafkatest/services/monitor/jmx.py         |  9 +++++--
 .../services/performance/end_to_end_latency.py  | 11 +++++---
 .../services/performance/performance.py         | 18 ++++++++++---
 .../services/replica_verification_tool.py       | 11 +++++---
 .../kafkatest/services/simple_consumer_shell.py |  9 ++++---
 tests/kafkatest/services/zookeeper.py           | 27 ++++++++++++--------
 tests/setup.py                                  |  2 +-
 11 files changed, 86 insertions(+), 42 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/949577ca/tests/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index c0980cb..895f4fe 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -33,7 +33,7 @@ LABEL ducker.creator=$ducker_creator
 # Update Linux and install necessary utilities.
RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev && apt-get -y clean -RUN pip install -U pip && pip install --upgrade cffi virtualenv pyasn1 Sphinx sphinx-argparse sphinx-rtd-theme boto3 pycrypto pywinrm && pip install --upgrade ducktape==0.6.0 +RUN pip install -U pip setuptools && pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm && pip install --upgrade ducktape==0.7.1 # Set up ssh COPY ./ssh-config /root/.ssh/config http://git-wip-us.apache.org/repos/asf/kafka/blob/949577ca/tests/kafkatest/services/kafka/kafka.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 8c21d68..10398c6 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -36,7 +36,6 @@ Port = collections.namedtuple('Port', ['name', 'number', 'open']) class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): - PERSISTENT_ROOT = "/mnt/kafka" STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "server-start-stdout-stderr.log") LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "kafka-log4j.properties") @@ -244,7 +243,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def pids(self, node): """Return process ids associated with running processes on the given node.""" try: - cmd = "jcmd | grep -e kafka.Kafka | awk '{print $1}'" + cmd = "jcmd | grep -e %s | awk '{print $1}'" % self.java_class_name() pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] return pid_arr except (RemoteCommandError, ValueError) as e: @@ -270,7 +269,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def clean_node(self, node): JmxMixin.clean_node(self, node) self.security_config.clean_node(node) - node.account.kill_process("kafka", 
clean_shutdown=False, allow_fail=True) + node.account.kill_java_processes(self.java_class_name(), + clean_shutdown=False, allow_fail=True) node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False) def create_topic(self, topic_cfg, node=None): @@ -656,3 +656,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): output += line self.logger.debug(output) return output + + def java_class_name(self): + return "kafka.Kafka" http://git-wip-us.apache.org/repos/asf/kafka/blob/949577ca/tests/kafkatest/services/kafka_log4j_appender.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py index 29a4202..76419c6 100644 --- a/tests/kafkatest/services/kafka_log4j_appender.py +++ b/tests/kafkatest/services/kafka_log4j_appender.py @@ -45,7 +45,8 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService): def start_cmd(self, node): cmd = self.path.script("kafka-run-class.sh", node) - cmd += " org.apache.kafka.tools.VerifiableLog4jAppender" + cmd += " " + cmd += self.java_class_name() cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_protocol)) if self.max_messages > 0: @@ -67,12 +68,16 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService): return cmd def stop_node(self, node): - node.account.kill_process("VerifiableLog4jAppender", allow_fail=False) + node.account.kill_java_processes(self.java_class_name(), allow_fail=False) stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec) assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \ (str(node.account), str(self.stop_timeout_sec)) def clean_node(self, node): - node.account.kill_process("VerifiableLog4jAppender", clean_shutdown=False, allow_fail=False) + node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False, + 
allow_fail=False) node.account.ssh("rm -rf /mnt/kafka_log4j_appender.log", allow_fail=False) + + def java_class_name(self): + return "org.apache.kafka.tools.VerifiableLog4jAppender" http://git-wip-us.apache.org/repos/asf/kafka/blob/949577ca/tests/kafkatest/services/mirror_maker.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py index 55b75d0..847fa35 100644 --- a/tests/kafkatest/services/mirror_maker.py +++ b/tests/kafkatest/services/mirror_maker.py @@ -123,7 +123,8 @@ class MirrorMaker(KafkaPathResolverMixin, Service): if self.external_jars is not None: cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % self.external_jars cmd += "export CLASSPATH; " - cmd += " %s kafka.tools.MirrorMaker" % self.path.script("kafka-run-class.sh", node) + cmd += " %s %s" % (self.path.script("kafka-run-class.sh", node), + self.java_class_name()) cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG cmd += " --offset.commit.interval.ms %s" % str(self.offset_commit_interval_ms) @@ -141,12 +142,7 @@ class MirrorMaker(KafkaPathResolverMixin, Service): return cmd def pids(self, node): - try: - cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'" - pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] - return pid_arr - except (RemoteCommandError, ValueError): - return [] + return node.account.java_pids(self.java_class_name()) def alive(self, node): return len(self.pids(node)) > 0 @@ -185,7 +181,8 @@ class MirrorMaker(KafkaPathResolverMixin, Service): self.logger.debug("Mirror maker is alive") def stop_node(self, node, clean_shutdown=True): - node.account.kill_process("java", allow_fail=True, clean_shutdown=clean_shutdown) + node.account.kill_java_processes(self.java_class_name(), allow_fail=True, + clean_shutdown=clean_shutdown) 
wait_until(lambda: not self.alive(node), timeout_sec=30, backoff_sec=.5, err_msg="Mirror maker took to long to stop.") @@ -193,6 +190,10 @@ class MirrorMaker(KafkaPathResolverMixin, Service): if self.alive(node): self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." % (self.__class__.__name__, node.account)) - node.account.kill_process("java", clean_shutdown=False, allow_fail=True) + node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False, + allow_fail=True) node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False) self.security_config.clean_node(node) + + def java_class_name(self): + return "kafka.tools.MirrorMaker" http://git-wip-us.apache.org/repos/asf/kafka/blob/949577ca/tests/kafkatest/services/monitor/jmx.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index 2872201..36df21b 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -41,7 +41,8 @@ class JmxMixin(object): self.jmx_tool_err_log = os.path.join(root, "jmx_tool.err.log") def clean_node(self, node): - node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True) + node.account.kill_java_processes(self.jmx_class_name(), clean_shutdown=False, + allow_fail=True) node.account.ssh("rm -f -- %s %s" % (self.jmx_tool_log, self.jmx_tool_err_log), allow_fail=False) def start_jmx_tool(self, idx, node): @@ -67,7 +68,8 @@ class JmxMixin(object): use_jmxtool_version = get_version(node) if use_jmxtool_version <= V_0_11_0_0: use_jmxtool_version = DEV_BRANCH - cmd = "%s kafka.tools.JmxTool " % self.path.script("kafka-run-class.sh", use_jmxtool_version) + cmd = "%s %s " % (self.path.script("kafka-run-class.sh", use_jmxtool_version), + self.jmx_class_name()) cmd += "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port cmd += " 
--wait" for jmx_object_name in self.jmx_object_names: @@ -131,3 +133,6 @@ class JmxMixin(object): def read_jmx_output_all_nodes(self): for node in self.nodes: self.read_jmx_output(self.idx(node), node) + + def jmx_class_name(self): + return "kafka.tools.JmxTool" http://git-wip-us.apache.org/repos/asf/kafka/blob/949577ca/tests/kafkatest/services/performance/end_to_end_latency.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py index d0385f4..2c7f69a 100644 --- a/tests/kafkatest/services/performance/end_to_end_latency.py +++ b/tests/kafkatest/services/performance/end_to_end_latency.py @@ -46,7 +46,8 @@ class EndToEndLatencyService(PerformanceService): } def __init__(self, context, num_nodes, kafka, topic, num_records, compression_type="none", version=DEV_BRANCH, acks=1): - super(EndToEndLatencyService, self).__init__(context, num_nodes) + super(EndToEndLatencyService, self).__init__(context, num_nodes, + root=EndToEndLatencyService.PERSISTENT_ROOT) self.kafka = kafka self.security_config = kafka.security_config.client_config() @@ -76,12 +77,13 @@ class EndToEndLatencyService(PerformanceService): 'zk_connect': self.kafka.zk_connect_setting(), 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol), 'config_file': EndToEndLatencyService.CONFIG_FILE, - 'kafka_run_class': self.path.script("kafka-run-class.sh", node) + 'kafka_run_class': self.path.script("kafka-run-class.sh", node), + 'java_class_name': self.java_class_name() }) cmd = "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG if node.version >= V_0_9_0_0: - cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s kafka.tools.EndToEndLatency " % args + cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s %(java_class_name)s " % args cmd += "%(bootstrap_servers)s %(topic)s 
%(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args else: # Set fetch max wait to 0 to match behavior in later versions @@ -117,3 +119,6 @@ class EndToEndLatencyService(PerformanceService): results['latency_99th_ms'] = float(line.split()[6][:-1]) results['latency_999th_ms'] = float(line.split()[9]) self.results[idx-1] = results + + def java_class_name(self): + return "kafka.tools.EndToEndLatency" http://git-wip-us.apache.org/repos/asf/kafka/blob/949577ca/tests/kafkatest/services/performance/performance.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py index d6d4f14..ec2b63e 100644 --- a/tests/kafkatest/services/performance/performance.py +++ b/tests/kafkatest/services/performance/performance.py @@ -19,22 +19,32 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin class PerformanceService(KafkaPathResolverMixin, BackgroundThreadService): - def __init__(self, context, num_nodes, stop_timeout_sec=30): + def __init__(self, context, num_nodes, root="/mnt/*", stop_timeout_sec=30): super(PerformanceService, self).__init__(context, num_nodes) self.results = [None] * self.num_nodes self.stats = [[] for x in range(self.num_nodes)] self.stop_timeout_sec = stop_timeout_sec + self.root = root + + def java_class_name(self): + """ + Returns the name of the Java class which this service creates. Subclasses should override + this method, so that we know the name of the java process to stop. If it is not + overridden, we will kill all java processes in PerformanceService#stop_node (for backwards + compatibility.) 
+ """ + return "" def stop_node(self, node): - node.account.kill_process("java", clean_shutdown=True, allow_fail=True) + node.account.kill_java_processes(self.java_class_name(), clean_shutdown=True, allow_fail=True) stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec) assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \ (str(node.account), str(self.stop_timeout_sec)) def clean_node(self, node): - node.account.kill_process("java", clean_shutdown=False, allow_fail=True) - node.account.ssh("rm -rf /mnt/*", allow_fail=False) + node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False, allow_fail=True) + node.account.ssh("rm -rf -- %s" % self.root, allow_fail=False) def throughput(records_per_sec, mb_per_sec): http://git-wip-us.apache.org/repos/asf/kafka/blob/949577ca/tests/kafkatest/services/replica_verification_tool.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py index a2753fd..c65be34 100644 --- a/tests/kafkatest/services/replica_verification_tool.py +++ b/tests/kafkatest/services/replica_verification_tool.py @@ -70,19 +70,24 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService): def start_cmd(self, node): cmd = self.path.script("kafka-run-class.sh", node) - cmd += " kafka.tools.ReplicaVerificationTool" + cmd += " %s" % self.java_class_name() cmd += " --broker-list %s --topic-white-list %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms) cmd += " 2>> /mnt/replica_verification_tool.log | tee -a /mnt/replica_verification_tool.log &" return cmd def stop_node(self, node): - node.account.kill_process("java", clean_shutdown=True, allow_fail=True) + node.account.kill_java_processes(self.java_class_name(), clean_shutdown=True, + allow_fail=True) stopped = 
self.wait_node(node, timeout_sec=self.stop_timeout_sec) assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \ (str(node.account), str(self.stop_timeout_sec)) def clean_node(self, node): - node.account.kill_process("java", clean_shutdown=False, allow_fail=True) + node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False, + allow_fail=True) node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False) + + def java_class_name(self): + return "kafka.tools.ReplicaVerificationTool" http://git-wip-us.apache.org/repos/asf/kafka/blob/949577ca/tests/kafkatest/services/simple_consumer_shell.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/simple_consumer_shell.py b/tests/kafkatest/services/simple_consumer_shell.py index 7204748..76820f0 100644 --- a/tests/kafkatest/services/simple_consumer_shell.py +++ b/tests/kafkatest/services/simple_consumer_shell.py @@ -46,7 +46,7 @@ class SimpleConsumerShell(KafkaPathResolverMixin, BackgroundThreadService): def start_cmd(self, node): cmd = self.path.script("kafka-run-class.sh", node) - cmd += " kafka.tools.SimpleConsumerShell" + cmd += " %s" % self.java_class_name() cmd += " --topic %s --broker-list %s --partition %s --no-wait-at-logend" % (self.topic, self.kafka.bootstrap_servers(), self.partition) cmd += " 2>> /mnt/get_simple_consumer_shell.log | tee -a /mnt/get_simple_consumer_shell.log &" @@ -56,12 +56,15 @@ class SimpleConsumerShell(KafkaPathResolverMixin, BackgroundThreadService): return self.output def stop_node(self, node): - node.account.kill_process("SimpleConsumerShell", allow_fail=False) + node.account.kill_java_processes(self.java_class_name(), allow_fail=False) stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec) assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \ (str(node.account), str(self.stop_timeout_sec)) def clean_node(self, node): - 
node.account.kill_process("SimpleConsumerShell", clean_shutdown=False, allow_fail=False) + node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False, allow_fail=False) node.account.ssh("rm -rf /mnt/simple_consumer_shell.log", allow_fail=False) + + def java_class_name(self): + return "kafka.tools.SimpleConsumerShell" http://git-wip-us.apache.org/repos/asf/kafka/blob/949577ca/tests/kafkatest/services/zookeeper.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py index 1775554..b469c0c 100644 --- a/tests/kafkatest/services/zookeeper.py +++ b/tests/kafkatest/services/zookeeper.py @@ -94,12 +94,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service): return False def pids(self, node): - try: - cmd = "ps ax | grep -i zookeeper | grep java | grep -v grep | awk '{print $1}'" - pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] - return pid_arr - except (RemoteCommandError, ValueError) as e: - return [] + return node.account.java_pids(self.java_class_name()) def alive(self, node): return len(self.pids(node)) > 0 @@ -107,7 +102,8 @@ class ZookeeperService(KafkaPathResolverMixin, Service): def stop_node(self, node): idx = self.idx(node) self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname)) - node.account.kill_process("zookeeper", allow_fail=False) + node.account.kill_java_processes(self.java_class_name(), allow_fail=False) + node.account.kill_java_processes(self.java_query_class_name(), allow_fail=False) wait_until(lambda: not self.alive(node), timeout_sec=5, err_msg="Timed out waiting for zookeeper to stop.") def clean_node(self, node): @@ -115,7 +111,10 @@ class ZookeeperService(KafkaPathResolverMixin, Service): if self.alive(node): self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." 
% (self.__class__.__name__, node.account)) - node.account.kill_process("zookeeper", clean_shutdown=False, allow_fail=True) + node.account.kill_java_processes(self.java_class_name(), + clean_shutdown=False, allow_fail=True) + node.account.kill_java_processes(self.java_query_class_name(), + clean_shutdown=False, allow_fail=False) node.account.ssh("rm -rf -- %s" % ZookeeperService.ROOT, allow_fail=False) @@ -145,8 +144,8 @@ class ZookeeperService(KafkaPathResolverMixin, Service): chroot_path = ('' if chroot is None else chroot) + path kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH) - cmd = "%s kafka.tools.ZooKeeperMainWrapper -server %s get %s" % \ - (kafka_run_class, self.connect_setting(), chroot_path) + cmd = "%s %s -server %s get %s" % \ + (kafka_run_class, self.java_query_class_name(), self.connect_setting(), chroot_path) self.logger.debug(cmd) node = self.nodes[0] @@ -158,3 +157,11 @@ class ZookeeperService(KafkaPathResolverMixin, Service): if match is not None: result = match.groups()[0] return result + + def java_class_name(self): + """ The class name of the Zookeeper quorum peers. """ + return "org.apache.zookeeper.server.quorum.QuorumPeerMain" + + def java_query_class_name(self): + """ The class name of the Zookeeper tool within Kafka. """ + return "kafka.tools.ZooKeeperMainWrapper" http://git-wip-us.apache.org/repos/asf/kafka/blob/949577ca/tests/setup.py ---------------------------------------------------------------------- diff --git a/tests/setup.py b/tests/setup.py index 631beac..24ee4eb 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -51,7 +51,7 @@ setup(name="kafkatest", license="apache2.0", packages=find_packages(), include_package_data=True, - install_requires=["ducktape==0.6.0", "requests>=2.5.0"], + install_requires=["ducktape==0.7.1", "requests>=2.5.0"], tests_require=["pytest", "mock"], cmdclass={'test': PyTest}, )