Repository: samza Updated Branches: refs/heads/master 53d7f2625 -> 9f323c950
SAMZA-1732: Reduce the coordination timeouts in TestZkLocalApplicationRunner tests. Currently all the tests in TestZkLocalApplicationRunner takes around 5 minutes to finish. Reducing the coordination timeout to reduce the test time. Changes in TestZkLocalApplicationRunner test timeout values: * Change debounce timeout from 20 seconds to 2 seconds. * Change task.shutdown timeout from 30 seconds to 2 seconds. * Change barrier timeout from 40 seconds to 2 seconds. After this change, execution time of TestZkLocalApplicationRunner tests has reduced from `310` seconds to `55` seconds. Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Jagadish <[email protected]> Closes #537 from shanthoosh/reduce_zk_localAppRunnerTestTime Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9f323c95 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9f323c95 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9f323c95 Branch: refs/heads/master Commit: 9f323c950410ad52cd29a9ac8def65af5b578487 Parents: 53d7f26 Author: Shanthoosh Venkataraman <[email protected]> Authored: Tue May 29 16:26:25 2018 -0700 Committer: Jagadish <[email protected]> Committed: Tue May 29 16:26:25 2018 -0700 ---------------------------------------------------------------------- .../processor/TestZkLocalApplicationRunner.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9f323c95/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index 0b0a271..ea44052 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -88,8 +88,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"; private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory"; private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory"; - private static final String TASK_SHUTDOWN_MS = "20000"; - private static final String JOB_DEBOUNCE_TIME_MS = "30000"; + private static final String TASK_SHUTDOWN_MS = "2000"; + private static final String JOB_DEBOUNCE_TIME_MS = "2000"; + private static final String BARRIER_TIMEOUT_MS = "2000"; private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"}; private String inputKafkaTopic; @@ -185,6 +186,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic, String appName, String appId) { Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder() + .put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS) .put(TaskConfig.INPUT_STREAMS(), inputTopic) .put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName) .put(TaskConfig.IGNORED_EXCEPTIONS(), "*") @@ -489,11 +491,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne processedMessagesLatch1.await(); processedMessagesLatch2.await(); - LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig2); + MapConfig appConfig = new ApplicationConfig(new MapConfig(applicationConfig2, ImmutableMap.of(ZkConfig.ZK_SESSION_TIMEOUT_MS, "10"))); + LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(appConfig); // Create a stream app with same processor id as SP2 and run it. It should fail. - publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]); - kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS); StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch, applicationConfig2); // Fail when the duplicate processor joins. expectedException.expect(SamzaException.class); @@ -514,12 +515,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Set up kafka topics. publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); - /** - * Custom listeners can't be plugged in for transition events(generatingNewJobModel, waitingForProcessors, waitingForBarrierCompletion etc) from zkJobCoordinator. Only possible listeners - * are for ZkJobCoordinator output(onNewJobModelConfirmed, onNewJobModelAvailable). Increasing DefaultDebounceTime to make sure that streamApplication dies & rejoins before expiry. - */ Map<String, String> configMap = buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId); - configMap.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000"); configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]); Config applicationConfig1 = new MapConfig(configMap);
