Re-enable ZkLocalApplicationRunner tests.

Add back the commented-out ZkLocalApplicationRunner tests (they were dependent
upon error propagation from SamzaContainer to LocalApplicationRunner).

Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>

Reviewers: Navina Ramesh <nav...@apache.org>

Closes #250 from shanthoosh/fix_broken_tests


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cf5efe76
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cf5efe76
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cf5efe76

Branch: refs/heads/0.14.0
Commit: cf5efe761d0d901e8a98d195fba9a6778879d0fc
Parents: 966730e
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Authored: Fri Aug 4 11:49:29 2017 -0700
Committer: navina <nav...@apache.org>
Committed: Fri Aug 4 11:49:29 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/zk/TestZkJobCoordinator.java   |   1 -
 .../processor/TestZkLocalApplicationRunner.java | 145 +++++++++++--------
 2 files changed, 84 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/cf5efe76/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index fd6065a..117d458 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -44,7 +44,6 @@ public class TestZkJobCoordinator {
     ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new 
MapConfig(), new NoOpMetricsRegistry(), zkUtils));
     zkJobCoordinator.onNewJobModelAvailable(TEST_JOB_MODEL_VERSION);
 
-    Mockito.doNothing().when(zkJobCoordinator).stop();
     Mockito.verify(zkJobCoordinator, Mockito.atMost(1)).stop();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/cf5efe76/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index cf0a242..9ca48ad 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -33,9 +33,11 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import kafka.admin.AdminUtils;
+import kafka.server.KafkaServer;
 import kafka.utils.TestUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
@@ -64,6 +66,7 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
 
 import static org.junit.Assert.*;
 
@@ -78,14 +81,15 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TestZkLocalApplicationRunner.class);
 
   private static final int NUM_KAFKA_EVENTS = 300;
-  private static final int ZK_CONNECTION_TIMEOUT_MS = 10000;
+  private static final int ZK_CONNECTION_TIMEOUT_MS = 100;
   private static final String TEST_SYSTEM = "TestSystemName";
   private static final String TEST_SSP_GROUPER_FACTORY = 
"org.apache.samza.container.grouper.stream.GroupByPartitionFactory";
   private static final String TEST_TASK_GROUPER_FACTORY = 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory";
   private static final String TEST_JOB_COORDINATOR_FACTORY = 
"org.apache.samza.zk.ZkJobCoordinatorFactory";
   private static final String TEST_SYSTEM_FACTORY = 
"org.apache.samza.system.kafka.KafkaSystemFactory";
   private static final String TEST_JOB_NAME = "test-job";
-  private static final String TASK_SHUTDOWN_MS = "5000";
+  private static final String TASK_SHUTDOWN_MS = "3000";
+  private static final String JOB_DEBOUNCE_TIME_MS = "1000";
   private static final String[] PROCESSOR_IDS = new String[] {"0000000000", 
"0000000001", "0000000002"};
 
   private String inputKafkaTopic;
@@ -100,9 +104,8 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
   private String testStreamAppName;
   private String testStreamAppId;
 
-  // Set 90 seconds as max execution time for each test.
   @Rule
-  public Timeout testTimeOutInMillis = new Timeout(90000);
+  public Timeout testTimeOutInMillis = new Timeout(120000);
 
   @Rule
   public final ExpectedException expectedException = ExpectedException.none();
@@ -138,12 +141,14 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
 
   @Override
   public void tearDown() {
-    for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, 
outputKafkaTopic)) {
-      LOGGER.info("Deleting kafka topic: {}.", kafkaTopic);
-      AdminUtils.deleteTopic(zkUtils(), kafkaTopic);
+    if (zookeeper().zookeeper().isRunning()) {
+      for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, 
outputKafkaTopic)) {
+        LOGGER.info("Deleting kafka topic: {}.", kafkaTopic);
+        AdminUtils.deleteTopic(zkUtils(), kafkaTopic);
+      }
+      zkUtils.close();
+      super.tearDown();
     }
-    zkUtils.close();
-    super.tearDown();
   }
 
   private void publishKafkaEvents(String topic, int numEvents, String 
streamProcessorId) {
@@ -175,6 +180,7 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
         .put(String.format("systems.%s.samza.factory", systemName), 
TEST_SYSTEM_FACTORY)
         .put(JobConfig.JOB_NAME(), TEST_JOB_NAME)
         .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
+        .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
         .build();
     Map<String, String> applicationConfig = 
Maps.newHashMap(samzaContainerConfig);
     
applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, 
bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, 
true));
@@ -255,10 +261,10 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
   @Test
   public void shouldReElectLeaderWhenLeaderDies() throws InterruptedException {
     // Set up kafka topics.
-    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+    publishKafkaEvents(inputKafkaTopic, 2 * NUM_KAFKA_EVENTS, 
PROCESSOR_IDS[0]);
 
     // Create stream applications.
-    CountDownLatch kafkaEventsConsumedLatch = new 
CountDownLatch(NUM_KAFKA_EVENTS);
+    CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(2 * 
NUM_KAFKA_EVENTS);
     CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
@@ -278,11 +284,11 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     processedMessagesLatch3.await();
 
     // Verifications before killing the leader.
-    JobModel jobModel = zkUtils.getJobModel(zkUtils.getJobModelVersion());
-    String prevJobModelVersion = zkUtils.getJobModelVersion();
+    String jobModelVersion = zkUtils.getJobModelVersion();
+    JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
     assertEquals(3, jobModel.getContainers().size());
     assertEquals(Sets.newHashSet("0000000000", "0000000001", "0000000002"), 
jobModel.getContainers().keySet());
-    assertEquals("1", prevJobModelVersion);
+    assertEquals("1", jobModelVersion);
 
     List<String> processorIdsFromZK = 
zkUtils.getActiveProcessorsIDs(Arrays.asList(PROCESSOR_IDS));
 
@@ -291,6 +297,7 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
 
     // Kill the leader. Since streamApp1 is the first to join the cluster, 
it's the leader.
     applicationRunner1.kill(streamApp1);
+    applicationRunner1.waitForFinish();
     kafkaEventsConsumedLatch.await();
 
     // Verifications after killing the leader.
@@ -298,11 +305,11 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     processorIdsFromZK = 
zkUtils.getActiveProcessorsIDs(ImmutableList.of(PROCESSOR_IDS[1], 
PROCESSOR_IDS[2]));
     assertEquals(2, processorIdsFromZK.size());
     assertEquals(PROCESSOR_IDS[1], processorIdsFromZK.get(0));
-    jobModel = zkUtils.getJobModel(zkUtils.getJobModelVersion());
+    jobModelVersion = zkUtils.getJobModelVersion();
+    assertEquals("2", jobModelVersion);
+    jobModel = zkUtils.getJobModel(jobModelVersion);
     assertEquals(Sets.newHashSet("0000000001", "0000000002"), 
jobModel.getContainers().keySet());
-    String currentJobModelVersion = zkUtils.getJobModelVersion();
     assertEquals(2, jobModel.getContainers().size());
-    assertEquals("2", currentJobModelVersion);
   }
 
   @Test
@@ -337,34 +344,22 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     applicationRunner3.run(streamApp3);
   }
 
-  // Depends upon SAMZA-1302
-  // @Test(expected = Exception.class)
-  public void shouldKillStreamAppWhenZooKeeperDiesBeforeLeaderReElection() 
throws InterruptedException {
-    // Create StreamApplications.
-    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, 
outputKafkaTopic, null, null, null);
-    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, 
outputKafkaTopic, null, null, null);
-
-    // Run stream applications.
-    applicationRunner1.run(streamApp1);
-    applicationRunner2.run(streamApp2);
-
-    applicationRunner1.kill(streamApp1);
-    applicationRunner1.waitForFinish();
-    assertEquals(ApplicationStatus.SuccessfulFinish, 
applicationRunner1.status(streamApp2));
+  @Test
+  public void 
testRollingUpgradeOfStreamApplicationsShouldGenerateSameJobModel() throws 
Exception {
+    // Set up kafka topics.
+    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
-    // Kill zookeeper server and expect job model regeneration and ZK fencing 
will fail with exception.
-    zookeeper().shutdown();
+    /**
+     * Custom listeners can't be plugged in for transition 
events(generatingNewJobModel, waitingForProcessors, waitingForBarrierCompletion 
etc) from zkJobCoordinator. Only possible listeners
+     * are for ZkJobCoordinator output(onNewJobModelConfirmed, 
onNewJobModelAvailable). Increasing DefaultDebounceTime to make sure that 
streamApplication dies & rejoins before expiry.
+     */
+    Map<String, String> debounceTimeConfig = 
ImmutableMap.of(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000");
 
-    applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
-    applicationRunner1.run(streamApp1);
-    // This line should throw exception since Zookeeper is unavailable.
-    applicationRunner1.waitForFinish();
-  }
+    Config applicationConfig1 = new 
MapConfig(ImmutableList.of(buildStreamApplicationConfig(TEST_SYSTEM, 
inputKafkaTopic, PROCESSOR_IDS[0], testStreamAppName, testStreamAppId), 
debounceTimeConfig));
+    Config applicationConfig2 = new 
MapConfig(ImmutableList.of(buildStreamApplicationConfig(TEST_SYSTEM, 
inputKafkaTopic, PROCESSOR_IDS[1], testStreamAppName, testStreamAppId), 
debounceTimeConfig));
 
-  // Depends upon SAMZA-1302
-  // @Test
-  public void testRollingUpgrade() throws Exception {
-    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+    LocalApplicationRunner applicationRunner1 = new 
LocalApplicationRunner(applicationConfig1);
+    LocalApplicationRunner applicationRunner2 = new 
LocalApplicationRunner(applicationConfig2);
 
     List<TestKafkaEvent> messagesProcessed = new ArrayList<>();
     StreamApplicationCallback streamApplicationCallback = 
messagesProcessed::add;
@@ -393,37 +388,65 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
 
     int lastProcessedMessageId = -1;
     for (TestKafkaEvent message : messagesProcessed) {
-      lastProcessedMessageId = Math.max(lastProcessedMessageId, 
Integer.parseInt(message.getEventId()));
+      lastProcessedMessageId = Math.max(lastProcessedMessageId, 
Integer.parseInt(message.getEventData()));
     }
+    messagesProcessed.clear();
 
     LocalApplicationRunner applicationRunner4 = new 
LocalApplicationRunner(applicationConfig1);
+    processedMessagesLatch1 = new CountDownLatch(1);
+    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+    streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch);
     applicationRunner4.run(streamApp1);
-    applicationRunner4.waitForFinish();
 
-    // Kill both the stream applications.
-    applicationRunner4.kill(streamApp1);
-    applicationRunner4.waitForFinish();
-    applicationRunner2.kill(streamApp2);
-    applicationRunner2.waitForFinish();
+    processedMessagesLatch1.await();
 
     // Read new job model after rolling upgrade.
     String newJobModelVersion = zkUtils.getJobModelVersion();
     JobModel newJobModel = zkUtils.getJobModel(newJobModelVersion);
 
     // This should be continuation of last processed message.
-    int nextSeenMessageId = 
Integer.parseInt(messagesProcessed.get(0).getEventId());
-    assertTrue(lastProcessedMessageId < nextSeenMessageId);
+    int nextSeenMessageId = 
Integer.parseInt(messagesProcessed.get(0).getEventData());
+    assertTrue(lastProcessedMessageId <= nextSeenMessageId);
+    assertEquals(Integer.parseInt(jobModelVersion) + 1, 
Integer.parseInt(newJobModelVersion));
+    assertEquals(jobModel.getContainers(), newJobModel.getContainers());
+  }
 
-    // Assertions on job model read from zookeeper.
-    assertFalse(newJobModelVersion.equals(jobModelVersion));
-    assertEquals(jobModel, newJobModel);
-    assertEquals(Sets.newHashSet("0000000001", "0000000002"), 
jobModel.getContainers().keySet());
-    String currentJobModelVersion = zkUtils.getJobModelVersion();
-    List<String> processorIdsFromZK = 
zkUtils.getActiveProcessorsIDs(Arrays.asList(PROCESSOR_IDS));
-    assertEquals(3, processorIdsFromZK.size());
-    assertEquals(ImmutableList.of(PROCESSOR_IDS[1], PROCESSOR_IDS[2]), 
processorIdsFromZK);
-    assertEquals(2, jobModel.getContainers().size());
-    assertEquals("2", currentJobModelVersion);
+  @Test
+  public void shouldKillStreamAppWhenZooKeeperDiesBeforeLeaderReElection() 
throws InterruptedException {
+    // Set up kafka topics.
+    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+    MapConfig kafkaProducerConfig = new 
MapConfig(ImmutableMap.of(String.format("systems.%s.producer.%s", TEST_SYSTEM, 
ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG), "1000"));
+    MapConfig applicationRunnerConfig1 = new 
MapConfig(ImmutableList.of(applicationConfig1, kafkaProducerConfig));
+    MapConfig applicationRunnerConfig2 = new 
MapConfig(ImmutableList.of(applicationConfig2, kafkaProducerConfig));
+    LocalApplicationRunner applicationRunner1 = new 
LocalApplicationRunner(applicationRunnerConfig1);
+    LocalApplicationRunner applicationRunner2 = new 
LocalApplicationRunner(applicationRunnerConfig2);
+
+    CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+    CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+
+    // Create StreamApplications.
+    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, 
outputKafkaTopic, processedMessagesLatch1, null, null);
+    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, 
outputKafkaTopic, processedMessagesLatch2, null, null);
+
+    // Run stream applications.
+    applicationRunner1.run(streamApp1);
+    applicationRunner2.run(streamApp2);
+
+    processedMessagesLatch1.await();
+    processedMessagesLatch2.await();
+
+    // Non daemon thread in brokers reconnect repeatedly to zookeeper on 
failures. Manually shutting them down.
+    List<KafkaServer> kafkaServers = 
JavaConverters.bufferAsJavaListConverter(this.servers()).asJava();
+    kafkaServers.forEach(KafkaServer::shutdown);
+
+    zookeeper().shutdown();
+
+    applicationRunner1.waitForFinish();
+    applicationRunner2.waitForFinish();
+
+    assertEquals(ApplicationStatus.UnsuccessfulFinish, 
applicationRunner1.status(streamApp1));
+    assertEquals(ApplicationStatus.UnsuccessfulFinish, 
applicationRunner2.status(streamApp2));
   }
 
   public interface StreamApplicationCallback {

Reply via email to