Repository: kafka Updated Branches: refs/heads/trunk 02311c064 -> 3a048e80d
kafka-924 (follow-up); Specify console consumer properties via a single --property command line parameter;; patched by Sriharsha Chintalapani; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3a048e80 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3a048e80 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3a048e80 Branch: refs/heads/trunk Commit: 3a048e80d526d5eaa39ec1588c6ee47af975d015 Parents: 02311c0 Author: Sriharsha Chintalapani <[email protected]> Authored: Tue Jun 3 07:36:00 2014 -0700 Committer: Jun Rao <[email protected]> Committed: Tue Jun 3 07:36:00 2014 -0700 ---------------------------------------------------------------------- system_test/utils/kafka_system_test_utils.py | 34 ++++++++++++++++++----- 1 file changed, 27 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3a048e80/system_test/utils/kafka_system_test_utils.py ---------------------------------------------------------------------- diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index de02e47..8cde3c4 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -792,14 +792,20 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): except: pass - # 4. group - groupOption = "" - try: + # 4. consumer config + consumerProperties = {} + consumerProperties["consumer.timeout.ms"] = timeoutMs + try: groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id") - groupOption = "--group " + groupOption + consumerProperties["group.id"] = groupOption except: pass + props_file_path=write_consumer_properties(consumerProperties) + scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/" + logger.debug("executing command [" + scpCmdStr + "]", extra=d) + system_test_utils.sys_call(scpCmdStr) + if len(formatterOption) > 0: formatterOption = " --formatter " + formatterOption + " " @@ -818,9 +824,8 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): kafkaHome + "/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer", "--zookeeper " + zkConnectStr, "--topic " + topic, - "--consumer-timeout-ms " + timeoutMs, + "--consumer.config /tmp/consumer.properties", "--csv-reporter-enabled", - groupOption, formatterOption, "--from-beginning", " >> " + logPathName + "/" + logFile + " & echo pid:$! > ", @@ -925,13 +930,20 @@ def start_console_consumer(systemTestEnv, testcaseEnv): logger.error("Invalid cluster name : " + clusterName, extra=d) sys.exit(1) + consumerProperties = {} + consumerProperties["consumer.timeout.ms"] = timeoutMs + props_file_path=write_consumer_properties(consumerProperties) + scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/" + logger.debug("executing command [" + scpCmdStr + "]", extra=d) + system_test_utils.sys_call(scpCmdStr) + cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, kafkaRunClassBin + " kafka.consumer.ConsoleConsumer", "--zookeeper " + zkConnectStr, "--topic " + topic, - "--consumer-timeout-ms " + timeoutMs, + "--consumer.config /tmp/consumer.properties", "--csv-reporter-enabled", #"--metrics-dir " + metricsDir, formatterOption, @@ -2484,4 +2496,12 @@ def get_leader_attributes(systemTestEnv, testcaseEnv): return leaderDict +def write_consumer_properties(consumerProperties): + import tempfile + props_file_path = tempfile.gettempdir() + "/consumer.properties" + consumer_props_file=open(props_file_path,"w") + for key,value in consumerProperties.iteritems(): + consumer_props_file.write(key+"="+value+"\n") + consumer_props_file.close() + return props_file_path
