Repository: kafka
Updated Branches:
  refs/heads/trunk c001b2040 -> d50499a0e
http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/metrics.py
----------------------------------------------------------------------
diff --git a/system_test/utils/metrics.py b/system_test/utils/metrics.py
deleted file mode 100644
index 3e66348..0000000
--- a/system_test/utils/metrics.py
+++ /dev/null
@@ -1,298 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#!/usr/bin/env python
-
-# ===================================
-# file: metrics.py
-# ===================================
-
-import inspect
-import json
-import logging
-import os
-import signal
-import subprocess
-import sys
-import traceback
-
-import csv
-import time
-import matplotlib as mpl
-mpl.use('Agg')
-import matplotlib.pyplot as plt
-from collections import namedtuple
-import numpy
-
-from pyh import *
-import kafka_system_test_utils
-import system_test_utils
-
-logger = logging.getLogger("namedLogger")
-thisClassName = '(metrics)'
-d = {'name_of_class': thisClassName}
-
-attributeNameToNameInReportedFileMap = {
-    'Min': 'min',
-    'Max': 'max',
-    'Mean': 'mean',
-    '50thPercentile': 'median',
-    'StdDev': 'stddev',
-    '95thPercentile': '95%',
-    '99thPercentile': '99%',
-    '999thPercentile': '99.9%',
-    'Count': 'count',
-    'OneMinuteRate': '1 min rate',
-    'MeanRate': 'mean rate',
-    'FiveMinuteRate': '5 min rate',
-    'FifteenMinuteRate': '15 min rate',
-    'Value': 'value'
-}
-
-def getCSVFileNameFromMetricsMbeanName(mbeanName):
-    return mbeanName.replace(":type=", ".").replace(",name=", ".") + ".csv"
-
-def read_metrics_definition(metricsFile):
-    metricsFileData = open(metricsFile, "r").read()
-    metricsJsonData = json.loads(metricsFileData)
-    allDashboards = metricsJsonData['dashboards']
-    allGraphs = []
-    for dashboard in allDashboards:
-        dashboardName = dashboard['name']
-        graphs = dashboard['graphs']
-        for graph in graphs:
-            bean = graph['bean_name']
-            allGraphs.append(graph)
-            attributes = graph['attributes']
-            #print "Filtering on attributes " + attributes
-    return allGraphs
-
-def get_dashboard_definition(metricsFile, role):
-    metricsFileData = open(metricsFile, "r").read()
-    metricsJsonData = json.loads(metricsFileData)
-    allDashboards = metricsJsonData['dashboards']
-    dashboardsForRole = []
-    for dashboard in allDashboards:
-        if dashboard['role'] == role:
-            dashboardsForRole.append(dashboard)
-    return dashboardsForRole
-
-def ensure_valid_headers(headers, attributes):
-    if headers[0] != "# time":
-        raise Exception("First column should be time")
-    for header in headers:
-        logger.debug(header, extra=d)
-    # there should be exactly one column with a name that matches attributes
-    try:
-        attributeColumnIndex = headers.index(attributes)
-        return attributeColumnIndex
-    except ValueError as ve:
-        #print "#### attributes : ", attributes
-        #print "#### headers : ", headers
-        raise Exception("There should be exactly one column that matches attribute: {0} in".format(attributes) +
-                        " headers: {0}".format(",".join(headers)))
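
For reference, the mbean-to-CSV mapping defined above flattens a JMX object
name into the file name that JmxTool output is collected under; the bean name
in this sketch is hypothetical:

    # a minimal sketch of getCSVFileNameFromMetricsMbeanName; the bean name is made up
    mbeanName = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"
    csvName = mbeanName.replace(":type=", ".").replace(",name=", ".") + ".csv"
    print(csvName)   # kafka.server.BrokerTopicMetrics.MessagesInPerSec.csv
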
-
-def plot_graphs(inputCsvFiles, labels, title, xLabel, yLabel, attribute, outputGraphFile):
-    if not inputCsvFiles: return
-
-    # create empty plot
-    fig = plt.figure()
-    fig.subplots_adjust(bottom=0.2)
-    ax = fig.add_subplot(111)
-    labelx = -0.3  # axes coords
-    ax.set_xlabel(xLabel)
-    ax.set_ylabel(yLabel)
-    ax.grid()
-    #ax.yaxis.set_label_coords(labelx, 0.5)
-    Coordinates = namedtuple("Coordinates", 'x y')
-    plots = []
-    coordinates = []
-    # read data for all files, organize by label in a dict
-    for fileAndLabel in zip(inputCsvFiles, labels):
-        inputCsvFile = fileAndLabel[0]
-        label = fileAndLabel[1]
-        csv_reader = list(csv.reader(open(inputCsvFile, "rb")))
-        x, y = [], []
-        xticks_labels = []
-        try:
-            # read first line as the headers
-            headers = csv_reader.pop(0)
-            attributeColumnIndex = ensure_valid_headers(headers, attributeNameToNameInReportedFileMap[attribute])
-            logger.debug("Column index for attribute {0} is {1}".format(attribute, attributeColumnIndex), extra=d)
-            start_time = (int)(os.path.getctime(inputCsvFile) * 1000) - int(csv_reader[0][0])
-            for line in csv_reader:
-                if(len(line) == 0):
-                    continue
-                yVal = float(line[attributeColumnIndex])
-                xVal = int(line[0])
-                y.append(yVal)
-                epoch = start_time + int(line[0])
-                x.append(xVal)
-                xticks_labels.append(time.strftime("%H:%M:%S", time.localtime(epoch)))
-                coordinates.append(Coordinates(xVal, yVal))
-            p1 = ax.plot(x, y)
-            plots.append(p1)
-        except Exception as e:
-            logger.error("ERROR while plotting data for {0}: {1}".format(inputCsvFile, e), extra=d)
-            traceback.print_exc()
-    # find xmin, xmax, ymin, ymax from all csv files
-    xmin = min(map(lambda coord: coord.x, coordinates))
-    xmax = max(map(lambda coord: coord.x, coordinates))
-    ymin = min(map(lambda coord: coord.y, coordinates))
-    ymax = max(map(lambda coord: coord.y, coordinates))
-    # set x and y axes limits
-    plt.xlim(xmin, xmax)
-    plt.ylim(ymin, ymax)
-    # set ticks accordingly
-    xticks = numpy.arange(xmin, xmax, 0.2*xmax)
-#    yticks = numpy.arange(ymin, ymax)
-    plt.xticks(xticks, xticks_labels, rotation=17)
-#    plt.yticks(yticks)
-    plt.legend(plots, labels, loc=2)
-    plt.title(title)
-    plt.savefig(outputGraphFile)
-
-def draw_all_graphs(metricsDescriptionFile, testcaseEnv, clusterConfig):
-    # go through each role and plot graphs for the role's metrics
-    roles = set(map(lambda config: config['role'], clusterConfig))
-    for role in roles:
-        dashboards = get_dashboard_definition(metricsDescriptionFile, role)
-        entities = kafka_system_test_utils.get_entities_for_role(clusterConfig, role)
-        for dashboard in dashboards:
-            graphs = dashboard['graphs']
-            # draw each graph for all entities
-            draw_graph_for_role(graphs, entities, role, testcaseEnv)
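
A minimal sketch of the CSV shape these plotting helpers consume: a "# time"
first column followed by one column per reported attribute (values here are
made up for illustration):

    import csv

    rows = [["# time", "min", "max", "mean"],
            ["1000", "0.1", "9.5", "3.2"],
            ["2000", "0.2", "9.9", "3.4"]]
    with open("sample_metric.csv", "w") as f:
        csv.writer(f).writerows(rows)

    headers = rows[0]
    assert headers[0] == "# time"        # same check as ensure_valid_headers
    meanColumn = headers.index("mean")   # column resolved via the attribute map
    print([float(r[meanColumn]) for r in rows[1:]])   # [3.2, 3.4]
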
-
-def draw_graph_for_role(graphs, entities, role, testcaseEnv):
-    for graph in graphs:
-        graphName = graph['graph_name']
-        yLabel = graph['y_label']
-        inputCsvFiles = []
-        graphLegendLabels = []
-        for entity in entities:
-            entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(testcaseEnv, role, entity['entity_id'], "metrics")
-            entityMetricCsvFile = entityMetricsDir + "/" + getCSVFileNameFromMetricsMbeanName(graph['bean_name'])
-            if(not os.path.exists(entityMetricCsvFile)):
-                logger.warn("The file {0} does not exist for plotting".format(entityMetricCsvFile), extra=d)
-            else:
-                inputCsvFiles.append(entityMetricCsvFile)
-                graphLegendLabels.append(role + "-" + entity['entity_id'])
-#                print "Plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
-        try:
-            # plot one graph per mbean attribute
-            labels = graph['y_label'].split(',')
-            fullyQualifiedAttributeNames = map(lambda attribute: graph['bean_name'] + ':' + attribute,
-                                               graph['attributes'].split(','))
-            attributes = graph['attributes'].split(',')
-            for labelAndAttribute in zip(labels, fullyQualifiedAttributeNames, attributes):
-                outputGraphFile = testcaseEnv.testCaseDashboardsDir + "/" + role + "/" + labelAndAttribute[1] + ".svg"
-                plot_graphs(inputCsvFiles, graphLegendLabels, graph['graph_name'] + '-' + labelAndAttribute[2],
-                            "time", labelAndAttribute[0], labelAndAttribute[2], outputGraphFile)
-#                print "Finished plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
-        except Exception as e:
-            logger.error("ERROR while plotting graph {0}: {1}".format(outputGraphFile, e), extra=d)
-            traceback.print_exc()
-
-def build_all_dashboards(metricsDefinitionFile, testcaseDashboardsDir, clusterConfig):
-    metricsHtmlFile = testcaseDashboardsDir + "/metrics.html"
-    centralDashboard = PyH('Kafka Metrics Dashboard')
-    centralDashboard << h1('Kafka Metrics Dashboard', cl='center')
-    roles = set(map(lambda config: config['role'], clusterConfig))
-    for role in roles:
-        entities = kafka_system_test_utils.get_entities_for_role(clusterConfig, role)
-        dashboardPagePath = build_dashboard_for_role(metricsDefinitionFile, role,
-                                                     entities, testcaseDashboardsDir)
-        centralDashboard << a(role, href = dashboardPagePath)
-        centralDashboard << br()
-
-    centralDashboard.printOut(metricsHtmlFile)
-
-def build_dashboard_for_role(metricsDefinitionFile, role, entities, testcaseDashboardsDir):
-    # build all dashboards for the input entities based on their role. It can
-    # be one of kafka, zookeeper, producer, consumer
-    dashboards = get_dashboard_definition(metricsDefinitionFile, role)
-    entityDashboard = PyH('Kafka Metrics Dashboard for ' + role)
-    entityDashboard << h1('Kafka Metrics Dashboard for ' + role, cl='center')
-    entityDashboardHtml = testcaseDashboardsDir + "/" + role + "-dashboards.html"
-    for dashboard in dashboards:
-        # place the graph svg files in this dashboard
-        allGraphs = dashboard['graphs']
-        for graph in allGraphs:
-            attributes = map(lambda attribute: graph['bean_name'] + ':' + attribute,
-                             graph['attributes'].split(','))
-            for attribute in attributes:
-                graphFileLocation = testcaseDashboardsDir + "/" + role + "/" + attribute + ".svg"
-                entityDashboard << embed(src = graphFileLocation, type = "image/svg+xml")
-    entityDashboard.printOut(entityDashboardHtml)
-    return entityDashboardHtml
-
-def start_metrics_collection(jmxHost, jmxPort, role, entityId, systemTestEnv, testcaseEnv):
-    logger.info("starting metrics collection on jmx port : " + jmxPort, extra=d)
-    jmxUrl = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"
-    clusterConfig = systemTestEnv.clusterEntityConfigDictList
-    metricsDefinitionFile = systemTestEnv.METRICS_PATHNAME
-    entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
-    dashboardsForRole = get_dashboard_definition(metricsDefinitionFile, role)
-    mbeansForRole = get_mbeans_for_role(dashboardsForRole)
-
-    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "kafka_home")
-    javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "java_home")
-
-    for mbean in mbeansForRole:
-        outputCsvFile = entityMetricsDir + "/" + mbean + ".csv"
-        startMetricsCmdList = ["ssh " + jmxHost,
-                               "'JAVA_HOME=" + javaHome,
-                               "JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
-                               "--jmx-url " + jmxUrl,
-                               "--object-name " + mbean + " 1> ",
-                               outputCsvFile + " & echo pid:$! > ",
-                               entityMetricsDir + "/entity_pid'"]
-
-        startMetricsCommand = " ".join(startMetricsCmdList)
-        logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
-        system_test_utils.async_sys_call(startMetricsCommand)
-        time.sleep(1)
-
-        pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid' 2> /dev/null"
-        logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
-        subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
-
-        # keep track of JMX ppid in a dictionary of entity_id to list of JMX ppid
-        # testcaseEnv.entityJmxParentPidDict:
-        #   key: entity_id
-        #   val: list of JMX ppid associated to that entity_id
-        #   { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
-        for line in subproc.stdout.readlines():
-            line = line.rstrip('\n')
-            logger.debug("line: [" + line + "]", extra=d)
-            if line.startswith("pid"):
-                logger.debug("found pid line: [" + line + "]", extra=d)
-                tokens  = line.split(':')
-                thisPid = tokens[1]
-                if entityId not in testcaseEnv.entityJmxParentPidDict:
-                    testcaseEnv.entityJmxParentPidDict[entityId] = []
-                testcaseEnv.entityJmxParentPidDict[entityId].append(thisPid)
-                #print "\n#### testcaseEnv.entityJmxParentPidDict ", testcaseEnv.entityJmxParentPidDict, "\n"
-
-
-def stop_metrics_collection(jmxHost, jmxPort):
-    logger.info("stopping metrics collection on " + jmxHost + ":" + jmxPort, extra=d)
-    system_test_utils.sys_call("ps -ef | grep JmxTool | grep -v grep | grep " + jmxPort + " | awk '{print $2}' | xargs kill -9")
-
-def get_mbeans_for_role(dashboardsForRole):
-    graphs = reduce(lambda x, y: x + y, map(lambda dashboard: dashboard['graphs'], dashboardsForRole))
-    return set(map(lambda metric: metric['bean_name'], graphs))
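
For reference, a sketch of the collection command start_metrics_collection
assembles per MBean; the host, port, and paths below are hypothetical
(JMX_PORT= is deliberately left empty so JmxTool itself does not grab the port):

    jmxHost, javaHome, kafkaHome = "broker-host", "/usr/java/default", "/opt/kafka"
    jmxUrl = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":9999/jmxrmi"
    mbean = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"
    cmd = " ".join(["ssh " + jmxHost,
                    "'JAVA_HOME=" + javaHome,
                    "JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
                    "--jmx-url " + jmxUrl,
                    "--object-name " + mbean + " 1> ",
                    "/tmp/metrics/" + mbean + ".csv & echo pid:$! > ",
                    "/tmp/metrics/entity_pid'"])
    print(cmd)   # the single ssh invocation handed to async_sys_call
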
http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/pyh.py
----------------------------------------------------------------------
diff --git a/system_test/utils/pyh.py b/system_test/utils/pyh.py
deleted file mode 100644
index cff06f4..0000000
--- a/system_test/utils/pyh.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-# @file: pyh.py
-# @purpose: a HTML tag generator
-# @author: Emmanuel Turlay <[email protected]>
-
-__doc__ = """The pyh.py module is the core of the PyH package. PyH lets you
-generate HTML tags from within your python code.
-See http://code.google.com/p/pyh/ for documentation.
-"""
-__author__ = "Emmanuel Turlay <[email protected]>"
-__version__ = '$Revision: 63 $'
-__date__ = '$Date: 2010-05-21 03:09:03 +0200 (Fri, 21 May 2010) $'
-
-from sys import _getframe, stdout, modules, version
-nOpen = {}
-
-nl = '\n'
-doctype = '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">\n'
-charset = '<meta http-equiv="Content-Type" content="text/html;charset=utf-8" />\n'
-
-tags = ['html', 'body', 'head', 'link', 'meta', 'div', 'p', 'form', 'legend',
-        'input', 'select', 'span', 'b', 'i', 'option', 'img', 'script',
-        'table', 'tr', 'td', 'th', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
-        'fieldset', 'a', 'title', 'body', 'head', 'title', 'script', 'br', 'table',
-        'ul', 'li', 'ol', 'embed']
-
-selfClose = ['input', 'img', 'link', 'br']
-
-class Tag(list):
-    tagname = ''
-
-    def __init__(self, *arg, **kw):
-        self.attributes = kw
-        if self.tagname:
-            name = self.tagname
-            self.isSeq = False
-        else:
-            name = 'sequence'
-            self.isSeq = True
-        self.id = kw.get('id', name)
-        #self.extend(arg)
-        for a in arg: self.addObj(a)
-
-    def __iadd__(self, obj):
-        if isinstance(obj, Tag) and obj.isSeq:
-            for o in obj: self.addObj(o)
-        else: self.addObj(obj)
-        return self
-
-    def addObj(self, obj):
-        if not isinstance(obj, Tag): obj = str(obj)
-        id = self.setID(obj)
-        setattr(self, id, obj)
-        self.append(obj)
-
-    def setID(self, obj):
-        if isinstance(obj, Tag):
-            id = obj.id
-            n = len([t for t in self if isinstance(t, Tag) and t.id.startswith(id)])
-        else:
-            id = 'content'
-            n = len([t for t in self if not isinstance(t, Tag)])
-        if n: id = '%s_%03i' % (id, n)
-        if isinstance(obj, Tag): obj.id = id
-        return id
-
-    def __add__(self, obj):
-        if self.tagname: return Tag(self, obj)
-        self.addObj(obj)
-        return self
-
-    def __lshift__(self, obj):
-        self += obj
-        if isinstance(obj, Tag): return obj
-
-    def render(self):
-        result = ''
-        if self.tagname:
-            result = '<%s%s%s>' % (self.tagname, self.renderAtt(), self.selfClose()*' /')
-        if not self.selfClose():
-            for c in self:
-                if isinstance(c, Tag):
-                    result += c.render()
-                else: result += c
-            if self.tagname:
-                result += '</%s>' % self.tagname
-        result += '\n'
-        return result
-
-    def renderAtt(self):
-        result = ''
-        for n, v in self.attributes.iteritems():
-            if n != 'txt' and n != 'open':
-                if n == 'cl': n = 'class'
-                result += ' %s="%s"' % (n, v)
-        return result
-
-    def selfClose(self):
-        return self.tagname in selfClose
-
-def TagFactory(name):
-    class f(Tag):
-        tagname = name
-    f.__name__ = name
-    return f
-
-thisModule = modules[__name__]
-
-for t in tags: setattr(thisModule, t, TagFactory(t))
-
-def ValidW3C():
-    out = a(img(src='http://www.w3.org/Icons/valid-xhtml10', alt='Valid XHTML 1.0 Strict'), href='http://validator.w3.org/check?uri=referer')
-    return out
-
-class PyH(Tag):
-    tagname = 'html'
-
-    def __init__(self, name='MyPyHPage'):
-        self += head()
-        self += body()
-        self.attributes = dict(xmlns='http://www.w3.org/1999/xhtml', lang='en')
-        self.head += title(name)
-
-    def __iadd__(self, obj):
-        if isinstance(obj, head) or isinstance(obj, body): self.addObj(obj)
-        elif isinstance(obj, meta) or isinstance(obj, link): self.head += obj
-        else:
-            self.body += obj
-            id = self.setID(obj)
-            setattr(self, id, obj)
-        return self
-
-    def addJS(self, *arg):
-        for f in arg: self.head += script(type='text/javascript', src=f)
-
-    def addCSS(self, *arg):
-        for f in arg: self.head += link(rel='stylesheet', type='text/css', href=f)
-
-    def printOut(self, file=''):
-        if file: f = open(file, 'w')
-        else: f = stdout
-        f.write(doctype)
-        f.write(self.render())
-        f.flush()
-        if file: f.close()
-
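
A minimal sketch of how the dashboard builders in metrics.py use this module
(Python 2, like the rest of these utilities; the output path is hypothetical):

    from pyh import *

    page = PyH('Kafka Metrics Dashboard')
    page << h1('Kafka Metrics Dashboard', cl='center')   # cl renders as class=
    page << a('broker', href='broker-dashboards.html')
    page << br()
    page.printOut('/tmp/metrics.html')   # omit the argument to write to stdout
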
http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/replication_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/replication_utils.py b/system_test/utils/replication_utils.py
deleted file mode 100644
index cfd80b2..0000000
--- a/system_test/utils/replication_utils.py
+++ /dev/null
@@ -1,70 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#!/usr/bin/env python
-
-# =================================================================
-# replication_utils.py
-# - This module defines constant values specific to Kafka Replication
-#   and also provides helper functions for Replication system test.
-# =================================================================
-
-import logging
-import sys
-
-class ReplicationUtils(object):
-
-    thisClassName = '(ReplicationUtils)'
-    d = {'name_of_class': thisClassName}
-
-    logger     = logging.getLogger("namedLogger")
-    anonLogger = logging.getLogger("anonymousLogger")
-
-    def __init__(self, testClassInstance):
-        super(ReplicationUtils, self).__init__()
-        self.logger.debug("#### constructor inside ReplicationUtils", extra=self.d)
-
-        # leader attributes
-        self.isLeaderLogPattern = "Completed the leader state transition"
-        self.brokerShutDownCompletedPattern = "shut down completed"
-
-        self.leaderAttributesDict = {}
-
-        self.leaderAttributesDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] = \
-            self.brokerShutDownCompletedPattern
-
-        self.leaderAttributesDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"] = \
-            "\[(.*?)\] .* \[Kafka Server (.*?)\], " + \
-            self.brokerShutDownCompletedPattern
-
-        self.leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] = \
-            self.isLeaderLogPattern
-
-        self.leaderAttributesDict["REGX_LEADER_ELECTION_PATTERN"] = \
-            "\[(.*?)\] .* Broker (.*?): " + \
-            self.leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] + \
-            " for topic (.*?) partition (.*?) \(.*"
-
-        # Controller attributes
-        self.isControllerLogPattern = "Controller startup complete"
-        self.controllerAttributesDict = {}
-        self.controllerAttributesDict["CONTROLLER_STARTUP_COMPLETE_MSG"] = self.isControllerLogPattern
-        self.controllerAttributesDict["REGX_CONTROLLER_STARTUP_PATTERN"] = "\[(.*?)\] .* \[Controller (.*?)\]: " + \
-            self.controllerAttributesDict["CONTROLLER_STARTUP_COMPLETE_MSG"]
-
-        # Data Loss Percentage Threshold in Ack = 1 cases
-        self.ackOneDataLossThresholdPercent = 5.0
-
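
A sketch of how the leader-election pattern above is applied to a broker log
line; the sample line is fabricated for illustration:

    import re

    pattern = "\[(.*?)\] .* Broker (.*?): " + \
              "Completed the leader state transition" + \
              " for topic (.*?) partition (.*?) \(.*"
    sampleLine = "[2013-07-01 12:00:00,123] INFO Broker 1: " \
                 "Completed the leader state transition for topic test_1 " \
                 "partition 0 (state.change.logger)"
    print(re.match(pattern, sampleLine).groups())
    # ('2013-07-01 12:00:00,123', '1', 'test_1', '0')
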
http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/setup_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/setup_utils.py b/system_test/utils/setup_utils.py
deleted file mode 100644
index 0e8b7f9..0000000
--- a/system_test/utils/setup_utils.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#!/usr/bin/env python
-
-# =================================================================
-# setup_utils.py
-# - This module provides some basic helper functions.
-# =================================================================
-
-import logging
-import kafka_system_test_utils
-import sys
-
-class SetupUtils(object):
-
-    # dict to pass user-defined attributes to logger argument: "extra"
-    # to use: just update "thisClassName" to the appropriate value
-    thisClassName = '(ReplicaBasicTest)'
-    d = {'name_of_class': thisClassName}
-
-    logger     = logging.getLogger("namedLogger")
-    anonLogger = logging.getLogger("anonymousLogger")
-
-    def __init__(self):
-        d = {'name_of_class': self.__class__.__name__}
-        self.logger.debug("#### constructor inside SetupUtils", extra=self.d)
-
-    def log_message(self, message):
-        print
-        self.anonLogger.info("======================================================")
-        self.anonLogger.info(message)
-        self.anonLogger.info("======================================================")
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/system_test_utils.py b/system_test/utils/system_test_utils.py
deleted file mode 100644
index e8529cd..0000000
--- a/system_test/utils/system_test_utils.py
+++ /dev/null
@@ -1,638 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#!/usr/bin/env python
-
-# ===================================
-# system_test_utils.py
-# ===================================
-
-import copy
-import difflib
-import hashlib
-import inspect
-import json
-import logging
-import os
-import re
-import signal
-import socket
-import subprocess
-import sys
-import time
-
-logger = logging.getLogger("namedLogger")
-aLogger = logging.getLogger("anonymousLogger")
-thisClassName = '(system_test_utils)'
-d = {'name_of_class': thisClassName}
-
-
-def get_current_unix_timestamp():
-    ts = time.time()
-    return "{0:.6f}".format(ts)
-
-
-def get_local_hostname():
-    return socket.gethostname()
-
-
-def sys_call(cmdStr):
-    output = ""
-    #logger.info("executing command [" + cmdStr + "]", extra=d)
-    p = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-    for line in p.stdout.readlines():
-        output += line
-    return output
-
-
-def remote_async_sys_call(host, cmd):
-    cmdStr = "ssh " + host + " \"" + cmd + "\""
-    logger.info("executing command [" + cmdStr + "]", extra=d)
-    async_sys_call(cmdStr)
-
-
-def remote_sys_call(host, cmd):
-    cmdStr = "ssh " + host + " \"" + cmd + "\""
-    logger.info("executing command [" + cmdStr + "]", extra=d)
-    sys_call(cmdStr)
-
-
-def get_dir_paths_with_prefix(fullPath, dirNamePrefix):
-    dirsList = []
-    for dirName in os.listdir(fullPath):
-        if not os.path.isfile(dirName) and dirName.startswith(dirNamePrefix):
-            dirsList.append(os.path.abspath(fullPath + "/" + dirName))
-    return dirsList
-
-
-def get_testcase_prop_json_pathname(testcasePathName):
-    testcaseDirName = os.path.basename(testcasePathName)
-    return testcasePathName + "/" + testcaseDirName + "_properties.json"
-
-def get_json_list_data(infile):
-    json_file_str = open(infile, "r").read()
-    json_data = json.loads(json_file_str)
-    data_list = []
-
-    for key, settings in json_data.items():
-        if type(settings) == list:
-            for setting in settings:
-                if type(setting) == dict:
-                    kv_dict = {}
-                    for k, v in setting.items():
-                        kv_dict[k] = v
-                    data_list.append(kv_dict)
-
-    return data_list
-
-
-def get_dict_from_list_of_dicts(listOfDicts, lookupKey, lookupVal):
-    # Sample List of Dicts:
-    # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
-    # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker',    'hostname': 'localhost'}
-    #
-    # Usage:
-    #
-    # 1. get_dict_from_list_of_dicts(self.clusterConfigsList, "entity_id", "0")
-    #    returns:
-    #        {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
-    #
-    # 2. get_dict_from_list_of_dicts(self.clusterConfigsList, None, None)
-    #    returns:
-    #        {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
-    #        {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker',    'hostname': 'localhost'}
-
-    retList = []
-    if ( lookupVal is None or lookupKey is None ):
-        # no lookup key/value given: return all entries
-        for dict in listOfDicts:
-            retList.append( dict )
-    else:
-        for dict in listOfDicts:
-            for k, v in dict.items():
-                if ( k == lookupKey and v == lookupVal ):   # match with lookupKey and lookupVal
-                    retList.append( dict )
-
-    return retList
-
-
-def get_data_from_list_of_dicts(listOfDicts, lookupKey, lookupVal, fieldToRetrieve):
-    # Sample List of Dicts:
-    # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
-    # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker',    'hostname': 'localhost'}
-    #
-    # Usage:
-    # 1. get_data_from_list_of_dicts(self.clusterConfigsList, "entity_id", "0", "role")
-    #    => returns ['zookeeper']
-    # 2. get_data_from_list_of_dicts(self.clusterConfigsList, None, None, "role")
-    #    => returns ['zookeeper', 'broker']
-
-    retList = []
-    if ( lookupVal is None or lookupKey is None ):
-        for dict in listOfDicts:
-            for k, v in dict.items():
-                if ( k == fieldToRetrieve ):   # match with fieldToRetrieve ONLY
-                    try:
-                        retList.append( dict[fieldToRetrieve] )
-                    except:
-                        logger.debug("field not found: " + fieldToRetrieve, extra=d)
-    else:
-        for dict in listOfDicts:
-            for k, v in dict.items():
-                if ( k == lookupKey and v == lookupVal ):   # match with lookupKey and lookupVal
-                    try:
-                        retList.append( dict[fieldToRetrieve] )
-                    except:
-                        logger.debug("field not found: " + fieldToRetrieve, extra=d)
-    return retList
-
-def get_data_by_lookup_keyval(listOfDict, lookupKey, lookupVal, fieldToRetrieve):
-    returnValue = ""
-    returnValuesList = get_data_from_list_of_dicts(listOfDict, lookupKey, lookupVal, fieldToRetrieve)
-    if len(returnValuesList) > 0:
-        returnValue = returnValuesList[0]
-
-    return returnValue
-
-def get_json_dict_data(infile):
-    json_file_str = open(infile, "r").read()
-    json_data = json.loads(json_file_str)
-    data_dict = {}
-
-    for key, val in json_data.items():
-        if ( type(val) != list ):
-            data_dict[key] = val
-
-    return data_dict
-
-def get_remote_child_processes(hostname, pid):
-    pidStack = []
-
-    cmdList = ['''ssh ''' + hostname,
-               ''''pid=''' + pid + '''; prev_pid=""; echo $pid;''',
-               '''while [[ "x$pid" != "x" ]];''',
-               '''do prev_pid=$pid;''',
-               '''   for child in $(ps -o pid,ppid ax | awk "{ if ( \$2 == $pid ) { print \$1 }}");''',
-               '''   do echo $child; pid=$child;''',
-               '''   done;''',
-               '''   if [ $prev_pid == $pid ]; then''',
-               '''      break;''',
-               '''   fi;''',
-               '''done' 2> /dev/null''']
-
-    cmdStr = " ".join(cmdList)
-    logger.debug("executing command [" + cmdStr, extra=d)
-
-    subproc = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE)
-    for line in subproc.stdout.readlines():
-        procId = line.rstrip('\n')
-        pidStack.append(procId)
-    return pidStack
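
A standalone rendering of the lookup behavior documented above, behaviorally
equivalent for the sample cluster config from the comments:

    clusterConfigsList = [
        {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'},
        {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker',    'hostname': 'localhost'},
    ]

    def lookup(listOfDicts, lookupKey, lookupVal, fieldToRetrieve):
        # same semantics as get_data_from_list_of_dicts
        if lookupKey is None or lookupVal is None:
            return [cfg[fieldToRetrieve] for cfg in listOfDicts if fieldToRetrieve in cfg]
        return [cfg[fieldToRetrieve] for cfg in listOfDicts
                if cfg.get(lookupKey) == lookupVal and fieldToRetrieve in cfg]

    print(lookup(clusterConfigsList, "entity_id", "0", "role"))   # ['zookeeper']
    print(lookup(clusterConfigsList, None, None, "role"))         # ['zookeeper', 'broker']
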
-
-def get_child_processes(pid):
-    pidStack = []
-    currentPid = pid
-    parentPid = ""
-    pidStack.append(pid)
-
-    while ( len(currentPid) > 0 ):
-        psCommand = subprocess.Popen("ps -o pid --ppid %s --noheaders" % currentPid, shell=True, stdout=subprocess.PIPE)
-        psOutput = psCommand.stdout.read()
-        outputLine = psOutput.rstrip('\n')
-        childPid = outputLine.lstrip()
-
-        if ( len(childPid) > 0 ):
-            pidStack.append(childPid)
-            currentPid = childPid
-        else:
-            break
-    return pidStack
-
-def sigterm_remote_process(hostname, pidStack):
-
-    while ( len(pidStack) > 0 ):
-        pid = pidStack.pop()
-        cmdStr = "ssh " + hostname + " 'kill -15 " + pid + "'"
-
-        try:
-            logger.debug("executing command [" + cmdStr + "]", extra=d)
-            sys_call_return_subproc(cmdStr)
-        except:
-            print "WARN - pid:", pid, "not found"
-            raise
-
-def sigkill_remote_process(hostname, pidStack):
-
-    while ( len(pidStack) > 0 ):
-        pid = pidStack.pop()
-        cmdStr = "ssh " + hostname + " 'kill -9 " + pid + "'"
-
-        try:
-            logger.debug("executing command [" + cmdStr + "]", extra=d)
-            sys_call_return_subproc(cmdStr)
-        except:
-            print "WARN - pid:", pid, "not found"
-            raise
-
-def simulate_garbage_collection_pause_in_remote_process(hostname, pidStack, pauseTimeInSeconds):
-    pausedPidStack = []
-
-    # pause the processes
-    while len(pidStack) > 0:
-        pid = pidStack.pop()
-        pausedPidStack.append(pid)
-        cmdStr = "ssh " + hostname + " 'kill -SIGSTOP " + pid + "'"
-
-        try:
-            logger.debug("executing command [" + cmdStr + "]", extra=d)
-            sys_call_return_subproc(cmdStr)
-        except:
-            print "WARN - pid:", pid, "not found"
-            raise
-
-    time.sleep(int(pauseTimeInSeconds))
-
-    # resume execution of the processes
-    while len(pausedPidStack) > 0:
-        pid = pausedPidStack.pop()
-        cmdStr = "ssh " + hostname + " 'kill -SIGCONT " + pid + "'"
-
-        try:
-            logger.debug("executing command [" + cmdStr + "]", extra=d)
-            sys_call_return_subproc(cmdStr)
-        except:
-            print "WARN - pid:", pid, "not found"
-            raise
-
-def terminate_process(pidStack):
-    while ( len(pidStack) > 0 ):
-        pid = pidStack.pop()
-        try:
-            os.kill(int(pid), signal.SIGTERM)
-        except:
-            print "WARN - pid:", pid, "not found"
-            raise
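
A minimal local sketch of the GC-pause simulation above: SIGSTOP freezes a
process without killing it and SIGCONT resumes it; the sleeper child here is
spawned purely for illustration:

    import os, signal, subprocess, time

    child = subprocess.Popen(["sleep", "30"])
    os.kill(child.pid, signal.SIGSTOP)   # frozen: stays alive, consumes no CPU
    time.sleep(2)                        # the simulated "GC pause" window
    os.kill(child.pid, signal.SIGCONT)   # resumes where it left off
    child.terminate()
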
-
-
-def convert_keyval_to_cmd_args(configFilePathname):
-    cmdArg = ""
-    inlines = open(configFilePathname, "r").readlines()
-    for inline in inlines:
-        line = inline.rstrip()
-        tokens = line.split('=', 1)
-
-        if (len(tokens) == 2):
-            cmdArg = cmdArg + " --" + tokens[0] + " " + tokens[1]
-        elif (len(tokens) == 1):
-            cmdArg = cmdArg + " --" + tokens[0]
-        else:
-            print "ERROR: unexpected arguments list", line
-    return cmdArg
-
-
-def async_sys_call(cmd_str):
-    subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-
-
-def sys_call_return_subproc(cmd_str):
-    p = subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-    return p
-
-
-def remote_host_file_exists(hostname, pathname):
-    cmdStr = "ssh " + hostname + " 'ls " + pathname + "'"
-    logger.debug("executing command: [" + cmdStr + "]", extra=d)
-    subproc = sys_call_return_subproc(cmdStr)
-
-    for line in subproc.stdout.readlines():
-        if "No such file or directory" in line:
-            return False
-    return True
-
-
-def remote_host_directory_exists(hostname, path):
-    cmdStr = "ssh " + hostname + " 'ls -d " + path + "'"
-    logger.debug("executing command: [" + cmdStr + "]", extra=d)
-    subproc = sys_call_return_subproc(cmdStr)
-
-    for line in subproc.stdout.readlines():
-        if "No such file or directory" in line:
-            return False
-    return True
-
-
-def remote_host_processes_stopped(hostname):
-    cmdStr = "ssh " + hostname + \
-             " \"ps auxw | grep -v grep | grep -v Bootstrap | grep -i 'java\|run\-\|producer\|consumer\|jmxtool\|kafka' | wc -l\" 2> /dev/null"
-
-    logger.info("executing command: [" + cmdStr + "]", extra=d)
-    subproc = sys_call_return_subproc(cmdStr)
-
-    for line in subproc.stdout.readlines():
-        line = line.rstrip('\n')
-        logger.info("no. of running processes found : [" + line + "]", extra=d)
-        if line == '0':
-            return True
-    return False
-
-
-def setup_remote_hosts(systemTestEnv):
-    # sanity check on remote hosts to make sure:
-    # - all directories (eg. java_home) specified in cluster_config.json exist in all hosts
-    # - no conflicting running processes in remote hosts
-
-    aLogger.info("=================================================")
-    aLogger.info("setting up remote hosts ...")
-    aLogger.info("=================================================")
-
-    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
-
-    localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..")
-
-    # when configuring "default" java_home, use JAVA_HOME environment variable, if it exists
-    # otherwise, use the directory with the java binary
-    localJavaHome = os.environ.get('JAVA_HOME')
-    if localJavaHome is not None:
-        localJavaBin = localJavaHome + '/bin/java'
-    else:
-        subproc = sys_call_return_subproc("which java")
-        for line in subproc.stdout.readlines():
-            if line.startswith("which: no "):
-                logger.error("No Java binary found in local host", extra=d)
-                return False
-            else:
-                line = line.rstrip('\n')
-                localJavaBin = line
-                matchObj = re.match("(.*)\/bin\/java$", line)
-                localJavaHome = matchObj.group(1)
-
-    listIndex = -1
-    for clusterEntityConfigDict in clusterEntityConfigDictList:
-        listIndex += 1
-
-        hostname  = clusterEntityConfigDict["hostname"]
-        kafkaHome = clusterEntityConfigDict["kafka_home"]
-        javaHome  = clusterEntityConfigDict["java_home"]
-
-        if hostname == "localhost" and javaHome == "default":
-            clusterEntityConfigDictList[listIndex]["java_home"] = localJavaHome
-
-        if hostname == "localhost" and kafkaHome == "default":
-            clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome
-        if hostname == "localhost" and kafkaHome == "system_test/migration_tool_testsuite/0.7":
-            clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome + "/system_test/migration_tool_testsuite/0.7"
-
-        kafkaHome = clusterEntityConfigDict["kafka_home"]
-        javaHome  = clusterEntityConfigDict["java_home"]
-
-        logger.debug("checking java binary [" + localJavaBin + "] in host [" + hostname + "]", extra=d)
-        if not remote_host_directory_exists(hostname, javaHome):
-            logger.error("Directory not found: [" + javaHome + "] in host [" + hostname + "]", extra=d)
-            return False
-
-        logger.debug("checking directory [" + kafkaHome + "] in host [" + hostname + "]", extra=d)
-        if not remote_host_directory_exists(hostname, kafkaHome):
-            logger.info("Directory not found: [" + kafkaHome + "] in host [" + hostname + "]", extra=d)
-            if hostname == "localhost":
-                return False
-            else:
-                localKafkaSourcePath = systemTestEnv.SYSTEM_TEST_BASE_DIR + "/.."
-                logger.debug("copying local copy of [" + localKafkaSourcePath + "] to " + hostname + ":" + kafkaHome, extra=d)
-                copy_source_to_remote_hosts(hostname, localKafkaSourcePath, kafkaHome)
-
-    return True
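
A sketch of the key=value to command-line-flag conversion above; the
properties-file content is hypothetical:

    lines = ["topic=test_1", "threads=5", "sync"]
    cmdArg = ""
    for line in lines:
        tokens = line.split('=', 1)
        if len(tokens) == 2:
            cmdArg += " --" + tokens[0] + " " + tokens[1]
        else:
            cmdArg += " --" + tokens[0]
    print(cmdArg)   # " --topic test_1 --threads 5 --sync"
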
-
-def copy_source_to_remote_hosts(hostname, sourceDir, destDir):
-
-    cmdStr = "rsync -avz --delete-before " + sourceDir + "/ " + hostname + ":" + destDir
-    logger.info("executing command [" + cmdStr + "]", extra=d)
-    subproc = sys_call_return_subproc(cmdStr)
-
-    # drain rsync output so the call blocks until the copy completes
-    for line in subproc.stdout.readlines():
-        dummyVar = 1
-
-
-def remove_kafka_home_dir_at_remote_hosts(hostname, kafkaHome):
-    # the rm command below is destructive: only proceed when the path really
-    # looks like a kafka installation
-    cmdStr = "ssh " + hostname + " 'rm -rf " + kafkaHome + "'"
-
-    if remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
-        chmodCmdStr = "ssh " + hostname + " 'chmod -R 777 " + kafkaHome + "'"
-        logger.info("executing command [" + chmodCmdStr + "]", extra=d)
-        sys_call(chmodCmdStr)
-
-        logger.info("executing command [" + cmdStr + "]", extra=d)
-        #sys_call(cmdStr)
-    else:
-        logger.warn("possible destructive command [" + cmdStr + "]", extra=d)
-        logger.warn("check config file: system_test/cluster_config.properties", extra=d)
-        logger.warn("aborting test...", extra=d)
-        sys.exit(1)
-
-def get_md5_for_file(filePathName, blockSize=8192):
-    md5 = hashlib.md5()
-    f = open(filePathName, 'rb')
-
-    while True:
-        data = f.read(blockSize)
-        if not data:
-            break
-        md5.update(data)
-    f.close()
-    return md5.digest()
-
-def load_cluster_config(clusterConfigPathName, clusterEntityConfigDictList):
-    # empty the list
-    clusterEntityConfigDictList[:] = []
-
-    # retrieve each entity's data from cluster config json file
-    # as "dict" and enter them into a "list"
-    jsonFileContent = open(clusterConfigPathName, "r").read()
-    jsonData = json.loads(jsonFileContent)
-    for key, cfgList in jsonData.items():
-        if key == "cluster_config":
-            for cfg in cfgList:
-                clusterEntityConfigDictList.append(cfg)
-
-def setup_remote_hosts_with_testcase_level_cluster_config(systemTestEnv, testCasePathName):
-    # =======================================================================
-    # starting a new testcase, check for local cluster_config.json
-    # =======================================================================
-    # 1. if there is a xxxx_testsuite/testcase_xxxx/cluster_config.json
-    #    => load it into systemTestEnv.clusterEntityConfigDictList
-    # 2. if there is NO testcase_xxxx/cluster_config.json but there is a xxxx_testsuite/cluster_config.json
-    #    => restore systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite
-    # 3. if there is NO testcase_xxxx/cluster_config.json NOR xxxx_testsuite/cluster_config.json
-    #    => restore system_test/cluster_config.json
-
-    testCaseLevelClusterConfigPathName = testCasePathName + "/cluster_config.json"
-
-    if os.path.isfile(testCaseLevelClusterConfigPathName):
-        # if there is a cluster_config.json in this directory, load it and use it for this testcase
-        logger.info("found a new cluster_config : " + testCaseLevelClusterConfigPathName, extra=d)
-
-        # empty the current cluster config list
-        systemTestEnv.clusterEntityConfigDictList[:] = []
-
-        # load the cluster config for this testcase level
-        load_cluster_config(testCaseLevelClusterConfigPathName, systemTestEnv.clusterEntityConfigDictList)
-
-        # back up this testcase level cluster config
-        systemTestEnv.clusterEntityConfigDictListLastFoundInTestCase = copy.deepcopy(systemTestEnv.clusterEntityConfigDictList)
-
-    elif len(systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite) > 0:
-        # if there is NO testcase_xxxx/cluster_config.json, but there is a xxxx_testsuite/cluster_config.json
-        # => restore the config in xxxx_testsuite/cluster_config.json
-
-        # empty the current cluster config list
-        systemTestEnv.clusterEntityConfigDictList[:] = []
-
-        # restore the xxxx_testsuite/cluster_config.json
-        systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite)
-
-    else:
-        # if there is NONE, restore the config in system_test/cluster_config.json
-
-        # empty the current cluster config list
-        systemTestEnv.clusterEntityConfigDictList[:] = []
-
-        # restore the system_test/cluster_config.json
-        systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(systemTestEnv.clusterEntityConfigDictListInSystemTestLevel)
-
-    # set up remote hosts
-    if not setup_remote_hosts(systemTestEnv):
-        logger.error("Remote hosts sanity check failed. Aborting test ...", extra=d)
-        print
-        sys.exit(1)
-    print
-
-def setup_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, testModulePathName):
-    # =======================================================================
-    # starting a new testsuite, check for local cluster_config.json:
-    # =======================================================================
-    # 1. if there is a xxxx_testsuite/cluster_config.json
-    #    => load it into systemTestEnv.clusterEntityConfigDictList
-    # 2. if there is NO xxxx_testsuite/cluster_config.json
-    #    => restore system_test/cluster_config.json
-
-    testSuiteLevelClusterConfigPathName = testModulePathName + "/cluster_config.json"
-
-    if os.path.isfile(testSuiteLevelClusterConfigPathName):
-        # if there is a cluster_config.json in this directory, load it and use it for this testsuite
-        logger.info("found a new cluster_config : " + testSuiteLevelClusterConfigPathName, extra=d)
-
-        # empty the current cluster config list
-        systemTestEnv.clusterEntityConfigDictList[:] = []
-
-        # load the cluster config for this testsuite level
-        load_cluster_config(testSuiteLevelClusterConfigPathName, systemTestEnv.clusterEntityConfigDictList)
-
-        # back up this testsuite level cluster config
-        systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite = copy.deepcopy(systemTestEnv.clusterEntityConfigDictList)
-
-    else:
-        # if there is NONE, restore the config in system_test/cluster_config.json
-
-        # empty the last testsuite level cluster config list
-        systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite[:] = []
-
-        # empty the current cluster config list
-        systemTestEnv.clusterEntityConfigDictList[:] = []
-
-        # restore the system_test/cluster_config.json
-        systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(systemTestEnv.clusterEntityConfigDictListInSystemTestLevel)
-
-    # set up remote hosts
-    if not setup_remote_hosts(systemTestEnv):
-        logger.error("Remote hosts sanity check failed. Aborting test ...", extra=d)
-        print
-        sys.exit(1)
-    print
-
-# =================================================
-# lists_diff_count
-# - find the no. of different items in both lists
-# - both lists need not be sorted
-# - input lists won't be changed
-# =================================================
-def lists_diff_count(a, b):
-    c = list(b)
-    d = []
-    for item in a:
-        try:
-            c.remove(item)
-        except:
-            d.append(item)
-
-    if len(d) > 0:
-        print "#### Mismatch MessageID"
-        print d
-
-    return len(c) + len(d)
-
-# =================================================
-# subtract_list
-# - subtract items in listToSubtract from mainList
-#   and return the resulting list
-# - both lists need not be sorted
-# - input lists won't be changed
-# =================================================
-def subtract_list(mainList, listToSubtract):
-    remainingList = list(mainList)
-    for item in listToSubtract:
-        try:
-            remainingList.remove(item)
-        except:
-            pass
-    return remainingList
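
A worked example of subtract_list on hypothetical message-id lists:

    def subtract(mainList, listToSubtract):   # same semantics as subtract_list
        remaining = list(mainList)
        for item in listToSubtract:
            if item in remaining:
                remaining.remove(item)
        return remaining

    print(subtract(['8', '4', '3', '2', '1'], ['4', '1']))   # ['8', '3', '2']
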
-
-# =================================================
-# diff_lists
-# - find the diff of 2 lists and return the
-#   total no. of mismatch from both lists
-# - diff of both lists includes:
-#   - no. of items mismatch
-#   - ordering of the items
-#
-# sample lists:
-# a = ['8','4','3','2','1']
-# b = ['8','3','4','2','1']
-#
-# difflib will return the following:
-#   8
-# + 3
-#   4
-# - 3
-#   2
-#   1
-#
-# diff_lists(a,b) returns 2 and prints the following:
-# #### only in seq 2 : + 3
-# #### only in seq 1 : - 3
-# =================================================
-def diff_lists(a, b):
-    mismatchCount = 0
-    differ = difflib.Differ()
-    diff = differ.compare(a, b)
-
-    for item in diff:
-        result = item[0:1].strip()
-        if len(result) > 0:
-            mismatchCount += 1
-            if '-' in result:
-                logger.debug("#### only in seq 1 : " + item, extra=d)
-            elif '+' in result:
-                logger.debug("#### only in seq 2 : " + item, extra=d)
-
-    return mismatchCount
-
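
The difflib behavior described in the comment above, shown standalone:

    import difflib

    a = ['8', '4', '3', '2', '1']
    b = ['8', '3', '4', '2', '1']
    for item in difflib.Differ().compare(a, b):
        print(item)   # "  8", "+ 3", "  4", "- 3", "  2", "  1"
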
http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/testcase_env.py
----------------------------------------------------------------------
diff --git a/system_test/utils/testcase_env.py b/system_test/utils/testcase_env.py
deleted file mode 100644
index 1d2fb57..0000000
--- a/system_test/utils/testcase_env.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#!/usr/bin/env python
-
-# ===================================
-# testcase_env.py
-# ===================================
-
-import json
-import os
-import sys
-import thread
-
-import system_test_utils
-
-class TestcaseEnv():
-
-    def __init__(self, systemTestEnv, classInstance):
-        self.systemTestEnv = systemTestEnv
-
-        # ================================
-        # Generic testcase environment
-        # ================================
-
-        # dictionary of entity_id to ppid for Zookeeper entities
-        #   key: entity_id
-        #   val: ppid of Zookeeper associated to that entity_id
-        #   { 0: 12345, 1: 12389, ... }
-        self.entityZkParentPidDict = {}
-
-        # dictionary of entity_id to ppid for broker entities
-        #   key: entity_id
-        #   val: ppid of broker associated to that entity_id
-        #   { 0: 12345, 1: 12389, ... }
-        self.entityBrokerParentPidDict = {}
-
-        # dictionary of entity_id to ppid for mirror-maker entities
-        #   key: entity_id
-        #   val: ppid of mirror-maker associated to that entity_id
-        #   { 0: 12345, 1: 12389, ... }
-        self.entityMirrorMakerParentPidDict = {}
-
-        # dictionary of entity_id to ppid for console-consumer entities
-        #   key: entity_id
-        #   val: ppid of console consumer associated to that entity_id
-        #   { 0: 12345, 1: 12389, ... }
-        self.entityConsoleConsumerParentPidDict = {}
-
-        # dictionary of entity_id to ppid for migration tool entities
-        #   key: entity_id
-        #   val: ppid of migration tool associated to that entity_id
-        #   { 0: 12345, 1: 12389, ... }
-        self.entityMigrationToolParentPidDict = {}
-
-        # dictionary of entity_id to list of JMX ppid
-        #   key: entity_id
-        #   val: list of JMX ppid associated to that entity_id
-        #   { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
-        self.entityJmxParentPidDict = {}
-
-        # dictionary of hostname-topic-ppid for consumer
-        #   key: hostname
-        #   val: dict of topic-ppid
-        #   { host1: { test1 : 12345 }, host1: { test2 : 12389 }, ... }
-        self.consumerHostParentPidDict = {}
-
-        # dictionary of hostname-topic-ppid for producer
-        #   key: hostname
-        #   val: dict of topic-ppid
-        #   { host1: { test1 : 12345 }, host1: { test2 : 12389 }, ... }
-        self.producerHostParentPidDict = {}
-
-        # list of testcase configs
-        self.testcaseConfigsList = []
-
-        # dictionary to keep track of testcase arguments such as replica_factor, num_partition
-        self.testcaseArgumentsDict = {}
-
-        # gather the test case related info and add it to a SystemTestEnv object
-        self.testcaseResultsDict = {}
-        self.testcaseResultsDict["_test_class_name"] = classInstance.__class__.__name__
-        self.testcaseResultsDict["_test_case_name"] = ""
-        self.validationStatusDict = {}
-        self.testcaseResultsDict["validation_status"] = self.validationStatusDict
-        self.systemTestEnv.systemTestResultsList.append(self.testcaseResultsDict)
-
-        # FIXME: in a distributed environment, kafkaBaseDir could be different in individual hosts
-        # => TBD
-        self.kafkaBaseDir = ""
-
-        self.systemTestBaseDir = systemTestEnv.SYSTEM_TEST_BASE_DIR
-
-        # to be initialized in the Test Module
-        self.testSuiteBaseDir = ""
-        self.testCaseBaseDir = ""
-        self.testCaseLogsDir = ""
-        self.testCaseDashboardsDir = ""
-        self.testcasePropJsonPathName = ""
-        self.testcaseNonEntityDataDict = {}
-
-        # ================================
-        # dictionary to keep track of
-        # user-defined environment variables
-        # ================================
-        # LEADER_ELECTION_COMPLETED_MSG = "completed the leader state transition"
-        # REGX_LEADER_ELECTION_PATTERN = "\[(.*?)\] .* Broker (.*?) " + \
-        #                                LEADER_ELECTION_COMPLETED_MSG + \
-        #                                " for topic (.*?) partition (.*?) \(.*"
-        # zkConnectStr = ""
-        # consumerLogPathName = ""
-        # consumerConfigPathName = ""
-        # producerLogPathName = ""
-        # producerConfigPathName = ""
-        self.userDefinedEnvVarDict = {}
-
-        # Lock object for producer threads synchronization
-        self.lock = thread.allocate_lock()
-
-        self.numProducerThreadsRunning = 0
-
-        # to be used when validating data match - these variables will be
-        # updated by kafka_system_test_utils.start_producer_in_thread
-        self.producerTopicsString = ""
-        self.consumerTopicsString = ""
-
-    def initWithKnownTestCasePathName(self, testCasePathName):
-        testcaseDirName = os.path.basename(testCasePathName)
-        self.testcaseResultsDict["_test_case_name"] = testcaseDirName
-        self.testCaseBaseDir = testCasePathName
-        self.testCaseLogsDir = self.testCaseBaseDir + "/logs"
-        self.testCaseDashboardsDir = self.testCaseBaseDir + "/dashboards"
-
-        # find testcase properties json file
-        self.testcasePropJsonPathName = system_test_utils.get_testcase_prop_json_pathname(testCasePathName)
-
-        # get the dictionary that contains the testcase arguments and description
-        self.testcaseNonEntityDataDict = system_test_utils.get_json_dict_data(self.testcasePropJsonPathName)
-
-    def printTestCaseDescription(self, testcaseDirName):
-        testcaseDescription = ""
-        for k, v in self.testcaseNonEntityDataDict.items():
-            if ( k == "description" ):
-                testcaseDescription = v
-
-        print "\n"
-        print "======================================================================================="
-        print "Test Case Name :", testcaseDirName
-        print "======================================================================================="
-        print "Description :"
-        for step in sorted(testcaseDescription.iterkeys()):
-            print "    ", step, ":", testcaseDescription[step]
-        print "======================================================================================="
-        print "Test Case Args :"
-        for k, v in self.testcaseArgumentsDict.items():
-            print "    ", k, " : ", v
-            self.testcaseResultsDict["arg : " + k] = v
-        print "======================================================================================="
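
A minimal sketch of how a test module drives TestcaseEnv; the SystemTestEnv
stub and testcase path are hypothetical stand-ins for the real harness:

    class FakeSystemTestEnv(object):                  # hypothetical stub
        SYSTEM_TEST_BASE_DIR = "/tmp/system_test"
        systemTestResultsList = []

    class FakeTest(object):
        pass

    env = TestcaseEnv(FakeSystemTestEnv(), FakeTest())
    # expects /tmp/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
    env.initWithKnownTestCasePathName("/tmp/system_test/replication_testsuite/testcase_1")
    env.printTestCaseDescription("testcase_1")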
