Repository: kafka Updated Branches: refs/heads/trunk bf3bfd674 -> 57dbe39ae
KAFKA-5743; Ducktape services should use subdirs of /mnt Author: Colin P. Mccabe <cmcc...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #3680 from cmccabe/KAFKA-5743 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57dbe39a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57dbe39a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57dbe39a Branch: refs/heads/trunk Commit: 57dbe39ae1e8312c5e0e6603061d2816a8c05198 Parents: bf3bfd6 Author: Colin P. Mccabe <cmcc...@confluent.io> Authored: Mon Aug 21 18:36:45 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Mon Aug 21 18:36:45 2017 +0100 ---------------------------------------------------------------------- tests/kafkatest/services/console_consumer.py | 7 ++++--- tests/kafkatest/services/kafka/config.py | 2 +- tests/kafkatest/services/kafka/kafka.py | 17 +++++++++-------- tests/kafkatest/services/monitor/jmx.py | 16 +++++++++------- .../services/performance/producer_performance.py | 3 ++- .../services/templates/zookeeper.properties | 2 +- tests/kafkatest/services/zookeeper.py | 17 ++++++++++------- 7 files changed, 36 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/console_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 312131e..556f43a 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -73,8 +73,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) LOG_FILE = os.path.join(LOG_DIR, "console_consumer.log") LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "console_consumer.properties") - JMX_TOOL_LOG = "/mnt/jmx_tool.log" - JMX_TOOL_ERROR_LOG = "/mnt/jmx_tool.err.log" + JMX_TOOL_LOG = os.path.join(PERSISTENT_ROOT, "jmx_tool.log") + JMX_TOOL_ERROR_LOG = os.path.join(PERSISTENT_ROOT, "jmx_tool.err.log") logs = { "consumer_stdout": { @@ -120,7 +120,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) print_timestamp if True, print each message's timestamp as well isolation_level How to handle transactional messages. """ - JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or []) + JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [], + root=ConsoleConsumer.PERSISTENT_ROOT) BackgroundThreadService.__init__(self, context, num_nodes) self.kafka = kafka self.new_consumer = new_consumer http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/kafka/config.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/config.py b/tests/kafkatest/services/kafka/config.py index 462277f..f2d6946 100644 --- a/tests/kafkatest/services/kafka/config.py +++ b/tests/kafkatest/services/kafka/config.py @@ -24,7 +24,7 @@ class KafkaConfig(dict): DEFAULTS = { config_property.PORT: 9092, config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536, - config_property.LOG_DIRS: "/mnt/kafka-data-logs-1,/mnt/kafka-data-logs-2", + config_property.LOG_DIRS: "/mnt/kafka/kafka-data-logs-1,/mnt/kafka/kafka-data-logs-2", config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: 2000 } http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/kafka/kafka.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index febfe55..743f63a 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -37,7 +37,7 @@ Port = collections.namedtuple('Port', ['name', 'number', 'open']) class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): - PERSISTENT_ROOT = "/mnt" + PERSISTENT_ROOT = "/mnt/kafka" STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "server-start-stdout-stderr.log") LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "kafka-log4j.properties") # Logs such as controller.log, server.log, etc all go here @@ -80,7 +80,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): :type topics: dict """ Service.__init__(self, context, num_nodes) - JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or []) + JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [], + root=KafkaService.PERSISTENT_ROOT) self.zk = zk @@ -94,7 +95,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.zk_set_acl = False self.server_prop_overides = server_prop_overides self.log_level = "DEBUG" - self.num_nodes = num_nodes self.zk_chroot = zk_chroot # @@ -201,7 +201,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): # TODO - clean up duplicate configuration logic prop_file = cfg.render() prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node), - security_config=self.security_config) + security_config=self.security_config, num_nodes=self.num_nodes) return prop_file def start_cmd(self, node): @@ -216,6 +216,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): return cmd def start_node(self, node): + node.account.mkdirs(KafkaService.PERSISTENT_ROOT) prop_file = self.prop_file(node) self.logger.info("kafka.properties:") self.logger.info(prop_file) @@ -243,8 +244,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def pids(self, node): """Return process ids associated with running processes on the given node.""" try: - cmd = "ps ax | grep -i kafka | grep java | grep -v grep | awk '{print $1}'" - + cmd = "jcmd | grep -e kafka.Kafka | awk '{print $1}'" pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] return pid_arr except (RemoteCommandError, ValueError) as e: @@ -271,7 +271,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): JmxMixin.clean_node(self, node) self.security_config.clean_node(node) node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True) - node.account.ssh("sudo rm -rf /mnt/*", allow_fail=False) + node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False) def create_topic(self, topic_cfg, node=None): """Run the admin tool create topic command. @@ -648,7 +648,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): if partitions: cmd += ' --partitions %s' % partitions - cmd += " 2>> /mnt/get_offset_shell.log | tee -a /mnt/get_offset_shell.log &" + cmd += " 2>> %s/get_offset_shell.log" % KafkaService.PERSISTENT_ROOT + cmd += " | tee -a %s/get_offset_shell.log &" % KafkaService.PERSISTENT_ROOT output = "" self.logger.debug(cmd) for line in node.account.ssh_capture(cmd): http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/monitor/jmx.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index 7331cb9..2872201 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os + from ducktape.cluster.remoteaccount import RemoteCommandError from ducktape.utils.util import wait_until from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH @@ -25,7 +27,7 @@ class JmxMixin(object): - we assume the service using JmxMixin also uses KafkaPathResolverMixin - this uses the --wait option for JmxTool, so the list of object names must be explicit; no patterns are permitted """ - def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None): + def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, root="/mnt"): self.jmx_object_names = jmx_object_names self.jmx_attributes = jmx_attributes or [] self.jmx_port = 9192 @@ -35,12 +37,12 @@ class JmxMixin(object): self.maximum_jmx_value = {} # map from object_attribute_name to maximum value observed over time self.average_jmx_value = {} # map from object_attribute_name to average value observed over time - self.jmx_tool_log = "/mnt/jmx_tool.log" - self.jmx_tool_err_log = "/mnt/jmx_tool.err.log" + self.jmx_tool_log = os.path.join(root, "jmx_tool.log") + self.jmx_tool_err_log = os.path.join(root, "jmx_tool.err.log") def clean_node(self, node): node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True) - node.account.ssh("rm -rf %s" % self.jmx_tool_log, allow_fail=False) + node.account.ssh("rm -f -- %s %s" % (self.jmx_tool_log, self.jmx_tool_err_log), allow_fail=False) def start_jmx_tool(self, idx, node): if self.jmx_object_names is None: @@ -83,10 +85,10 @@ class JmxMixin(object): def _jmx_has_output(self, node): """Helper used as a proxy to determine whether jmx is running by that jmx_tool_log contains output.""" try: - node.account.ssh("test -z \"$(cat %s)\"" % self.jmx_tool_log, allow_fail=False) - return False - except RemoteCommandError: + node.account.ssh("test -s %s" % self.jmx_tool_log, allow_fail=False) return True + except RemoteCommandError: + return False def read_jmx_output(self, idx, node): if not self.started[idx-1]: http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/performance/producer_performance.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index ff92da8..38bcc8c 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -37,7 +37,8 @@ class ProducerPerformanceService(JmxMixin, PerformanceService): def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=DEV_BRANCH, settings=None, intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=None): - JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or []) + JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [], + root=ProducerPerformanceService.PERSISTENT_ROOT) PerformanceService.__init__(self, context, num_nodes) self.logs = { http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/templates/zookeeper.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/templates/zookeeper.properties b/tests/kafkatest/services/templates/zookeeper.properties index 6048f31..c6fa43f 100644 --- a/tests/kafkatest/services/templates/zookeeper.properties +++ b/tests/kafkatest/services/templates/zookeeper.properties @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -dataDir=/mnt/zookeeper +dataDir=/mnt/zookeeper/data clientPort=2181 maxClientCnxns=0 initLimit=5 http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/zookeeper.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py index 060d632..be802bb 100644 --- a/tests/kafkatest/services/zookeeper.py +++ b/tests/kafkatest/services/zookeeper.py @@ -14,6 +14,7 @@ # limitations under the License. +import os import re import time @@ -27,13 +28,15 @@ from kafkatest.version import DEV_BRANCH class ZookeeperService(KafkaPathResolverMixin, Service): + ROOT = "/mnt/zookeeper" + DATA = os.path.join(ROOT, "data") logs = { "zk_log": { - "path": "/mnt/zk.log", + "path": "%s/zk.log" % ROOT, "collect_default": True}, "zk_data": { - "path": "/mnt/zookeeper", + "path": DATA, "collect_default": False} } @@ -64,19 +67,19 @@ class ZookeeperService(KafkaPathResolverMixin, Service): idx = self.idx(node) self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname) - node.account.ssh("mkdir -p /mnt/zookeeper") - node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx) + node.account.ssh("mkdir -p %s" % ZookeeperService.DATA) + node.account.ssh("echo %d > %s/myid" % (idx, ZookeeperService.DATA)) self.security_config.setup_node(node) config_file = self.render('zookeeper.properties') self.logger.info("zookeeper.properties:") self.logger.info(config_file) - node.account.create_file("/mnt/zookeeper.properties", config_file) + node.account.create_file("%s/zookeeper.properties" % ZookeeperService.ROOT, config_file) start_cmd = "export KAFKA_OPTS=\"%s\";" % (self.kafka_opts + ' ' + self.security_system_properties) \ if self.security_config.zk_sasl else self.kafka_opts start_cmd += "%s " % self.path.script("zookeeper-server-start.sh", node) - start_cmd += "/mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &" % self.logs["zk_log"] + start_cmd += "%s/zookeeper.properties &>> %s &" % (ZookeeperService.ROOT, self.logs["zk_log"]["path"]) node.account.ssh(start_cmd) time.sleep(5) # give it some time to start @@ -104,7 +107,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service): self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." % (self.__class__.__name__, node.account)) node.account.kill_process("zookeeper", clean_shutdown=False, allow_fail=True) - node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False) + node.account.ssh("rm -rf -- %s" % ZookeeperService.ROOT, allow_fail=False) def connect_setting(self, chroot=None):