Repository: kafka
Updated Branches:
  refs/heads/trunk 02311c064 -> 3a048e80d


kafka-924 (follow-up); Specify console consumer properties via a single 
--property command line parameter;;  patched by Sriharsha Chintalapani; 
reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3a048e80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3a048e80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3a048e80

Branch: refs/heads/trunk
Commit: 3a048e80d526d5eaa39ec1588c6ee47af975d015
Parents: 02311c0
Author: Sriharsha Chintalapani <[email protected]>
Authored: Tue Jun 3 07:36:00 2014 -0700
Committer: Jun Rao <[email protected]>
Committed: Tue Jun 3 07:36:00 2014 -0700

----------------------------------------------------------------------
 system_test/utils/kafka_system_test_utils.py | 34 ++++++++++++++++++-----
 1 file changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3a048e80/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py 
b/system_test/utils/kafka_system_test_utils.py
index de02e47..8cde3c4 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -792,14 +792,20 @@ def start_entity_in_background(systemTestEnv, 
testcaseEnv, entityId):
         except:
             pass
 
-        # 4. group
-        groupOption = ""
-        try:
+       # 4. consumer config
+       consumerProperties = {}
+       consumerProperties["consumer.timeout.ms"] = timeoutMs
+       try:
             groupOption = 
system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", 
entityId, "group.id")
-            groupOption = "--group " + groupOption
+            consumerProperties["group.id"] = groupOption
         except:
             pass
 
+       props_file_path=write_consumer_properties(consumerProperties)
+       scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/"
+       logger.debug("executing command [" + scpCmdStr + "]", extra=d)
+       system_test_utils.sys_call(scpCmdStr)
+
         if len(formatterOption) > 0:
             formatterOption = " --formatter " + formatterOption + " "
 
@@ -818,9 +824,8 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, 
entityId):
                    kafkaHome + "/bin/kafka-run-class.sh 
kafka.consumer.ConsoleConsumer",
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
-                   "--consumer-timeout-ms " + timeoutMs,
+                   "--consumer.config /tmp/consumer.properties",
                    "--csv-reporter-enabled",
-                   groupOption,
                    formatterOption,
                    "--from-beginning",
                    " >> " + logPathName + "/" + logFile + " & echo pid:$! > ",
@@ -925,13 +930,20 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
             logger.error("Invalid cluster name : " + clusterName, extra=d)
             sys.exit(1)
 
+       consumerProperties = {}
+       consumerProperties["consumer.timeout.ms"] = timeoutMs
+       props_file_path=write_consumer_properties(consumerProperties)
+       scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/"
+       logger.debug("executing command [" + scpCmdStr + "]", extra=d)
+       system_test_utils.sys_call(scpCmdStr)
+
         cmdList = ["ssh " + host,
                    "'JAVA_HOME=" + javaHome,
                    "JMX_PORT=" + jmxPort,
                    kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
-                   "--consumer-timeout-ms " + timeoutMs,
+                   "--consumer.config /tmp/consumer.properties",
                    "--csv-reporter-enabled",
                    #"--metrics-dir " + metricsDir,
                    formatterOption,
@@ -2484,4 +2496,12 @@ def get_leader_attributes(systemTestEnv, testcaseEnv):
     return leaderDict
 
 
+def write_consumer_properties(consumerProperties):
+    import tempfile
+    props_file_path = tempfile.gettempdir() + "/consumer.properties"
+    consumer_props_file=open(props_file_path,"w")
+    for key,value in consumerProperties.iteritems():
+        consumer_props_file.write(key+"="+value+"\n")
+    consumer_props_file.close()
+    return props_file_path
 

Reply via email to