Reenable LocalZkApplicationRunner tests.

Add back the commented-out ZkLocalApplicationRunner tests (these were dependent upon error propagation from SamzaContainer to LocalApplicationRunner).
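For context, the re-enabled ZooKeeper failure test relies on the runner reporting a failed container through its status API once error propagation from SamzaContainer reaches LocalApplicationRunner (the removed "Depends upon SAMZA-1302" comments refer to this). A minimal sketch of that expectation, assuming a StreamApplication named streamApp and a Config named appConfig like the ones built via buildStreamApplicationConfig() in the diff; the test name here is illustrative, not part of the patch:

    @Test
    public void shouldSurfaceContainerFailureThroughRunnerStatus() throws InterruptedException {
      LocalApplicationRunner runner = new LocalApplicationRunner(appConfig);
      runner.run(streamApp);
      // Error propagation from SamzaContainer is what lets waitForFinish() return
      // after a container failure instead of hanging.
      runner.waitForFinish();
      assertEquals(ApplicationStatus.UnsuccessfulFinish, runner.status(streamApp));
    }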
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Reviewers: Navina Ramesh <nav...@apache.org>

Closes #250 from shanthoosh/fix_broken_tests

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cf5efe76
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cf5efe76
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cf5efe76

Branch: refs/heads/0.14.0
Commit: cf5efe761d0d901e8a98d195fba9a6778879d0fc
Parents: 966730e
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Authored: Fri Aug 4 11:49:29 2017 -0700
Committer: navina <nav...@apache.org>
Committed: Fri Aug 4 11:49:29 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/zk/TestZkJobCoordinator.java   |   1 -
 .../processor/TestZkLocalApplicationRunner.java | 145 +++++++++++--------
 2 files changed, 84 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/cf5efe76/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index fd6065a..117d458 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -44,7 +44,6 @@ public class TestZkJobCoordinator {
     ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
 
     zkJobCoordinator.onNewJobModelAvailable(TEST_JOB_MODEL_VERSION);
-    Mockito.doNothing().when(zkJobCoordinator).stop();
     Mockito.verify(zkJobCoordinator, Mockito.atMost(1)).stop();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/cf5efe76/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index cf0a242..9ca48ad 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -33,9 +33,11 @@
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import kafka.admin.AdminUtils;
+import kafka.server.KafkaServer;
 import kafka.utils.TestUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
@@ -64,6 +66,7 @@
 import org.junit.rules.ExpectedException;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
 
 import static org.junit.Assert.*;
 
@@ -78,14 +81,15 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   private static final Logger LOGGER = LoggerFactory.getLogger(TestZkLocalApplicationRunner.class);
   private static final int NUM_KAFKA_EVENTS = 300;
-  private static final int ZK_CONNECTION_TIMEOUT_MS = 10000;
+  private static final int ZK_CONNECTION_TIMEOUT_MS = 100;
   private static final String TEST_SYSTEM = "TestSystemName";
   private static final String TEST_SSP_GROUPER_FACTORY = "org.apache.samza.container.grouper.stream.GroupByPartitionFactory";
   private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory";
   private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory";
   private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
   private static final String TEST_JOB_NAME = "test-job";
-  private static final String TASK_SHUTDOWN_MS = "5000";
+  private static final String TASK_SHUTDOWN_MS = "3000";
+  private static final String JOB_DEBOUNCE_TIME_MS = "1000";
   private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"};
 
   private String inputKafkaTopic;
@@ -100,9 +104,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   private String testStreamAppName;
   private String testStreamAppId;
 
-  // Set 90 seconds as max execution time for each test.
   @Rule
-  public Timeout testTimeOutInMillis = new Timeout(90000);
+  public Timeout testTimeOutInMillis = new Timeout(120000);
 
   @Rule
   public final ExpectedException expectedException = ExpectedException.none();
@@ -138,12 +141,14 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   @Override
   public void tearDown() {
-    for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) {
-      LOGGER.info("Deleting kafka topic: {}.", kafkaTopic);
-      AdminUtils.deleteTopic(zkUtils(), kafkaTopic);
+    if (zookeeper().zookeeper().isRunning()) {
+      for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) {
+        LOGGER.info("Deleting kafka topic: {}.", kafkaTopic);
+        AdminUtils.deleteTopic(zkUtils(), kafkaTopic);
+      }
+      zkUtils.close();
+      super.tearDown();
     }
-    zkUtils.close();
-    super.tearDown();
   }
 
   private void publishKafkaEvents(String topic, int numEvents, String streamProcessorId) {
@@ -175,6 +180,7 @@
         .put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY)
         .put(JobConfig.JOB_NAME(), TEST_JOB_NAME)
         .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
+        .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
         .build();
     Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
     applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
@@ -255,10 +261,10 @@
   @Test
   public void shouldReElectLeaderWhenLeaderDies() throws InterruptedException {
     // Set up kafka topics.
-    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+    publishKafkaEvents(inputKafkaTopic, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
     // Create stream applications.
-    CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
+    CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(2 * NUM_KAFKA_EVENTS);
     CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
@@ -278,11 +284,11 @@
     processedMessagesLatch3.await();
 
     // Verifications before killing the leader.
-    JobModel jobModel = zkUtils.getJobModel(zkUtils.getJobModelVersion());
-    String prevJobModelVersion = zkUtils.getJobModelVersion();
+    String jobModelVersion = zkUtils.getJobModelVersion();
+    JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
     assertEquals(3, jobModel.getContainers().size());
     assertEquals(Sets.newHashSet("0000000000", "0000000001", "0000000002"), jobModel.getContainers().keySet());
-    assertEquals("1", prevJobModelVersion);
+    assertEquals("1", jobModelVersion);
 
     List<String> processorIdsFromZK = zkUtils.getActiveProcessorsIDs(Arrays.asList(PROCESSOR_IDS));
@@ -291,6 +297,7 @@
     // Kill the leader. Since streamApp1 is the first to join the cluster, it's the leader.
     applicationRunner1.kill(streamApp1);
+    applicationRunner1.waitForFinish();
     kafkaEventsConsumedLatch.await();
 
     // Verifications after killing the leader.
@@ -298,11 +305,11 @@
     processorIdsFromZK = zkUtils.getActiveProcessorsIDs(ImmutableList.of(PROCESSOR_IDS[1], PROCESSOR_IDS[2]));
     assertEquals(2, processorIdsFromZK.size());
     assertEquals(PROCESSOR_IDS[1], processorIdsFromZK.get(0));
-    jobModel = zkUtils.getJobModel(zkUtils.getJobModelVersion());
+    jobModelVersion = zkUtils.getJobModelVersion();
+    assertEquals("2", jobModelVersion);
+    jobModel = zkUtils.getJobModel(jobModelVersion);
     assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet());
-    String currentJobModelVersion = zkUtils.getJobModelVersion();
     assertEquals(2, jobModel.getContainers().size());
-    assertEquals("2", currentJobModelVersion);
   }
 
   @Test
@@ -337,34 +344,22 @@
     applicationRunner3.run(streamApp3);
   }
 
-  // Depends upon SAMZA-1302
-  // @Test(expected = Exception.class)
-  public void shouldKillStreamAppWhenZooKeeperDiesBeforeLeaderReElection() throws InterruptedException {
-    // Create StreamApplications.
-    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, null);
-    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, null);
-
-    // Run stream applications.
-    applicationRunner1.run(streamApp1);
-    applicationRunner2.run(streamApp2);
-
-    applicationRunner1.kill(streamApp1);
-    applicationRunner1.waitForFinish();
-    assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner1.status(streamApp2));
+  @Test
+  public void testRollingUpgradeOfStreamApplicationsShouldGenerateSameJobModel() throws Exception {
+    // Set up kafka topics.
+    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
-    // Kill zookeeper server and expect job model regeneration and ZK fencing will fail with exception.
-    zookeeper().shutdown();
+    /**
+     * Custom listeners can't be plugged in for transition events(generatingNewJobModel, waitingForProcessors, waitingForBarrierCompletion etc) from zkJobCoordinator. Only possible listeners
+     * are for ZkJobCoordinator output(onNewJobModelConfirmed, onNewJobModelAvailable). Increasing DefaultDebounceTime to make sure that streamApplication dies & rejoins before expiry.
+     */
+    Map<String, String> debounceTimeConfig = ImmutableMap.of(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000");
 
-    applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
-    applicationRunner1.run(streamApp1);
-    // This line should throw exception since Zookeeper is unavailable.
-    applicationRunner1.waitForFinish();
-  }
+    Config applicationConfig1 = new MapConfig(ImmutableList.of(buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[0], testStreamAppName, testStreamAppId), debounceTimeConfig));
+    Config applicationConfig2 = new MapConfig(ImmutableList.of(buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[1], testStreamAppName, testStreamAppId), debounceTimeConfig));
 
-  // Depends upon SAMZA-1302
-  // @Test
-  public void testRollingUpgrade() throws Exception {
-    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+    LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
+    LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
 
     List<TestKafkaEvent> messagesProcessed = new ArrayList<>();
     StreamApplicationCallback streamApplicationCallback = messagesProcessed::add;
@@ -393,37 +388,65 @@
     int lastProcessedMessageId = -1;
     for (TestKafkaEvent message : messagesProcessed) {
-      lastProcessedMessageId = Math.max(lastProcessedMessageId, Integer.parseInt(message.getEventId()));
+      lastProcessedMessageId = Math.max(lastProcessedMessageId, Integer.parseInt(message.getEventData()));
     }
+    messagesProcessed.clear();
 
     LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1);
+    processedMessagesLatch1 = new CountDownLatch(1);
+    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+    streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch);
     applicationRunner4.run(streamApp1);
-    applicationRunner4.waitForFinish();
-    // Kill both the stream applications.
-    applicationRunner4.kill(streamApp1);
-    applicationRunner4.waitForFinish();
-    applicationRunner2.kill(streamApp2);
-    applicationRunner2.waitForFinish();
+    processedMessagesLatch1.await();
 
     // Read new job model after rolling upgrade.
     String newJobModelVersion = zkUtils.getJobModelVersion();
     JobModel newJobModel = zkUtils.getJobModel(newJobModelVersion);
 
     // This should be continuation of last processed message.
-    int nextSeenMessageId = Integer.parseInt(messagesProcessed.get(0).getEventId());
-    assertTrue(lastProcessedMessageId < nextSeenMessageId);
+    int nextSeenMessageId = Integer.parseInt(messagesProcessed.get(0).getEventData());
+    assertTrue(lastProcessedMessageId <= nextSeenMessageId);
+    assertEquals(Integer.parseInt(jobModelVersion) + 1, Integer.parseInt(newJobModelVersion));
+    assertEquals(jobModel.getContainers(), newJobModel.getContainers());
+  }
 
-    // Assertions on job model read from zookeeper.
-    assertFalse(newJobModelVersion.equals(jobModelVersion));
-    assertEquals(jobModel, newJobModel);
-    assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet());
-    String currentJobModelVersion = zkUtils.getJobModelVersion();
-    List<String> processorIdsFromZK = zkUtils.getActiveProcessorsIDs(Arrays.asList(PROCESSOR_IDS));
-    assertEquals(3, processorIdsFromZK.size());
-    assertEquals(ImmutableList.of(PROCESSOR_IDS[1], PROCESSOR_IDS[2]), processorIdsFromZK);
-    assertEquals(2, jobModel.getContainers().size());
-    assertEquals("2", currentJobModelVersion);
+  @Test
+  public void shouldKillStreamAppWhenZooKeeperDiesBeforeLeaderReElection() throws InterruptedException {
+    // Set up kafka topics.
+    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+    MapConfig kafkaProducerConfig = new MapConfig(ImmutableMap.of(String.format("systems.%s.producer.%s", TEST_SYSTEM, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG), "1000"));
+    MapConfig applicationRunnerConfig1 = new MapConfig(ImmutableList.of(applicationConfig1, kafkaProducerConfig));
+    MapConfig applicationRunnerConfig2 = new MapConfig(ImmutableList.of(applicationConfig2, kafkaProducerConfig));
+    LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationRunnerConfig1);
+    LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationRunnerConfig2);
+
+    CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+    CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+
+    // Create StreamApplications.
+    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, null);
+    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, null);
+
+    // Run stream applications.
+    applicationRunner1.run(streamApp1);
+    applicationRunner2.run(streamApp2);
+
+    processedMessagesLatch1.await();
+    processedMessagesLatch2.await();
+
+    // Non daemon thread in brokers reconnect repeatedly to zookeeper on failures. Manually shutting them down.
+    List<KafkaServer> kafkaServers = JavaConverters.bufferAsJavaListConverter(this.servers()).asJava();
+    kafkaServers.forEach(KafkaServer::shutdown);
+
+    zookeeper().shutdown();
+
+    applicationRunner1.waitForFinish();
+    applicationRunner2.waitForFinish();
+
+    assertEquals(ApplicationStatus.UnsuccessfulFinish, applicationRunner1.status(streamApp1));
+    assertEquals(ApplicationStatus.UnsuccessfulFinish, applicationRunner2.status(streamApp2));
   }
 
   public interface StreamApplicationCallback {