http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/kafka_system_test_utils.py ---------------------------------------------------------------------- diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py deleted file mode 100644 index a9b73f7..0000000 --- a/system_test/utils/kafka_system_test_utils.py +++ /dev/null @@ -1,2512 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -#!/usr/bin/env python - -# =================================== -# kafka_system_test_utils.py -# =================================== - -import datetime -import getpass -import hashlib -import inspect -import json -import logging -import os -import pprint -import re -import subprocess -import sys -import thread -import time -import traceback - -import system_test_utils -import metrics - -from datetime import datetime -from time import mktime - -# ==================================================================== -# Two logging formats are defined in system_test/system_test_runner.py -# ==================================================================== - -# 1. "namedLogger" is defined to log message in this format: -# "%(asctime)s - %(levelname)s - %(message)s %(name_of_class)s" -# usage: to log message and showing the class name of the message - -logger = logging.getLogger("namedLogger") -thisClassName = '(kafka_system_test_utils)' -d = {'name_of_class': thisClassName} - -# 2. "anonymousLogger" is defined to log message in this format: -# "%(asctime)s - %(levelname)s - %(message)s" -# usage: to log message without showing class name and it's appropriate -# for logging generic message such as "sleeping for 5 seconds" - -anonLogger = logging.getLogger("anonymousLogger") - - -# ===================================== -# Sample usage of getting testcase env -# ===================================== -def get_testcase_env(testcaseEnv): - anonLogger.info("================================================") - anonLogger.info("systemTestBaseDir : " + testcaseEnv.systemTestBaseDir) - anonLogger.info("testSuiteBaseDir : " + testcaseEnv.testSuiteBaseDir) - anonLogger.info("testCaseBaseDir : " + testcaseEnv.testCaseBaseDir) - anonLogger.info("testCaseLogsDir : " + testcaseEnv.testCaseLogsDir) - anonLogger.info("userDefinedEnvVarDict : (testcaseEnv.userDefinedEnvVarDict)") - anonLogger.info("================================================") - - -def get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, type): - - defaultLogDir = testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId - - # type is either "metrics" or "dashboards" or "default" - if type == "metrics": - return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId + "/metrics" - elif type == "log_segments" : - return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId + "/log_segments" - elif type == "default" : - return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId - elif type == "dashboards": - return testcaseEnv.testCaseLogsDir + "/dashboards" - elif type == "config": - return testcaseEnv.testCaseBaseDir + "/config" - else: - logger.error("unrecognized log directory type : " + type, extra=d) - logger.error("returning default log dir : " + defaultLogDir, extra=d) - return defaultLogDir - - -def generate_testcase_log_dirs(systemTestEnv, testcaseEnv): - - testcasePathName = testcaseEnv.testCaseBaseDir - logger.debug("testcase pathname: " + testcasePathName, extra=d) - - if not os.path.exists(testcasePathName + "/config") : os.makedirs(testcasePathName + "/config") - if not os.path.exists(testcasePathName + "/logs") : os.makedirs(testcasePathName + "/logs") - if not os.path.exists(testcasePathName + "/dashboards") : os.makedirs(testcasePathName + "/dashboards") - - dashboardsPathName = testcasePathName + "/dashboards" - if not os.path.exists(dashboardsPathName) : os.makedirs(dashboardsPathName) - - for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList: - entityId = clusterEntityConfigDict["entity_id"] - role = clusterEntityConfigDict["role"] - - metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics") - if not os.path.exists(metricsPathName) : os.makedirs(metricsPathName) - - # create the role directory under dashboards - dashboardsRoleDir = dashboardsPathName + "/" + role - if not os.path.exists(dashboardsRoleDir) : os.makedirs(dashboardsRoleDir) - - -def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv): - anonLogger.info("================================================") - anonLogger.info("collecting logs from remote machines") - anonLogger.info("================================================") - - testCaseBaseDir = testcaseEnv.testCaseBaseDir - tcConfigsList = testcaseEnv.testcaseConfigsList - - for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList: - hostname = clusterEntityConfigDict["hostname"] - entity_id = clusterEntityConfigDict["entity_id"] - role = clusterEntityConfigDict["role"] - kafkaHome = clusterEntityConfigDict["kafka_home"] - - logger.debug("entity_id : " + entity_id, extra=d) - logger.debug("hostname : " + hostname, extra=d) - logger.debug("role : " + role, extra=d) - - configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config") - metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics") - logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "default") - rmtLogPathName = logPathName - rmtMetricsPathName = metricsPathName - - if hostname != "localhost": - rmtConfigPathName = replace_kafka_home(configPathName, kafkaHome) - rmtMetricsPathName = replace_kafka_home(metricsPathName, kafkaHome) - rmtLogPathName = replace_kafka_home(logPathName, kafkaHome) - - # ============================== - # collect entity log file - # ============================== - cmdList = ["scp", - hostname + ":" + rmtLogPathName + "/*", - logPathName] - cmdStr = " ".join(cmdList) - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - # ============================== - # collect entity metrics file - # ============================== - cmdList = ["scp", - hostname + ":" + rmtMetricsPathName + "/*", - metricsPathName] - cmdStr = " ".join(cmdList) - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - # ============================== - # collect broker log segment file - # ============================== - if role == "broker": - dataLogPathName = system_test_utils.get_data_by_lookup_keyval( - testcaseEnv.testcaseConfigsList, "entity_id", entity_id, "log.dir") - - cmdList = ["scp -r", - hostname + ":" + dataLogPathName, - logPathName] - cmdStr = " ".join(cmdList) - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - # ============================== - # collect ZK log - # ============================== - if role == "zookeeper": - dataLogPathName = system_test_utils.get_data_by_lookup_keyval( - testcaseEnv.testcaseConfigsList, "entity_id", entity_id, "dataDir") - - cmdList = ["scp -r", - hostname + ":" + dataLogPathName, - logPathName] - cmdStr = " ".join(cmdList) - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - # ============================== - # collect dashboards file - # ============================== - dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards") - rmtDashboardsPathName = dashboardsPathName - - if hostname != "localhost": - rmtDashboardsPathName = replace_kafka_home(dashboardsPathName, kafkaHome) - - cmdList = ["scp", - hostname + ":" + rmtDashboardsPathName + "/*", - dashboardsPathName] - cmdStr = " ".join(cmdList) - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - -def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv): - testCaseBaseDir = testcaseEnv.testCaseBaseDir - - for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList: - hostname = clusterEntityConfigDict["hostname"] - entity_id = clusterEntityConfigDict["entity_id"] - role = clusterEntityConfigDict["role"] - kafkaHome = clusterEntityConfigDict["kafka_home"] - - logger.debug("entity_id : " + entity_id, extra=d) - logger.debug("hostname : " + hostname, extra=d) - logger.debug("role : " + role, extra=d) - - configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config") - metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics") - dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards") - - if hostname != "localhost": - configPathName = replace_kafka_home(configPathName, kafkaHome) - metricsPathName = replace_kafka_home(metricsPathName, kafkaHome) - dashboardsPathName = replace_kafka_home(dashboardsPathName, kafkaHome) - - cmdList = ["ssh " + hostname, - "'mkdir -p", - configPathName, - metricsPathName, - dashboardsPathName + "'"] - cmdStr = " ".join(cmdList) - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - -def init_entity_props(systemTestEnv, testcaseEnv): - clusterConfigsList = systemTestEnv.clusterEntityConfigDictList - testcaseConfigsList = testcaseEnv.testcaseConfigsList - testcasePathName = testcaseEnv.testCaseBaseDir - - try: - # consumer config / log files location - consEntityIdList = system_test_utils.get_data_from_list_of_dicts( \ - clusterConfigsList, "role", "console_consumer", "entity_id") - consLogList = system_test_utils.get_data_from_list_of_dicts( \ - testcaseConfigsList, "entity_id", consEntityIdList[0], "log_filename") - consLogPathname = testcasePathName + "/logs/" + consLogList[0] - consCfgList = system_test_utils.get_data_from_list_of_dicts( \ - testcaseConfigsList, "entity_id", consEntityIdList[0], "config_filename") - consCfgPathname = testcasePathName + "/config/" + consCfgList[0] - - # producer config / log files location - prodEntityIdList = system_test_utils.get_data_from_list_of_dicts( \ - clusterConfigsList, "role", "producer_performance", "entity_id") - prodLogList = system_test_utils.get_data_from_list_of_dicts( \ - testcaseConfigsList, "entity_id", prodEntityIdList[0], "log_filename") - prodLogPathname = testcasePathName + "/logs/" + prodLogList[0] - prodCfgList = system_test_utils.get_data_from_list_of_dicts( \ - testcaseConfigsList, "entity_id", prodEntityIdList[0], "config_filename") - prodCfgPathname = testcasePathName + "/config/" + prodCfgList[0] - except: - logger.error("Failed to initialize entity config/log path names: possibly mismatched " \ - + "number of entities in cluster_config.json & testcase_n_properties.json", extra=d) - raise - - testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consLogPathname - testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"] = consCfgPathname - testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = prodLogPathname - testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"] = prodCfgPathname - - -def copy_file_with_dict_values(srcFile, destFile, dictObj, keyValToAddDict): - infile = open(srcFile, "r") - inlines = infile.readlines() - infile.close() - - outfile = open(destFile, 'w') - for line in inlines: - for key in dictObj.keys(): - if (line.startswith(key + "=")): - line = key + "=" + dictObj[key] + "\n" - outfile.write(line) - - if (keyValToAddDict is not None): - for key in sorted(keyValToAddDict.iterkeys()): - line = key + "=" + keyValToAddDict[key] + "\n" - outfile.write(line) - - outfile.close() - -def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv): - logger.info("calling generate_properties_files", extra=d) - - clusterConfigsList = systemTestEnv.clusterEntityConfigDictList - tcPathname = testcaseEnv.testCaseBaseDir - tcConfigsList = testcaseEnv.testcaseConfigsList - - cfgTemplatePathname = os.path.abspath(testsuitePathname + "/config") - cfgDestPathname = os.path.abspath(tcPathname + "/config") - logger.info("config template (source) pathname : " + cfgTemplatePathname, extra=d) - logger.info("testcase config (dest) pathname : " + cfgDestPathname, extra=d) - - # loop through all zookeepers (if more than 1) to retrieve host and clientPort - # to construct a zookeeper.connect str for broker in the form of: - # zookeeper.connect=<host1>:<port1>,<host2>:<port2>,... - testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] = "" - testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] = "" - testcaseEnv.userDefinedEnvVarDict["sourceZkEntityIdList"] = [] - testcaseEnv.userDefinedEnvVarDict["targetZkEntityIdList"] = [] - testcaseEnv.userDefinedEnvVarDict["sourceZkHostPortDict"] = {} - testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"] = {} - testcaseEnv.userDefinedEnvVarDict["sourceBrokerEntityIdList"] = [] - testcaseEnv.userDefinedEnvVarDict["targetBrokerEntityIdList"] = [] - testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"] = "" - testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] = "" - - # update zookeeper cluster info into "testcaseEnv.userDefinedEnvVarDict" - zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper") - - for zkDict in zkDictList: - entityID = zkDict["entity_id"] - hostname = zkDict["hostname"] - clusterName = zkDict["cluster_name"] - clientPortList = system_test_utils.get_data_from_list_of_dicts(tcConfigsList, "entity_id", entityID, "clientPort") - clientPort = clientPortList[0] - - if clusterName == "source": - # update source cluster zookeeper entities - testcaseEnv.userDefinedEnvVarDict["sourceZkEntityIdList"].append(entityID) - if ( len(testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]) == 0 ): - testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] = hostname + ":" + clientPort - else: - testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] += "," + hostname + ":" + clientPort - - # generate these strings for zookeeper config: - # server.1=host1:2180:2182 - # server.2=host2:2180:2182 - zkClusterSize = len(testcaseEnv.userDefinedEnvVarDict["sourceZkHostPortDict"]) - zkClusterId = str(zkClusterSize + 1) - key = "server." + zkClusterId - val = hostname + ":" + str(int(clientPort) - 1) + ":" + str(int(clientPort) + 1) - testcaseEnv.userDefinedEnvVarDict["sourceZkHostPortDict"][key] = val - - elif clusterName == "target": - # update target cluster zookeeper entities - testcaseEnv.userDefinedEnvVarDict["targetZkEntityIdList"].append(entityID) - if ( len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) == 0 ): - testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] = hostname + ":" + clientPort - else: - testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] += "," + hostname + ":" + clientPort - - # generate these strings for zookeeper config: - # server.1=host1:2180:2182 - # server.2=host2:2180:2182 - zkClusterSize = len(testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"]) - zkClusterId = str(zkClusterSize + 1) - key = "server." + zkClusterId - val = hostname + ":" + str(int(clientPort) - 1) + ":" + str(int(clientPort) + 1) - testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"][key] = val - - else: - logger.error("Invalid cluster name: " + clusterName, extra=d) - raise Exception("Invalid cluster name : " + clusterName) - sys.exit(1) - - # update broker cluster info into "testcaseEnv.userDefinedEnvVarDict" - brokerDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "broker") - for brokerDict in brokerDictList: - entityID = brokerDict["entity_id"] - hostname = brokerDict["hostname"] - clusterName = brokerDict["cluster_name"] - portList = system_test_utils.get_data_from_list_of_dicts(tcConfigsList, "entity_id", entityID, "port") - port = portList[0] - - if clusterName == "source": - if ( len(testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"]) == 0 ): - testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"] = hostname + ":" + port - else: - testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"] += "," + hostname + ":" + port - elif clusterName == "target": - if ( len(testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]) == 0 ): - testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] = hostname + ":" + port - else: - testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] += "," + hostname + ":" + port - else: - logger.error("Invalid cluster name: " + clusterName, extra=d) - raise Exception("Invalid cluster name : " + clusterName) - sys.exit(1) - - # for each entity in the cluster config - for clusterCfg in clusterConfigsList: - cl_entity_id = clusterCfg["entity_id"] - - # loop through testcase config list 'tcConfigsList' for a matching cluster entity_id - for tcCfg in tcConfigsList: - if (tcCfg["entity_id"] == cl_entity_id): - - # copy the associated .properties template, update values, write to testcase_<xxx>/config - - if (clusterCfg["role"] == "broker"): - brokerVersion = "0.8" - try: - brokerVersion = tcCfg["version"] - except: - pass - - if (brokerVersion == "0.7"): - if clusterCfg["cluster_name"] == "source": - tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] - else: - logger.error("Unknown cluster name for 0.7: " + clusterName, extra=d) - sys.exit(1) - else: - if clusterCfg["cluster_name"] == "source": - tcCfg["zookeeper.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] - elif clusterCfg["cluster_name"] == "target": - tcCfg["zookeeper.connect"] = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] - else: - logger.error("Unknown cluster name: " + clusterName, extra=d) - sys.exit(1) - - addedCSVConfig = {} - addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", clusterCfg["entity_id"], "metrics") - addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" - addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" - addedCSVConfig["kafka.csv.metrics.reporter.enabled"] = "true" - addedCSVConfig["listeners"] = "PLAINTEXT://localhost:"+tcCfg["port"] - - if brokerVersion == "0.7": - addedCSVConfig["brokerid"] = tcCfg["brokerid"] - - copy_file_with_dict_values(cfgTemplatePathname + "/server.properties", - cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, addedCSVConfig) - - elif ( clusterCfg["role"] == "zookeeper"): - if clusterCfg["cluster_name"] == "source": - copy_file_with_dict_values(cfgTemplatePathname + "/zookeeper.properties", - cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, - testcaseEnv.userDefinedEnvVarDict["sourceZkHostPortDict"]) - elif clusterCfg["cluster_name"] == "target": - copy_file_with_dict_values(cfgTemplatePathname + "/zookeeper.properties", - cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, - testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"]) - else: - logger.error("Unknown cluster name: " + clusterName, extra=d) - sys.exit(1) - - elif ( clusterCfg["role"] == "mirror_maker"): - tcCfg["metadata.broker.list"] = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] - tcCfg["bootstrap.servers"] = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] # for new producer - copy_file_with_dict_values(cfgTemplatePathname + "/mirror_producer.properties", - cfgDestPathname + "/" + tcCfg["mirror_producer_config_filename"], tcCfg, None) - - # update zookeeper.connect with the zk entities specified in cluster_config.json - tcCfg["zookeeper.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] - copy_file_with_dict_values(cfgTemplatePathname + "/mirror_consumer.properties", - cfgDestPathname + "/" + tcCfg["mirror_consumer_config_filename"], tcCfg, None) - - else: - logger.debug("UNHANDLED role " + clusterCfg["role"], extra=d) - - # scp updated config files to remote hosts - scp_file_to_remote_host(clusterConfigsList, testcaseEnv) - - -def scp_file_to_remote_host(clusterEntityConfigDictList, testcaseEnv): - - testcaseConfigsList = testcaseEnv.testcaseConfigsList - - for clusterEntityConfigDict in clusterEntityConfigDictList: - hostname = clusterEntityConfigDict["hostname"] - kafkaHome = clusterEntityConfigDict["kafka_home"] - localTestcasePathName = testcaseEnv.testCaseBaseDir - remoteTestcasePathName = localTestcasePathName - - if hostname != "localhost": - remoteTestcasePathName = replace_kafka_home(localTestcasePathName, kafkaHome) - - cmdStr = "scp " + localTestcasePathName + "/config/* " + hostname + ":" + remoteTestcasePathName + "/config" - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - -def start_zookeepers(systemTestEnv, testcaseEnv): - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - - zkEntityIdList = system_test_utils.get_data_from_list_of_dicts( - clusterEntityConfigDictList, "role", "zookeeper", "entity_id") - - for zkEntityId in zkEntityIdList: - configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "zookeeper", zkEntityId, "config") - configFile = system_test_utils.get_data_by_lookup_keyval( - testcaseEnv.testcaseConfigsList, "entity_id", zkEntityId, "config_filename") - clientPort = system_test_utils.get_data_by_lookup_keyval( - testcaseEnv.testcaseConfigsList, "entity_id", zkEntityId, "clientPort") - dataDir = system_test_utils.get_data_by_lookup_keyval( - testcaseEnv.testcaseConfigsList, "entity_id", zkEntityId, "dataDir") - hostname = system_test_utils.get_data_by_lookup_keyval( - clusterEntityConfigDictList, "entity_id", zkEntityId, "hostname") - minusOnePort = str(int(clientPort) - 1) - plusOnePort = str(int(clientPort) + 1) - - # read configFile to find out the id of the zk and create the file "myid" - infile = open(configPathName + "/" + configFile, "r") - inlines = infile.readlines() - infile.close() - - for line in inlines: - if line.startswith("server.") and hostname + ":" + minusOnePort + ":" + plusOnePort in line: - # server.1=host1:2187:2189 - matchObj = re.match("server\.(.*?)=.*", line) - zkServerId = matchObj.group(1) - - cmdStr = "ssh " + hostname + " 'mkdir -p " + dataDir + "; echo " + zkServerId + " > " + dataDir + "/myid'" - logger.debug("executing command [" + cmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(cmdStr) - for line in subproc.stdout.readlines(): - pass # dummy loop to wait until producer is completed - - time.sleep(2) - start_entity_in_background(systemTestEnv, testcaseEnv, zkEntityId) - -def start_brokers(systemTestEnv, testcaseEnv): - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - - brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( - clusterEntityConfigDictList, "role", "broker", "entity_id") - - for brokerEntityId in brokerEntityIdList: - start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId) - -def start_console_consumers(systemTestEnv, testcaseEnv, onlyThisEntityId=None): - - if onlyThisEntityId is not None: - start_entity_in_background(systemTestEnv, testcaseEnv, onlyThisEntityId) - else: - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - consoleConsumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( - clusterEntityConfigDictList, "role", "console_consumer", "entity_id") - for entityId in consoleConsumerEntityIdList: - start_entity_in_background(systemTestEnv, testcaseEnv, entityId) - - -def start_mirror_makers(systemTestEnv, testcaseEnv, onlyThisEntityId=None): - - if onlyThisEntityId is not None: - start_entity_in_background(systemTestEnv, testcaseEnv, onlyThisEntityId) - else: - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( - clusterEntityConfigDictList, "role", "mirror_maker", "entity_id") - - for brokerEntityId in brokerEntityIdList: - start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId) - - -def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict): - - logger.info("looking up broker shutdown...", extra=d) - - # keep track of broker related data in this dict such as broker id, - # entity id and timestamp and return it to the caller function - shutdownBrokerDict = {} - - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( - clusterEntityConfigDictList, "role", "broker", "entity_id") - - for brokerEntityId in brokerEntityIdList: - - hostname = system_test_utils.get_data_by_lookup_keyval( - clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname") - logFile = system_test_utils.get_data_by_lookup_keyval( - testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename") - - logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default") - cmdStrList = ["ssh " + hostname, - "\"grep -i -h '" + leaderAttributesDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] + "' ", - logPathName + "/" + logFile + " | ", - "sort | tail -1\""] - cmdStr = " ".join(cmdStrList) - - logger.debug("executing command [" + cmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(cmdStr) - for line in subproc.stdout.readlines(): - - line = line.rstrip('\n') - - if leaderAttributesDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] in line: - logger.debug("found the log line : " + line, extra=d) - try: - matchObj = re.match(leaderAttributesDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"], line) - datetimeStr = matchObj.group(1) - datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f") - unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond - #print "{0:.3f}".format(unixTs) - - # update shutdownBrokerDict when - # 1. shutdownBrokerDict has no logline entry - # 2. shutdownBrokerDict has existing logline enty but found another logline with more recent timestamp - if (len(shutdownBrokerDict) > 0 and shutdownBrokerDict["timestamp"] < unixTs) or (len(shutdownBrokerDict) == 0): - shutdownBrokerDict["timestamp"] = unixTs - shutdownBrokerDict["brokerid"] = matchObj.group(2) - shutdownBrokerDict["hostname"] = hostname - shutdownBrokerDict["entity_id"] = brokerEntityId - logger.debug("brokerid: [" + shutdownBrokerDict["brokerid"] + \ - "] entity_id: [" + shutdownBrokerDict["entity_id"] + "]", extra=d) - except: - logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d) - raise - - return shutdownBrokerDict - - -def get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict): - - logger.debug("looking up leader...", extra=d) - - # keep track of leader related data in this dict such as broker id, - # entity id and timestamp and return it to the caller function - leaderDict = {} - - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \ - clusterEntityConfigDictList, "role", "broker", "entity_id") - - for brokerEntityId in brokerEntityIdList: - - hostname = system_test_utils.get_data_by_lookup_keyval( \ - clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname") - kafkaHome = system_test_utils.get_data_by_lookup_keyval( \ - clusterEntityConfigDictList, "entity_id", brokerEntityId, "kafka_home") - logFile = system_test_utils.get_data_by_lookup_keyval( \ - testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename") - - logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default") - - if hostname != "localhost": - logPathName = replace_kafka_home(logPathName, kafkaHome) - - cmdStrList = ["ssh " + hostname, - "\"grep -i -h '" + leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ", - logPathName + "/" + logFile + " | ", - "sort | tail -1\""] - cmdStr = " ".join(cmdStrList) - - logger.debug("executing command [" + cmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(cmdStr) - for line in subproc.stdout.readlines(): - - line = line.rstrip('\n') - - if leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] in line: - logger.debug("found the log line : " + line, extra=d) - try: - matchObj = re.match(leaderAttributesDict["REGX_LEADER_ELECTION_PATTERN"], line) - datetimeStr = matchObj.group(1) - datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f") - unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond - #print "{0:.3f}".format(unixTs) - - # update leaderDict when - # 1. leaderDict has no logline entry - # 2. leaderDict has existing logline entry but found another logline with more recent timestamp - if (len(leaderDict) > 0 and leaderDict["timestamp"] < unixTs) or (len(leaderDict) == 0): - leaderDict["timestamp"] = unixTs - leaderDict["brokerid"] = matchObj.group(2) - leaderDict["topic"] = matchObj.group(3) - leaderDict["partition"] = matchObj.group(4) - leaderDict["entity_id"] = brokerEntityId - leaderDict["hostname"] = hostname - logger.debug("brokerid: [" + leaderDict["brokerid"] + "] entity_id: [" + leaderDict["entity_id"] + "]", extra=d) - except: - logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d) - raise - #else: - # logger.debug("unmatched line found [" + line + "]", extra=d) - - return leaderDict - - -def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): - - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - - # cluster configurations: - hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname") - role = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "role") - kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home") - javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home") - jmxPort = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "jmx_port") - clusterName = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "cluster_name") - - # testcase configurations: - testcaseConfigsList = testcaseEnv.testcaseConfigsList - clientPort = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "clientPort") - configFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "config_filename") - logFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename") - - useNewProducer = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "new-producer") - mmConsumerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, - "mirror_consumer_config_filename") - mmProducerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, - "mirror_producer_config_filename") - - logger.info("starting " + role + " in host [" + hostname + "] on client port [" + clientPort + "]", extra=d) - - configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "config") - logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "default") - - if hostname != "localhost": - configPathName = replace_kafka_home(configPathName, kafkaHome) - logPathName = replace_kafka_home(logPathName, kafkaHome) - - if role == "zookeeper": - cmdList = ["ssh " + hostname, - "'JAVA_HOME=" + javaHome, - "JMX_PORT=" + jmxPort, - kafkaHome + "/bin/zookeeper-server-start.sh ", - configPathName + "/" + configFile + " &> ", - logPathName + "/" + logFile + " & echo pid:$! > ", - logPathName + "/entity_" + entityId + "_pid'"] - - elif role == "broker": - cmdList = ["ssh " + hostname, - "'JAVA_HOME=" + javaHome, - "JMX_PORT=" + jmxPort, - "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/log4j.properties" % kafkaHome, - kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka", - configPathName + "/" + configFile + " >> ", - logPathName + "/" + logFile + " & echo pid:$! > ", - logPathName + "/entity_" + entityId + "_pid'"] - - elif role == "mirror_maker": - if useNewProducer.lower() == "true": - cmdList = ["ssh " + hostname, - "'JAVA_HOME=" + javaHome, - "JMX_PORT=" + jmxPort, - kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker", - "--consumer.config " + configPathName + "/" + mmConsumerConfigFile, - "--producer.config " + configPathName + "/" + mmProducerConfigFile, - "--new.producer", - "--whitelist=\".*\" >> ", - logPathName + "/" + logFile + " & echo pid:$! > ", - logPathName + "/entity_" + entityId + "_pid'"] - else: - cmdList = ["ssh " + hostname, - "'JAVA_HOME=" + javaHome, - "JMX_PORT=" + jmxPort, - kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker", - "--consumer.config " + configPathName + "/" + mmConsumerConfigFile, - "--producer.config " + configPathName + "/" + mmProducerConfigFile, - "--whitelist=\".*\" >> ", - logPathName + "/" + logFile + " & echo pid:$! > ", - logPathName + "/entity_" + entityId + "_pid'"] - - elif role == "console_consumer": - clusterToConsumeFrom = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "cluster_name") - numTopicsForAutoGenString = -1 - try: - numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"]) - except: - pass - - topic = "" - if numTopicsForAutoGenString < 0: - topic = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic") - else: - topic = generate_topics_string("topic", numTopicsForAutoGenString) - - # update this variable and will be used by data validation functions - testcaseEnv.consumerTopicsString = topic - - # 2. consumer timeout - timeoutMs = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "consumer-timeout-ms") - - # 3. consumer formatter - formatterOption = "" - try: - formatterOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "formatter") - except: - pass - - # 4. consumer config - consumerProperties = {} - consumerProperties["consumer.timeout.ms"] = timeoutMs - try: - groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id") - consumerProperties["group.id"] = groupOption - except: - pass - - props_file_path=write_consumer_properties(consumerProperties) - scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/" - logger.debug("executing command [" + scpCmdStr + "]", extra=d) - system_test_utils.sys_call(scpCmdStr) - - if len(formatterOption) > 0: - formatterOption = " --formatter " + formatterOption + " " - - # get zookeeper connect string - zkConnectStr = "" - if clusterName == "source": - zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] - elif clusterName == "target": - zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] - else: - logger.error("Invalid cluster name : " + clusterName, extra=d) - sys.exit(1) - cmdList = ["ssh " + hostname, - "'JAVA_HOME=" + javaHome, - "JMX_PORT=" + jmxPort, - kafkaHome + "/bin/kafka-run-class.sh kafka.tools.ConsoleConsumer", - "--zookeeper " + zkConnectStr, - "--topic " + topic, - "--consumer.config /tmp/consumer.properties", - "--csv-reporter-enabled", - formatterOption, - "--from-beginning", - " >> " + logPathName + "/" + logFile + " & echo pid:$! > ", - logPathName + "/entity_" + entityId + "_pid'"] - - cmdStr = " ".join(cmdList) - - logger.debug("executing command: [" + cmdStr + "]", extra=d) - system_test_utils.async_sys_call(cmdStr) - logger.info("sleeping for 5 seconds.", extra=d) - time.sleep(5) - - pidCmdStr = "ssh " + hostname + " 'cat " + logPathName + "/entity_" + entityId + "_pid' 2> /dev/null" - logger.debug("executing command: [" + pidCmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(pidCmdStr) - - # keep track of the remote entity pid in a dictionary - for line in subproc.stdout.readlines(): - if line.startswith("pid"): - line = line.rstrip('\n') - logger.debug("found pid line: [" + line + "]", extra=d) - tokens = line.split(':') - if role == "zookeeper": - testcaseEnv.entityZkParentPidDict[entityId] = tokens[1] - elif role == "broker": - testcaseEnv.entityBrokerParentPidDict[entityId] = tokens[1] - elif role == "mirror_maker": - testcaseEnv.entityMirrorMakerParentPidDict[entityId] = tokens[1] - elif role == "console_consumer": - testcaseEnv.entityConsoleConsumerParentPidDict[entityId] = tokens[1] - - -def start_console_consumer(systemTestEnv, testcaseEnv): - - clusterList = systemTestEnv.clusterEntityConfigDictList - - consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterList, "role", "console_consumer") - for consumerConfig in consumerConfigList: - host = consumerConfig["hostname"] - entityId = consumerConfig["entity_id"] - jmxPort = consumerConfig["jmx_port"] - role = consumerConfig["role"] - clusterName = consumerConfig["cluster_name"] - kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home") - javaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "java_home") - jmxPort = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "jmx_port") - kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" - - logger.info("starting console consumer", extra=d) - - consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") - metricsDir = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "metrics"), - - if host != "localhost": - consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome) - #metricsDir = replace_kafka_home(metricsDir, kafkaHome) - - consumerLogPathName = consumerLogPath + "/console_consumer.log" - - testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consumerLogPathName - - # testcase configurations: - testcaseList = testcaseEnv.testcaseConfigsList - - # get testcase arguments - # 1. topics - numTopicsForAutoGenString = -1 - try: - numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"]) - except: - pass - - topic = "" - if numTopicsForAutoGenString < 0: - topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic") - else: - topic = generate_topics_string("topic", numTopicsForAutoGenString) - - # update this variable and will be used by data validation functions - testcaseEnv.consumerTopicsString = topic - - # 2. consumer timeout - timeoutMs = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "consumer-timeout-ms") - - # 3. consumer formatter - formatterOption = "" - try: - formatterOption = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "formatter") - except: - pass - - if len(formatterOption) > 0: - formatterOption = " --formatter " + formatterOption + " " - - # get zookeeper connect string - zkConnectStr = "" - if clusterName == "source": - zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] - elif clusterName == "target": - zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] - else: - logger.error("Invalid cluster name : " + clusterName, extra=d) - sys.exit(1) - - consumerProperties = {} - consumerProperties["consumer.timeout.ms"] = timeoutMs - props_file_path=write_consumer_properties(consumerProperties) - scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/" - logger.debug("executing command [" + scpCmdStr + "]", extra=d) - system_test_utils.sys_call(scpCmdStr) - - cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, - "JMX_PORT=" + jmxPort, - kafkaRunClassBin + " kafka.tools.ConsoleConsumer", - "--zookeeper " + zkConnectStr, - "--topic " + topic, - "--consumer.config /tmp/consumer.properties", - "--csv-reporter-enabled", - #"--metrics-dir " + metricsDir, - formatterOption, - "--from-beginning ", - " >> " + consumerLogPathName, - " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"] - - cmdStr = " ".join(cmdList) - - logger.debug("executing command: [" + cmdStr + "]", extra=d) - system_test_utils.async_sys_call(cmdStr) - - pidCmdStr = "ssh " + host + " 'cat " + consumerLogPath + "/entity_" + entityId + "_pid'" - logger.debug("executing command: [" + pidCmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(pidCmdStr) - - # keep track of the remote entity pid in a dictionary - for line in subproc.stdout.readlines(): - if line.startswith("pid"): - line = line.rstrip('\n') - logger.debug("found pid line: [" + line + "]", extra=d) - tokens = line.split(':') - testcaseEnv.consumerHostParentPidDict[host] = tokens[1] - -def start_producer_performance(systemTestEnv, testcaseEnv, kafka07Client): - - entityConfigList = systemTestEnv.clusterEntityConfigDictList - testcaseConfigsList = testcaseEnv.testcaseConfigsList - brokerListStr = "" - - # construct "broker-list" for producer - for entityConfig in entityConfigList: - entityRole = entityConfig["role"] - if entityRole == "broker": - hostname = entityConfig["hostname"] - entityId = entityConfig["entity_id"] - port = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "port") - - producerConfigList = system_test_utils.get_dict_from_list_of_dicts(entityConfigList, "role", "producer_performance") - for producerConfig in producerConfigList: - host = producerConfig["hostname"] - entityId = producerConfig["entity_id"] - jmxPort = producerConfig["jmx_port"] - role = producerConfig["role"] - - thread.start_new_thread(start_producer_in_thread, (testcaseEnv, entityConfigList, producerConfig, kafka07Client)) - logger.debug("calling testcaseEnv.lock.acquire()", extra=d) - testcaseEnv.lock.acquire() - testcaseEnv.numProducerThreadsRunning += 1 - logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d) - time.sleep(1) - logger.debug("calling testcaseEnv.lock.release()", extra=d) - testcaseEnv.lock.release() - -def generate_topics_string(topicPrefix, numOfTopics): - # return a topics string in the following format: - # <topicPrefix>_0001,<topicPrefix>_0002,... - # eg. "topic_0001,topic_0002,...,topic_xxxx" - - topicsStr = "" - counter = 1 - idx = "1" - while counter <= numOfTopics: - if counter <= 9: - idx = "000" + str(counter) - elif counter <= 99: - idx = "00" + str(counter) - elif counter <= 999: - idx = "0" + str(counter) - elif counter <= 9999: - idx = str(counter) - else: - raise Exception("Error: no. of topics must be under 10000 - current topics count : " + counter) - - if len(topicsStr) == 0: - topicsStr = topicPrefix + "_" + idx - else: - topicsStr = topicsStr + "," + topicPrefix + "_" + idx - - counter += 1 - return topicsStr - -def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client): - host = producerConfig["hostname"] - entityId = producerConfig["entity_id"] - jmxPort = producerConfig["jmx_port"] - role = producerConfig["role"] - clusterName = producerConfig["cluster_name"] - kafkaHome = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "kafka_home") - javaHome = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "java_home") - jmxPort = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "jmx_port") - kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" - - # first keep track of its pid - testcaseEnv.producerHostParentPidDict[entityId] = os.getpid() - - # get optional testcase arguments - numTopicsForAutoGenString = -1 - try: - numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"]) - except: - pass - - # testcase configurations: - testcaseConfigsList = testcaseEnv.testcaseConfigsList - topic = "" - if numTopicsForAutoGenString < 0: - topic = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic") - else: - topic = generate_topics_string("topic", numTopicsForAutoGenString) - - # update this variable and will be used by data validation functions - testcaseEnv.producerTopicsString = topic - - threads = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "threads") - compCodec = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "compression-codec") - messageSize = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message-size") - noMsgPerBatch = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message") - requestNumAcks = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "request-num-acks") - syncMode = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "sync") - useNewProducer = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "new-producer") - retryBackoffMs = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-retry-backoff-ms") - numOfRetries = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-num-retries") - - # for optional properties in testcase_xxxx_properties.json, - # check the length of returned value for those properties: - if len(retryBackoffMs) == 0: # no setting for "producer-retry-backoff-ms" - retryBackoffMs = "100" # default - if len(numOfRetries) == 0: # no setting for "producer-num-retries" - numOfRetries = "3" # default - - brokerListStr = "" - if clusterName == "source": - brokerListStr = testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"] - elif clusterName == "target": - brokerListStr = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] - else: - logger.error("Unknown cluster name: " + clusterName, extra=d) - sys.exit(1) - - logger.info("starting producer preformance", extra=d) - - producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") - metricsDir = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "metrics") - - if host != "localhost": - producerLogPath = replace_kafka_home(producerLogPath, kafkaHome) - metricsDir = replace_kafka_home(metricsDir, kafkaHome) - - producerLogPathName = producerLogPath + "/producer_performance.log" - - testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName - - counter = 0 - producerSleepSec = int(testcaseEnv.testcaseArgumentsDict["sleep_seconds_between_producer_calls"]) - - boolArgumentsStr = "" - if syncMode.lower() == "true": - boolArgumentsStr = boolArgumentsStr + " --sync" - if useNewProducer.lower() == "true": - boolArgumentsStr = boolArgumentsStr + " --new-producer" - - # keep calling producer until signaled to stop by: - # testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] - while 1: - logger.debug("calling testcaseEnv.lock.acquire()", extra=d) - testcaseEnv.lock.acquire() - if not testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]: - initMsgId = counter * int(noMsgPerBatch) - - logger.info("#### [producer thread] status of stopBackgroundProducer : [False] => producing [" \ - + str(noMsgPerBatch) + "] messages with starting message id : [" + str(initMsgId) + "]", extra=d) - - cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, - "JMX_PORT=" + jmxPort, - "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome, - kafkaRunClassBin + " kafka.tools.ProducerPerformance", - "--broker-list " + brokerListStr, - "--initial-message-id " + str(initMsgId), - "--messages " + noMsgPerBatch, - "--topics " + topic, - "--threads " + threads, - "--compression-codec " + compCodec, - "--message-size " + messageSize, - "--request-num-acks " + requestNumAcks, - "--producer-retry-backoff-ms " + retryBackoffMs, - "--producer-num-retries " + numOfRetries, - "--csv-reporter-enabled", - "--metrics-dir " + metricsDir, - boolArgumentsStr, - " >> " + producerLogPathName, - " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid", - " & wait'"] - - if kafka07Client: - cmdList[:] = [] - - brokerInfoStr = "" - tokenList = brokerListStr.split(',') - index = 1 - for token in tokenList: - if len(brokerInfoStr) == 0: - brokerInfoStr = str(index) + ":" + token - else: - brokerInfoStr += "," + str(index) + ":" + token - index += 1 - - brokerInfoStr = "broker.list=" + brokerInfoStr - - cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, - "JMX_PORT=" + jmxPort, - "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome, - kafkaRunClassBin + " kafka.tools.ProducerPerformance", - "--brokerinfo " + brokerInfoStr, - "--initial-message-id " + str(initMsgId), - "--messages " + noMsgPerBatch, - "--topic " + topic, - "--threads " + threads, - "--compression-codec " + compCodec, - "--message-size " + messageSize, - "--vary-message-size --async", - " >> " + producerLogPathName, - " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid", - " & wait'"] - - cmdStr = " ".join(cmdList) - logger.debug("executing command: [" + cmdStr + "]", extra=d) - - subproc = system_test_utils.sys_call_return_subproc(cmdStr) - logger.debug("waiting for producer to finish", extra=d) - subproc.communicate() - logger.debug("producer finished", extra=d) - else: - testcaseEnv.numProducerThreadsRunning -= 1 - logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d) - logger.debug("calling testcaseEnv.lock.release()", extra=d) - testcaseEnv.lock.release() - break - - counter += 1 - logger.debug("calling testcaseEnv.lock.release()", extra=d) - testcaseEnv.lock.release() - time.sleep(int(producerSleepSec)) - - # wait until other producer threads also stops and - # let the main testcase know all producers have stopped - while 1: - logger.debug("calling testcaseEnv.lock.acquire()", extra=d) - testcaseEnv.lock.acquire() - time.sleep(1) - if testcaseEnv.numProducerThreadsRunning == 0: - testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = True - logger.debug("calling testcaseEnv.lock.release()", extra=d) - testcaseEnv.lock.release() - break - else: - logger.debug("waiting for TRUE of testcaseEnv.userDefinedEnvVarDict['backgroundProducerStopped']", extra=d) - logger.debug("calling testcaseEnv.lock.release()", extra=d) - testcaseEnv.lock.release() - time.sleep(1) - - # finally remove itself from the tracking pids - del testcaseEnv.producerHostParentPidDict[entityId] - -def stop_remote_entity(systemTestEnv, entityId, parentPid, signalType="SIGTERM"): - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - - hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname") - pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid) - - logger.info("terminating (" + signalType + ") process id: " + parentPid + " in host: " + hostname, extra=d) - - if signalType.lower() == "sigterm": - system_test_utils.sigterm_remote_process(hostname, pidStack) - elif signalType.lower() == "sigkill": - system_test_utils.sigkill_remote_process(hostname, pidStack) - else: - logger.error("Invalid signal type: " + signalType, extra=d) - raise Exception("Invalid signal type: " + signalType) - - -def force_stop_remote_entity(systemTestEnv, entityId, parentPid): - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - - hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname") - pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid) - - logger.debug("terminating process id: " + parentPid + " in host: " + hostname, extra=d) - system_test_utils.sigkill_remote_process(hostname, pidStack) - - -def create_topic_for_producer_performance(systemTestEnv, testcaseEnv): - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - - prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance") - - for prodPerfCfg in prodPerfCfgList: - topicsStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfg["entity_id"], "topic") - zkEntityId = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id") - zkHost = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname") - kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home") - javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home") - createTopicBin = kafkaHome + "/bin/kafka-topics.sh --create" - - logger.debug("zkEntityId : " + zkEntityId, extra=d) - logger.debug("createTopicBin : " + createTopicBin, extra=d) - - zkConnectStr = "" - topicsList = topicsStr.split(',') - - if len(testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]) > 0: - zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] - elif len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) > 0: - zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] - else: - raise Exception("Empty zkConnectStr found") - - testcaseBaseDir = testcaseEnv.testCaseBaseDir - - if zkHost != "localhost": - testcaseBaseDir = replace_kafka_home(testcaseBaseDir, kafkaHome) - - for topic in topicsList: - logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d) - cmdList = ["ssh " + zkHost, - "'JAVA_HOME=" + javaHome, - createTopicBin, - " --topic " + topic, - " --zookeeper " + zkConnectStr, - " --replication-factor " + testcaseEnv.testcaseArgumentsDict["replica_factor"], - " --partitions " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ", - testcaseBaseDir + "/logs/create_source_cluster_topic.log'"] - - cmdStr = " ".join(cmdList) - logger.debug("executing command: [" + cmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(cmdStr) - -def create_topic(systemTestEnv, testcaseEnv, topic, replication_factor, num_partitions): - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - zkEntityId = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id") - kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home") - javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home") - createTopicBin = kafkaHome + "/bin/kafka-topics.sh --create" - zkConnectStr = "" - zkHost = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname") - if len(testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]) > 0: - zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] - elif len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) > 0: - zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] - else: - raise Exception("Empty zkConnectStr found") - - testcaseBaseDir = testcaseEnv.testCaseBaseDir - - testcaseBaseDir = replace_kafka_home(testcaseBaseDir, kafkaHome) - - logger.debug("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d) - cmdList = ["ssh " + zkHost, - "'JAVA_HOME=" + javaHome, - createTopicBin, - " --topic " + topic, - " --zookeeper " + zkConnectStr, - " --replication-factor " + str(replication_factor), - " --partitions " + str(num_partitions) + " >> ", - testcaseBaseDir + "/logs/create_source_cluster_topic.log'"] - - cmdStr = " ".join(cmdList) - logger.info("executing command: [" + cmdStr + "]", extra=d) - subproc = system_test_utils.sys_call_return_subproc(cmdStr) - - - -def get_message_id(logPathName, topic=""): - logLines = open(logPathName, "r").readlines() - messageIdList = [] - - for line in logLines: - if not "MessageID" in line: - continue - else: - matchObj = re.match('.*Topic:(.*?):.*:MessageID:(.*?):', line) - if len(topic) == 0: - messageIdList.append( matchObj.group(2) ) - else: - if topic == matchObj.group(1): - messageIdList.append( matchObj.group(2) ) - - return messageIdList - -def get_message_checksum(logPathName): - logLines = open(logPathName, "r").readlines() - messageChecksumList = [] - - for line in logLines: - if not "checksum:" in line: - continue - else: - matchObj = re.match('.*checksum:(\d*).*', line) - if matchObj is not None: - checksum = matchObj.group(1) - messageChecksumList.append( checksum ) - else: - logger.error("unexpected log line : " + line, extra=d) - - return messageChecksumList - - -def validate_data_matched(systemTestEnv, testcaseEnv, replicationUtils): - logger.info("#### Inside validate_data_matched", extra=d) - - validationStatusDict = testcaseEnv.validationStatusDict - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - - prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance") - consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer") - - consumerDuplicateCount = 0 - - for prodPerfCfg in prodPerfCfgList: - producerEntityId = prodPerfCfg["entity_id"] - topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic") - logger.debug("working on topic : " + topic, extra=d) - acks = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks") - - consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \ - clusterEntityConfigDictList, "role", "console_consumer", "entity_id") - - matchingConsumerEntityId = None - for consumerEntityId in consumerEntityIdList: - consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") - if consumerTopic in topic: - matchingConsumerEntityId = consumerEntityId - logger.info("matching consumer entity id found", extra=d) - break - - if matchingConsumerEntityId is None: - logger.info("matching consumer entity id NOT found", extra=d) - break - - msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( \ - testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") + "/msg_id_missing_in_consumer.log" - producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default") - producerLogPathName = producerLogPath + "/producer_performance.log" - - consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") - consumerLogPathName = consumerLogPath + "/console_consumer.log" - - producerMsgIdList = get_message_id(producerLogPathName) - consumerMsgIdList = get_message_id(consumerLogPathName) - producerMsgIdSet = set(producerMsgIdList) - consumerMsgIdSet = set(consumerMsgIdList) - - consumerDuplicateCount = len(consumerMsgIdList) - len(consumerMsgIdSet) - missingUniqConsumerMsgId = system_test_utils.subtract_list(producerMsgIdSet, consumerMsgIdSet) - - outfile = open(msgIdMissingInConsumerLogPathName, "w") - for id in missingUniqConsumerMsgId: - outfile.write(id + "\n") - outfile.close() - - logger.info("no. of unique messages on topic [" + topic + "] sent from publisher : " + str(len(producerMsgIdSet)), extra=d) - logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgIdSet)), extra=d) - validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet)) - validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet)) - - missingPercentage = len(missingUniqConsumerMsgId) * 100.00 / len(producerMsgIdSet) - logger.info("Data loss threshold % : " + str(replicationUtils.ackOneDataLossThresholdPercent), extra=d) - logger.warn("Data loss % on topic : " + topic + " : " + str(missingPercentage), extra=d) - - if ( len(missingUniqConsumerMsgId) == 0 and len(producerMsgIdSet) > 0 ): - validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" - elif (acks == "1"): - if missingPercentage <= replicationUtils.ackOneDataLossThresholdPercent: - validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" - logger.warn("Test case (Acks = 1) passes with less than " + str(replicationUtils.ackOneDataLossThresholdPercent) \ - + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d) - else: - validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" - logger.error("Test case (Acks = 1) failed with more than " + str(replicationUtils.ackOneDataLossThresholdPercent) \ - + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d) - else: - validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" - logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d) - - -def validate_leader_election_successful(testcaseEnv, leaderDict, validationStatusDict): - logger.debug("#### Inside validate_leader_election_successful", extra=d) - - if ( len(leaderDict) > 0 ): - try: - leaderBrokerId = leaderDict["brokerid"] - leaderEntityId = leaderDict["entity_id"] - leaderPid = testcaseEnv.entityBrokerParentPidDict[leaderEntityId] - hostname = leaderDict["hostname"] - - logger.info("found leader in entity [" + leaderEntityId + "] with brokerid [" + \ - leaderBrokerId + "] for partition [" + leaderDict["partition"] + "]", extra=d) - validationStatusDict["Validate leader election successful"] = "PASSED" - return True - except Exception, e: - logger.error("leader info not completed: {0}".format(e), extra=d) - traceback.print_exc() - print leaderDict - traceback.print_exc() - validationStatusDict["Validate leader election successful"] = "FAILED" - return False - else: - validationStatusDict["Validate leader election successful"] = "FAILED" - return False - - -def cleanup_data_at_remote_hosts(systemTestEnv, testcaseEnv): - - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - testcaseConfigsList = testcaseEnv.testcaseConfigsList - testCaseBaseDir = testcaseEnv.testCaseBaseDir - - # clean up the following directories in localhost - # system_test/<xxxx_testsuite>/testcase_xxxx/config - # system_test/<xxxx_testsuite>/testcase_xxxx/dashboards - # system_test/<xxxx_testsuite>/testcase_xxxx/logs - logger.info("cleaning up test case dir: [" + testCaseBaseDir + "]", extra=d) - - if "system_test" not in testCaseBaseDir: - # logger.warn("possible destructive command [" + cmdStr + "]", extra=d) - logger.warn("check config file: system_test/cluster_config.properties", extra=d) - logger.warn("aborting test...", extra=d) - sys.exit(1) - else: - system_test_utils.sys_call("rm -rf " + testCaseBaseDir + "/config/*") - system_test_utils.sys_call("rm -rf " + testCaseBaseDir + "/dashboards/*") - system_test_utils.sys_call("rm -rf " + testCaseBaseDir + "/logs/*") - - for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList: - - hostname = clusterEntityConfigDict["hostname"] - entityId = clusterEntityConfigDict["entity_id"] - role = clusterEntityConfigDict["role"] - kafkaHome = clusterEntityConfigDict["kafka_home"] - cmdStr = "" - dataDir = "" - - if hostname == "localhost": - remoteTestCaseBaseDir = testCaseBaseDir - else: - remoteTestCaseBaseDir = replace_kafka_home(testCaseBaseDir, kafkaHome) - - logger.info("cleaning up data dir on host: [" + hostname + "]", extra=d) - - if role == 'zookeeper': - dataDir = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "dataDir") - elif role == 'broker': - dataDir = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log.dir") - else: - logger.info("skipping role [" + role + "] on host : [" + hostname + "]", extra=d) - continue - - cmdStr = "ssh " + hostname + " 'rm -rf " + dataDir + "'" - - if not dataDir.startswith("/tmp"): - logger.warn("possible destructive command [" + cmdStr + "]", extra=d) - logger.warn("check config file: system_test/cluster_config.properties", extra=d) - logger.warn("aborting test...", extra=d) - sys.exit(1) - - # ============================ - # cleaning data dir - # ============================ - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - # ============================ - # cleaning log/metrics/svg, ... - # ============================ - if system_test_utils.remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"): - # so kafkaHome is a real kafka installation - cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.log' | xargs rm 2> /dev/null\"" - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*_pid' | xargs rm 2> /dev/null\"" - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.csv' | xargs rm 2> /dev/null\"" - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.svg' | xargs rm 2> /dev/null\"" - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - - cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.html' | xargs rm 2> /dev/null\"" - logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) - -def replace_kafka_home(systemTestSubDirPath, kafkaHome): - matchObj = re.match(".*(\/system_test\/.*)$", systemTestSubDirPath) - relativeSubDirPath = matchObj.group(1) - return kafkaHome + relativeSubDirPath - -def get_entity_log_directory(testCaseBaseDir, entity_id, role): - return testCaseBaseDir + "/logs/" + role + "-" + entity_id - -def get_entities_for_role(clusterConfig, role): - return filter(lambda entity: entity['role'] == role, clusterConfig) - -def stop_consumer(): - system_test_utils.sys_call("ps -ef | grep ConsoleConsumer | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15") - -def ps_grep_terminate_running_entity(systemTestEnv): - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - username = getpass.getuser() - - for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList: - hostname = clusterEntityConfigDict["hostname"] - cmdList = ["ssh " + hostname, - "\"ps auxw | grep -v grep | grep -v Bootstrap | grep -v vim | grep ^" + username, - "| grep -i 'java\|server\-start\|run\-\|producer\|consumer\|jmxtool' | grep kafka", - "| tr -s ' ' | cut -f2 -d ' ' | xargs kill -9" + "\""] - - cmdStr = " ".join(cmdList) - logger.debug("executing command [" + cmdStr + "]", extra=d) - - system_test_utils.sys_call(cmdStr) - -def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict, leaderAttributesDict): - leaderEntityId = None - leaderBrokerId = None - leaderPPid = None - shutdownLeaderTimestamp = None - leaderReElectionLatency = -1 - - if testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED": - # leader election is not successful - something is wrong => so skip this testcase - return None - else: - # leader elected => stop leader - try: - leaderEntityId = leaderDict["entity_id"] - leaderBrokerId = leaderDict["brokerid"] - leaderPPid = testcaseEnv.entityBrokerParentPidDict[leaderEntityId] - except: - logger.info("leader details unavailable", extra=d) - raise - - logger.info("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid, extra=d) - signalType = None - try: - signalType = testcaseEnv.testcaseArgumentsDict["signal_type"] - except: - pass - - if signalType is None or signalType.lower() == "sigterm": - stop_remote_entity(systemTestEnv, leaderEntityId, leaderPPid) - elif signalType.lower() == "sigkill": - stop_remote_entity(systemTestEnv, leaderEntityId, leaderPPid, "SIGKILL") - else: - logger.error("Unsupported signal type: " + signalType, extra=d) - raise Exception("Unsupported signal type: " + signalType) - - logger.info("sleeping for 10s for leader re-election to complete", extra=d) - time.sleep(10) - - # get broker shut down completed timestamp - shutdownBrokerDict = get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict) - shutdownTimestamp = -1 - - try: - shutdownTimestamp = shutdownBrokerDict["timestamp"] - logger.debug("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownTimestamp)), extra=d) - except: - logger.warn("unable to find broker shut down timestamp", extra=d) - - logger.info("looking up new leader", extra=d) - leaderDict2 = get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict) - logger.debug("unix timestamp of new elected leader: " + str("{0:.6f}".format(leaderDict2["timestamp"])), extra=d) - - if shutdownTimestamp > 0: - leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownTimestamp) - logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d) - - return leaderReElectionLatency - - -def stop_all_remote_running_processes(systemTestEnv, testcaseEnv): - - entityConfigs = systemTestEnv.clusterEntityConfigDictList - - # If there are any alive local threads that keep starting remote producer performance, we need to kill them; - # note we do not need to stop remote processes since they will terminate themselves eventually. - if len(testcaseEnv.producerHostParentPidDict) != 0: - # ============================================= - # tell producer to stop - # ============================================= - logger.debug("calling testcaseEnv.lock.acquire()", extra=d) - testcaseEnv.lock.acquire() - testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True - logger.debug("calling testcaseEnv.lock.release()", extra=d) - testcaseEnv.lock.release() - - # ============================================= - # wait for producer thread's update of - # "backgroundProducerStopped" to be "True" - # ============================================= - while 1: - logger.debug("calling testcaseEnv.lock.acquire()", extra=d) - testcaseEnv.lock.acquire() - logger.info("status of backgroundProducerStopped : [" + \ - str(testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=d) - if testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]: - logger.debug("calling testcaseEnv.lock.release()", extra=d) - testcaseEnv.lock.release() - logger.info("all producer threads completed", extra=d) - break - logger.debug("calling testcaseEnv.lock.release()", extra=d) - testcaseEnv.lock.release() - - testcaseEnv.producerHostParentPidDict.clear() - - for hostname, consumerPPid in testcaseEnv.consumerHostParentPidDict.items(): - consumerEntityId = system_test_utils.get_data_by_lookup_keyval(entityConfigs, "hostname", hostname, "entity_id") - stop_remote_entity(systemTestEnv, consumerEntityId, consumerPPid) - - for entityId, jmxParentPidList in testcaseEnv.entityJmxParentPidDict.items(): - for jmxParentPid in jmxParentPidList: - stop_remote_entity(systemTestEnv, entityId, jmxParentPid) - - for entityId, mirrorMakerParentPid in testcaseEnv.entityMirrorMakerParentPidDict.items(): - stop_remote_entity(systemTestEnv, entityId, mirrorMakerParentPid) - - for entityId, consumerParentPid in testcaseEnv.entityConsoleConsumerParentPidDict.items(): - stop_remote_entity(systemTestEnv, entityId, consumerParentPid) - - for entityId, brokerParentPid in testcaseEnv.entityBrokerParentPidDict.items(): - stop_remote_entity(systemTestEnv, entityId, brokerParentPid) - - for entityId, zkParentPid in testcaseEnv.entityZkParentPidDict.items(): - stop_remote_entity(systemTestEnv, entityId, zkParentPid) - - -def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None): - clusterConfigList = systemTestEnv.clusterEntityConfigDictList - migrationToolConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "migration_tool") - - for migrationToolConfig in migrationToolConfigList: - - entityId = migrationToolConfig["entity_id"] - - if onlyThisEntityId is None or entityId == onlyThisEntityId: - - host = migrationToolConfig["hostname"] - jmxPort = migrationToolConfig["jmx_port"] - role = migrationToolConfig["role"] - kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "kafka_home") - javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "java_home") - jmxPort = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "jmx_port") - kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" - - logger.info("starting kafka migration tool", extra=d) - migrationToolLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "migration_tool", entityId, "default") - migrationToolLogPathName = migrationToolLogPath + "/migration_tool.log" - testcaseEnv.userDefinedEnvVarDict["migrationToolLogPathName"] = migrationToolLogPathName - - testcaseConfigsList = testcaseEnv.testcaseConfigsList - numProducers = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "num.producers") - numStreams = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "num.streams") - producerConfig = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer.config") - consumerConfig = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "consumer.config") - zkClientJar = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "zkclient.01.jar") - kafka07Jar = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "kafka.07.jar") - whiteList = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "whitelist") - logFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename") - - cmdList = ["ssh " + host, - "'JAVA_HOME=" + javaHome, -
<TRUNCATED>
