Repository: kafka
Updated Branches:
  refs/heads/trunk bf3bfd674 -> 57dbe39ae


KAFKA-5743; Ducktape services should use subdirs of /mnt

Author: Colin P. Mccabe <cmcc...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3680 from cmccabe/KAFKA-5743


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57dbe39a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57dbe39a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57dbe39a

Branch: refs/heads/trunk
Commit: 57dbe39ae1e8312c5e0e6603061d2816a8c05198
Parents: bf3bfd6
Author: Colin P. Mccabe <cmcc...@confluent.io>
Authored: Mon Aug 21 18:36:45 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Mon Aug 21 18:36:45 2017 +0100

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py       |  7 ++++---
 tests/kafkatest/services/kafka/config.py           |  2 +-
 tests/kafkatest/services/kafka/kafka.py            | 17 +++++++++--------
 tests/kafkatest/services/monitor/jmx.py            | 16 +++++++++-------
 .../services/performance/producer_performance.py   |  3 ++-
 .../services/templates/zookeeper.properties        |  2 +-
 tests/kafkatest/services/zookeeper.py              | 17 ++++++++++-------
 7 files changed, 36 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py 
b/tests/kafkatest/services/console_consumer.py
index 312131e..556f43a 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -73,8 +73,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
     LOG_FILE = os.path.join(LOG_DIR, "console_consumer.log")
     LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "console_consumer.properties")
-    JMX_TOOL_LOG = "/mnt/jmx_tool.log"
-    JMX_TOOL_ERROR_LOG = "/mnt/jmx_tool.err.log"
+    JMX_TOOL_LOG = os.path.join(PERSISTENT_ROOT, "jmx_tool.log")
+    JMX_TOOL_ERROR_LOG = os.path.join(PERSISTENT_ROOT, "jmx_tool.err.log")
 
     logs = {
         "consumer_stdout": {
@@ -120,7 +120,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
             print_timestamp             if True, print each message's 
timestamp as well
             isolation_level             How to handle transactional messages.
         """
-        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or 
[])
+        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or 
[],
+                          root=ConsoleConsumer.PERSISTENT_ROOT)
         BackgroundThreadService.__init__(self, context, num_nodes)
         self.kafka = kafka
         self.new_consumer = new_consumer

http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/kafka/config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config.py 
b/tests/kafkatest/services/kafka/config.py
index 462277f..f2d6946 100644
--- a/tests/kafkatest/services/kafka/config.py
+++ b/tests/kafkatest/services/kafka/config.py
@@ -24,7 +24,7 @@ class KafkaConfig(dict):
     DEFAULTS = {
         config_property.PORT: 9092,
         config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
-        config_property.LOG_DIRS: 
"/mnt/kafka-data-logs-1,/mnt/kafka-data-logs-2",
+        config_property.LOG_DIRS: 
"/mnt/kafka/kafka-data-logs-1,/mnt/kafka/kafka-data-logs-2",
         config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: 2000
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index febfe55..743f63a 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -37,7 +37,7 @@ Port = collections.namedtuple('Port', ['name', 'number', 
'open'])
 
 class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
-    PERSISTENT_ROOT = "/mnt"
+    PERSISTENT_ROOT = "/mnt/kafka"
     STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, 
"server-start-stdout-stderr.log")
     LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "kafka-log4j.properties")
     # Logs such as controller.log, server.log, etc all go here
@@ -80,7 +80,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         :type topics: dict
         """
         Service.__init__(self, context, num_nodes)
-        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or 
[])
+        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or 
[],
+                          root=KafkaService.PERSISTENT_ROOT)
 
         self.zk = zk
 
@@ -94,7 +95,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.zk_set_acl = False
         self.server_prop_overides = server_prop_overides
         self.log_level = "DEBUG"
-        self.num_nodes = num_nodes
         self.zk_chroot = zk_chroot
 
         #
@@ -201,7 +201,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         # TODO - clean up duplicate configuration logic
         prop_file = cfg.render()
         prop_file += self.render('kafka.properties', node=node, 
broker_id=self.idx(node),
-                                 security_config=self.security_config)
+                                 security_config=self.security_config, 
num_nodes=self.num_nodes)
         return prop_file
 
     def start_cmd(self, node):
@@ -216,6 +216,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         return cmd
 
     def start_node(self, node):
+        node.account.mkdirs(KafkaService.PERSISTENT_ROOT)
         prop_file = self.prop_file(node)
         self.logger.info("kafka.properties:")
         self.logger.info(prop_file)
@@ -243,8 +244,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
     def pids(self, node):
         """Return process ids associated with running processes on the given 
node."""
         try:
-            cmd = "ps ax | grep -i kafka | grep java | grep -v grep | awk 
'{print $1}'"
-
+            cmd = "jcmd | grep -e kafka.Kafka | awk '{print $1}'"
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, 
allow_fail=True, callback=int)]
             return pid_arr
         except (RemoteCommandError, ValueError) as e:
@@ -271,7 +271,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         JmxMixin.clean_node(self, node)
         self.security_config.clean_node(node)
         node.account.kill_process("kafka", clean_shutdown=False, 
allow_fail=True)
-        node.account.ssh("sudo rm -rf /mnt/*", allow_fail=False)
+        node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, 
allow_fail=False)
 
     def create_topic(self, topic_cfg, node=None):
         """Run the admin tool create topic command.
@@ -648,7 +648,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         if partitions:
             cmd += '  --partitions %s' % partitions
 
-        cmd += " 2>> /mnt/get_offset_shell.log | tee -a 
/mnt/get_offset_shell.log &"
+        cmd += " 2>> %s/get_offset_shell.log" % KafkaService.PERSISTENT_ROOT
+        cmd += " | tee -a %s/get_offset_shell.log &" % 
KafkaService.PERSISTENT_ROOT
         output = ""
         self.logger.debug(cmd)
         for line in node.account.ssh_capture(cmd):

http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/monitor/jmx.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/monitor/jmx.py 
b/tests/kafkatest/services/monitor/jmx.py
index 7331cb9..2872201 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
+
 from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.utils.util import wait_until
 from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH
@@ -25,7 +27,7 @@ class JmxMixin(object):
     - we assume the service using JmxMixin also uses KafkaPathResolverMixin
     - this uses the --wait option for JmxTool, so the list of object names 
must be explicit; no patterns are permitted
     """
-    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None):
+    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, 
root="/mnt"):
         self.jmx_object_names = jmx_object_names
         self.jmx_attributes = jmx_attributes or []
         self.jmx_port = 9192
@@ -35,12 +37,12 @@ class JmxMixin(object):
         self.maximum_jmx_value = {}  # map from object_attribute_name to 
maximum value observed over time
         self.average_jmx_value = {}  # map from object_attribute_name to 
average value observed over time
 
-        self.jmx_tool_log = "/mnt/jmx_tool.log"
-        self.jmx_tool_err_log = "/mnt/jmx_tool.err.log"
+        self.jmx_tool_log = os.path.join(root, "jmx_tool.log")
+        self.jmx_tool_err_log = os.path.join(root, "jmx_tool.err.log")
 
     def clean_node(self, node):
         node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf %s" % self.jmx_tool_log, allow_fail=False)
+        node.account.ssh("rm -f -- %s %s" % (self.jmx_tool_log, 
self.jmx_tool_err_log), allow_fail=False)
 
     def start_jmx_tool(self, idx, node):
         if self.jmx_object_names is None:
@@ -83,10 +85,10 @@ class JmxMixin(object):
     def _jmx_has_output(self, node):
         """Helper used as a proxy to determine whether jmx is running by that 
jmx_tool_log contains output."""
         try:
-            node.account.ssh("test -z \"$(cat %s)\"" % self.jmx_tool_log, 
allow_fail=False)
-            return False
-        except RemoteCommandError:
+            node.account.ssh("test -s %s" % self.jmx_tool_log, 
allow_fail=False)
             return True
+        except RemoteCommandError:
+            return False
 
     def read_jmx_output(self, idx, node):
         if not self.started[idx-1]:

http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py 
b/tests/kafkatest/services/performance/producer_performance.py
index ff92da8..38bcc8c 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -37,7 +37,8 @@ class ProducerPerformanceService(JmxMixin, 
PerformanceService):
     def __init__(self, context, num_nodes, kafka, topic, num_records, 
record_size, throughput, version=DEV_BRANCH, settings=None,
                  intermediate_stats=False, client_id="producer-performance", 
jmx_object_names=None, jmx_attributes=None):
 
-        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or 
[])
+        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or 
[],
+                          root=ProducerPerformanceService.PERSISTENT_ROOT)
         PerformanceService.__init__(self, context, num_nodes)
 
         self.logs = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/templates/zookeeper.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/zookeeper.properties 
b/tests/kafkatest/services/templates/zookeeper.properties
index 6048f31..c6fa43f 100644
--- a/tests/kafkatest/services/templates/zookeeper.properties
+++ b/tests/kafkatest/services/templates/zookeeper.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-dataDir=/mnt/zookeeper
+dataDir=/mnt/zookeeper/data
 clientPort=2181
 maxClientCnxns=0
 initLimit=5

http://git-wip-us.apache.org/repos/asf/kafka/blob/57dbe39a/tests/kafkatest/services/zookeeper.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/zookeeper.py 
b/tests/kafkatest/services/zookeeper.py
index 060d632..be802bb 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 
+import os
 import re
 import time
 
@@ -27,13 +28,15 @@ from kafkatest.version import DEV_BRANCH
 
 
 class ZookeeperService(KafkaPathResolverMixin, Service):
+    ROOT = "/mnt/zookeeper"
+    DATA = os.path.join(ROOT, "data")
 
     logs = {
         "zk_log": {
-            "path": "/mnt/zk.log",
+            "path": "%s/zk.log" % ROOT,
             "collect_default": True},
         "zk_data": {
-            "path": "/mnt/zookeeper",
+            "path": DATA,
             "collect_default": False}
     }
 
@@ -64,19 +67,19 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         idx = self.idx(node)
         self.logger.info("Starting ZK node %d on %s", idx, 
node.account.hostname)
 
-        node.account.ssh("mkdir -p /mnt/zookeeper")
-        node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx)
+        node.account.ssh("mkdir -p %s" % ZookeeperService.DATA)
+        node.account.ssh("echo %d > %s/myid" % (idx, ZookeeperService.DATA))
 
         self.security_config.setup_node(node)
         config_file = self.render('zookeeper.properties')
         self.logger.info("zookeeper.properties:")
         self.logger.info(config_file)
-        node.account.create_file("/mnt/zookeeper.properties", config_file)
+        node.account.create_file("%s/zookeeper.properties" % 
ZookeeperService.ROOT, config_file)
 
         start_cmd = "export KAFKA_OPTS=\"%s\";" % (self.kafka_opts + ' ' + 
self.security_system_properties) \
             if self.security_config.zk_sasl else self.kafka_opts
         start_cmd += "%s " % self.path.script("zookeeper-server-start.sh", 
node)
-        start_cmd += "/mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &" % 
self.logs["zk_log"]
+        start_cmd += "%s/zookeeper.properties &>> %s &" % 
(ZookeeperService.ROOT, self.logs["zk_log"]["path"])
         node.account.ssh(start_cmd)
 
         time.sleep(5)  # give it some time to start
@@ -104,7 +107,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
             self.logger.warn("%s %s was still alive at cleanup time. Killing 
forcefully..." %
                              (self.__class__.__name__, node.account))
         node.account.kill_process("zookeeper", clean_shutdown=False, 
allow_fail=True)
-        node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties 
/mnt/zk.log", allow_fail=False)
+        node.account.ssh("rm -rf -- %s" % ZookeeperService.ROOT, 
allow_fail=False)
 
 
     def connect_setting(self, chroot=None):

Reply via email to