This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new 90fb79b KAFKA-7834: Extend collected logs in system test services to include heap dumps 90fb79b is described below commit 90fb79b4c17b242b83288bcabe06466347f5141f Author: Konstantine Karantasis <konstant...@confluent.io> AuthorDate: Mon Feb 4 16:46:03 2019 -0800 KAFKA-7834: Extend collected logs in system test services to include heap dumps * Enable heap dumps on OOM with -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=<file.bin> in the major services in system tests * Collect the heap dump from the predefined location as part of the result logs for each service * Change Connect service to delete the whole root directory instead of individual expected files * Tested by running the full suite of system tests Author: Konstantine Karantasis <konstant...@confluent.io> Reviewers: Ewen Cheslack-Postava <e...@confluent.io> Closes #6158 from kkonstantine/KAFKA-7834 --- tests/kafkatest/services/connect.py | 26 ++++++++++++++++++++++---- tests/kafkatest/services/kafka/kafka.py | 11 +++++++++-- tests/kafkatest/services/zookeeper.py | 10 ++++++++-- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index bf38e50..40c2cf3 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -42,6 +42,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): PID_FILE = os.path.join(PERSISTENT_ROOT, "connect.pid") EXTERNAL_CONFIGS_FILE = os.path.join(PERSISTENT_ROOT, "connect-external-configs.properties") CONNECT_REST_PORT = 8083 + HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "connect_heap_dump.bin") # Currently the Connect worker supports waiting on three modes: STARTUP_MODE_INSTANT = 'INSTANT' @@ -61,6 +62,9 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): "connect_stderr": { "path": STDERR_FILE, "collect_default": True}, + "connect_heap_dump_file": { + "path": HEAP_DUMP_FILE, + 
"collect_default": True} } def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 60): @@ -160,8 +164,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): def clean_node(self, node): node.account.kill_process("connect", clean_shutdown=False, allow_fail=True) self.security_config.clean_node(node) - all_files = " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE, self.EXTERNAL_CONFIGS_FILE] + self.config_filenames() + self.files) - node.account.ssh("rm -rf " + all_files, allow_fail=False) + other_files = " ".join(self.config_filenames() + self.files) + node.account.ssh("rm -rf -- %s %s" % (ConnectServiceBase.PERSISTENT_ROOT, other_files), allow_fail=False) def config_filenames(self): return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])] @@ -252,6 +256,14 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): def _base_url(self, node): return 'http://' + node.account.externally_routable_ip + ':' + str(self.CONNECT_REST_PORT) + def append_to_environment_variable(self, envvar, value): + env_opts = self.environment[envvar] + if env_opts is None: + env_opts = "\"%s\"" % value + else: + env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value) + self.environment[envvar] = env_opts + class ConnectStandaloneService(ConnectServiceBase): """Runs Kafka Connect in standalone mode.""" @@ -266,7 +278,10 @@ class ConnectStandaloneService(ConnectServiceBase): def start_cmd(self, node, connector_configs): cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE - cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts + heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \ + self.logs["connect_heap_dump_file"]["path"] + other_kafka_opts = self.security_config.kafka_opts.strip('\"') + cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts) for envvar in self.environment: cmd += "export %s=%s; " % (envvar, str(self.environment[envvar])) cmd += "%s %s " % (self.path.script("connect-standalone.sh", node), self.CONFIG_FILE) @@ -314,7 +329,10 @@ class ConnectDistributedService(ConnectServiceBase): # connector_configs argument is intentionally ignored in distributed service. def start_cmd(self, node, connector_configs): cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE - cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts + heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \ + self.logs["connect_heap_dump_file"]["path"] + other_kafka_opts = self.security_config.kafka_opts.strip('\"') + cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts) for envvar in self.environment: cmd += "export %s=%s; " % (envvar, str(self.environment[envvar])) cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), self.CONFIG_FILE) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 2258e27..a59bb71 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -49,6 +49,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties") # Kafka Authorizer SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer" + HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin") logs = { "kafka_server_start_stdout_stderr": { @@ -65,7 +66,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): "collect_default": False}, "kafka_data_2": { "path": DATA_LOG_DIR_2, - "collect_default": False} + "collect_default": False}, + "kafka_heap_dump_file": { + "path": HEAP_DUMP_FILE, + "collect_default": True} } def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, 
interbroker_security_protocol=SecurityConfig.PLAINTEXT, @@ -247,7 +251,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def start_cmd(self, node): cmd = "export JMX_PORT=%d; " % self.jmx_port cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG - cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts + heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \ + self.logs["kafka_heap_dump_file"]["path"] + other_kafka_opts = self.security_config.kafka_opts.strip('\"') + cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts) cmd += "%s %s 1>> %s 2>> %s &" % \ (self.path.script("kafka-server-start.sh", node), KafkaService.CONFIG_FILE, diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py index 5bda867..f6a6b02 100644 --- a/tests/kafkatest/services/zookeeper.py +++ b/tests/kafkatest/services/zookeeper.py @@ -30,6 +30,7 @@ from kafkatest.version import DEV_BRANCH class ZookeeperService(KafkaPathResolverMixin, Service): ROOT = "/mnt/zookeeper" DATA = os.path.join(ROOT, "data") + HEAP_DUMP_FILE = os.path.join(ROOT, "zk_heap_dump.bin") logs = { "zk_log": { @@ -37,7 +38,10 @@ class ZookeeperService(KafkaPathResolverMixin, Service): "collect_default": True}, "zk_data": { "path": DATA, - "collect_default": False} + "collect_default": False}, + "zk_heap_dump_file": { + "path": HEAP_DUMP_FILE, + "collect_default": True} } def __init__(self, context, num_nodes, zk_sasl = False): @@ -76,8 +80,10 @@ class ZookeeperService(KafkaPathResolverMixin, Service): self.logger.info(config_file) node.account.create_file("%s/zookeeper.properties" % ZookeeperService.ROOT, config_file) - start_cmd = "export KAFKA_OPTS=\"%s\";" % (self.kafka_opts + ' ' + self.security_system_properties) \ + heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % self.logs["zk_heap_dump_file"]["path"] + other_kafka_opts = self.kafka_opts + ' ' + self.security_system_properties \ if self.security_config.zk_sasl else self.kafka_opts + start_cmd = "export KAFKA_OPTS=\"%s %s\";" % (heap_kafka_opts, other_kafka_opts) start_cmd += "%s " % self.path.script("zookeeper-server-start.sh", node) start_cmd += "%s/zookeeper.properties &>> %s &" % (ZookeeperService.ROOT, self.logs["zk_log"]["path"]) node.account.ssh(start_cmd)