Repository: kafka Updated Branches: refs/heads/trunk caf256ad8 -> d678449b9
KAFKA-1582; System Test should wait for producer to finish; reviewed by Joel Koshy and Guozhang Wang Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d678449b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d678449b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d678449b Branch: refs/heads/trunk Commit: d678449b967ed92a5a02289c061f201da25b07de Parents: caf256a Author: Dong Lin <[email protected]> Authored: Fri Aug 15 10:46:34 2014 -0700 Committer: Joel Koshy <[email protected]> Committed: Fri Aug 15 10:46:34 2014 -0700 ---------------------------------------------------------------------- system_test/utils/kafka_system_test_utils.py | 34 ++++++++++++----------- 1 file changed, 18 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d678449b/system_test/utils/kafka_system_test_utils.py ---------------------------------------------------------------------- diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 6edd64a..fcacf0a 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -792,19 +792,19 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): except: pass - # 4. consumer config - consumerProperties = {} - consumerProperties["consumer.timeout.ms"] = timeoutMs - try: + # 4. consumer config + consumerProperties = {} + consumerProperties["consumer.timeout.ms"] = timeoutMs + try: groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id") consumerProperties["group.id"] = groupOption except: pass - props_file_path=write_consumer_properties(consumerProperties) - scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/" - logger.debug("executing command [" + scpCmdStr + "]", extra=d) - system_test_utils.sys_call(scpCmdStr) + props_file_path=write_consumer_properties(consumerProperties) + scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/" + logger.debug("executing command [" + scpCmdStr + "]", extra=d) + system_test_utils.sys_call(scpCmdStr) if len(formatterOption) > 0: formatterOption = " --formatter " + formatterOption + " " @@ -930,12 +930,12 @@ def start_console_consumer(systemTestEnv, testcaseEnv): logger.error("Invalid cluster name : " + clusterName, extra=d) sys.exit(1) - consumerProperties = {} - consumerProperties["consumer.timeout.ms"] = timeoutMs - props_file_path=write_consumer_properties(consumerProperties) - scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/" - logger.debug("executing command [" + scpCmdStr + "]", extra=d) - system_test_utils.sys_call(scpCmdStr) + consumerProperties = {} + consumerProperties["consumer.timeout.ms"] = timeoutMs + props_file_path=write_consumer_properties(consumerProperties) + scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/" + logger.debug("executing command [" + scpCmdStr + "]", extra=d) + system_test_utils.sys_call(scpCmdStr) cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, @@ -1136,7 +1136,8 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk "--metrics-dir " + metricsDir, boolArgumentsStr, " >> " + producerLogPathName, - " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"] + " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid", + " & wait'"] if kafka07Client: cmdList[:] = [] @@ -1167,7 +1168,8 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk "--message-size " + messageSize, "--vary-message-size --async", " >> " + producerLogPathName, - " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"] + " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid", + " & wait'"] cmdStr = " ".join(cmdList) logger.debug("executing command: [" + cmdStr + "]", extra=d)
