SAMZA-1304: Handling duplicate stream processor registration.

When a stream processor registers with the same processorId as an already existing
processor in the processor group, its registration should fail.

Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>

Reviewers: Navina Ramesh <nav...@apache.org>, Jagadish V <jvenk...@linkedin.com>

Closes #240 from shanthoosh/standalone_duplicate_processor_fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/45931fd5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/45931fd5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/45931fd5

Branch: refs/heads/0.14.0
Commit: 45931fd5d4406f731013b39d10fbfd577d6ac6a4
Parents: ebb1b7f
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Authored: Thu Jul 13 16:19:56 2017 -0700
Committer: navina <nav...@apache.org>
Committed: Thu Jul 13 16:19:56 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/zk/ProcessorData.java |  18 ++-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  10 +-
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 117 ++++++++++++++++---
 .../java/org/apache/samza/zk/TestZkUtils.java   |  36 +++++-
 .../processor/TestZkLocalApplicationRunner.java |  13 ++-
 5 files changed, 169 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/45931fd5/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java 
b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
index 3f4fd0b..a48a450 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
@@ -19,9 +19,12 @@
 
 package org.apache.samza.zk;
 
+import java.util.Objects;
 import org.apache.samza.SamzaException;
 
-
+/**
+ * Represents processor data stored in zookeeper processors node.
+ */
 public class ProcessorData {
   private final String processorId;
   private final String host;
@@ -51,4 +54,17 @@ public class ProcessorData {
   public String getProcessorId() {
     return processorId;
   }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(processorId, host);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) return false;
+    if (getClass() != obj.getClass()) return false;
+    final ProcessorData other = (ProcessorData) obj;
+    return Objects.equals(processorId, other.processorId) && 
Objects.equals(host, other.host);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/45931fd5/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 94c3054..8ca26c8 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -20,8 +20,10 @@ package org.apache.samza.zk;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
@@ -162,8 +164,14 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
 
   void doOnProcessorChange(List<String> processors) {
     // if list of processors is empty - it means we are called from 
'onBecomeLeader'
-    // TODO: Handle empty currentProcessorIds or duplicate processorIds in the 
list
+    // TODO: Handle empty currentProcessorIds.
     List<String> currentProcessorIds = getActualProcessorIds(processors);
+    Set<String> uniqueProcessorIds = new HashSet<String>(currentProcessorIds);
+
+    if (currentProcessorIds.size() != uniqueProcessorIds.size()) {
+      LOG.info("Processors: {} has duplicates. Not generating job model.", 
currentProcessorIds);
+      return;
+    }
 
     // Generate the JobModel
     JobModel jobModel = generateNewJobModel(currentProcessorIds);

http://git-wip-us.apache.org/repos/asf/samza/blob/45931fd5/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index ecf118b..7406cf5 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -23,11 +23,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.job.model.JobModel;
@@ -83,10 +85,6 @@ public class ZkUtils {
     }
   }
 
-  public static ZkConnection createZkConnection(String zkConnectString, int 
sessionTimeoutMs) {
-    return new ZkConnection(zkConnectString, sessionTimeoutMs);
-  }
-
   ZkClient getZkClient() {
     return zkClient;
   }
@@ -105,17 +103,69 @@ public class ZkUtils {
    * @return String representing the absolute ephemeralPath of this client in 
the current session
    */
   public synchronized String registerProcessorAndGetId(final ProcessorData 
data) {
+    String processorId = data.getProcessorId();
     if (ephemeralPath == null) {
-      ephemeralPath =
-          zkClient.createEphemeralSequential(
-              keyBuilder.getProcessorsPath() + "/", data.toString());
-
-      LOG.info("newly generated path for " + data +  " is " +  ephemeralPath);
-      return ephemeralPath;
+      ephemeralPath = 
zkClient.createEphemeralSequential(keyBuilder.getProcessorsPath() + "/", 
data.toString());
+      LOG.info("Created ephemeral path: {} for processor: {} in zookeeper.", 
ephemeralPath, data);
+      ProcessorNode processorNode = new ProcessorNode(data, ephemeralPath);
+      // Determine if there are duplicate processors with this.processorId 
after registration.
+      if (!isValidRegisteredProcessor(processorNode)) {
+        LOG.info("Processor: {} is duplicate. Deleting zookeeper node at path: 
{}.", processorId, ephemeralPath);
+        zkClient.delete(ephemeralPath);
+        throw new SamzaException(String.format("Processor: %s is duplicate in 
the group. Registration failed.", processorId));
+      }
     } else {
-      LOG.info("existing path for " + data +  " is " +  ephemeralPath);
-      return ephemeralPath;
+      LOG.info("Ephemeral path: {} exists for processor: {} in zookeeper.", 
ephemeralPath, data);
     }
+    return ephemeralPath;
+  }
+
+  /**
+   * Determines the validity of processor registered with zookeeper.
+   *
+   * If there are multiple processors registered with same processorId,
+   * the processor with lexicographically smallest zookeeperPath is considered 
valid
+   * and all the remaining processors are invalid.
+   *
+   * Two processors will not have smallest zookeeperPath because of 
sequentialId guarantees
+   * of zookeeper for ephemeral nodes.
+   *
+   * @param processor to check for validity condition in processors group.
+   * @return true if the processor is valid. false otherwise.
+   */
+  private boolean isValidRegisteredProcessor(final ProcessorNode processor) {
+    String processorId = processor.getProcessorData().getProcessorId();
+    List<ProcessorNode> processorNodes = getAllProcessorNodes().stream()
+                                                               
.filter(processorNode -> 
processorNode.processorData.getProcessorId().equals(processorId))
+                                                               
.collect(Collectors.toList());
+    // Check for duplicate processor condition(if more than one processor 
exist for this processorId).
+    if (processorNodes.size() > 1) {
+      // There exists more than processor for provided `processorId`.
+      LOG.debug("Processor nodes in zookeeper: {} for processorId: {}.", 
processorNodes, processorId);
+      // Get all ephemeral processor paths
+      TreeSet<String> sortedProcessorPaths = processorNodes.stream()
+                                                           
.map(ProcessorNode::getEphemeralPath)
+                                                           
.collect(Collectors.toCollection(TreeSet::new));
+      // Check if smallest path is equal to this processor's ephemeralPath.
+      return sortedProcessorPaths.first().equals(processor.getEphemeralPath());
+    }
+    // There're no duplicate processors. This is a valid registered processor.
+    return true;
+  }
+
+  /**
+   * Fetches all the ephemeral processor nodes of a standalone job from 
zookeeper.
+   * @return a list of {@link ProcessorNode}, where each ProcessorNode 
represents a registered stream processor.
+   */
+  private List<ProcessorNode> getAllProcessorNodes() {
+    List<String> processorZNodes = getSortedActiveProcessorsZnodes();
+    LOG.debug("Active ProcessorZNodes in zookeeper: {}.", processorZNodes);
+    return processorZNodes.stream()
+                          .map(processorZNode -> {
+                              String ephemeralProcessorPath = 
String.format("%s/%s", keyBuilder.getProcessorsPath(), processorZNode);
+                              String data = 
readProcessorData(ephemeralProcessorPath);
+                              return new ProcessorNode(new 
ProcessorData(data), ephemeralProcessorPath);
+                            }).collect(Collectors.toList());
   }
 
   /**
@@ -321,4 +371,45 @@ public class ZkUtils {
     zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
     metrics.subscriptions.inc();
   }
+
+  /**
+   * Represents zookeeper processor node.
+   */
+  private static class ProcessorNode {
+    private final ProcessorData processorData;
+
+    // Ex: /test/processors/0000000000
+    private final String ephemeralProcessorPath;
+
+    ProcessorNode(ProcessorData processorData, String ephemeralProcessorPath) {
+      this.processorData = processorData;
+      this.ephemeralProcessorPath = ephemeralProcessorPath;
+    }
+
+    ProcessorData getProcessorData() {
+      return processorData;
+    }
+
+    String getEphemeralPath() {
+      return ephemeralProcessorPath;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("[ProcessorData: %s, ephemeralProcessorPath: %s]", 
processorData, ephemeralProcessorPath);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(processorData, ephemeralProcessorPath);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false;
+      final ProcessorNode other = (ProcessorNode) obj;
+      return Objects.equals(processorData, other.processorData) && 
Objects.equals(ephemeralProcessorPath, other.ephemeralProcessorPath);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/45931fd5/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index a33bf03..e7a9aa2 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -37,6 +37,8 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
 import org.junit.Test;
 
 public class TestZkUtils {
@@ -47,6 +49,10 @@ public class TestZkUtils {
   private static final int CONNECTION_TIMEOUT_MS = 10000;
   private ZkUtils zkUtils;
 
+  @Rule
+  // Declared public to honor junit contract.
+  public final ExpectedException expectedException = ExpectedException.none();
+
   @BeforeClass
   public static void setup() throws InterruptedException {
     zkServer = new EmbeddedZookeeper();
@@ -68,10 +74,7 @@ public class TestZkUtils {
       // Do nothing
     }
 
-    zkUtils = new ZkUtils(
-        KEY_BUILDER,
-        zkClient,
-        SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+    zkUtils = getZkUtils();
 
     zkUtils.connect();
   }
@@ -82,6 +85,11 @@ public class TestZkUtils {
     zkClient.close();
   }
 
+  private ZkUtils getZkUtils() {
+    return new ZkUtils(KEY_BUILDER, zkClient,
+                       SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+  }
+
   @AfterClass
   public static void teardown() {
     zkServer.teardown();
@@ -174,6 +182,26 @@ public class TestZkUtils {
     Assert.assertTrue(testWithDelayBackOff(() -> 
"newProcessor".equals(res.getRes()), 2, 1000));
   }
 
+  /**
+   * Create two duplicate processors with same processorId.
+   * Second creation should fail with exception.
+   */
+  @Test
+  public void 
testRegisterProcessorAndGetIdShouldFailForDuplicateProcessorRegistration() {
+    final String testHostName = "localhost";
+    final String testProcessId = "testProcessorId";
+    ProcessorData processorData1 = new ProcessorData(testHostName, 
testProcessId);
+    // Register processor 1 which is not duplicate, this registration should 
succeed.
+    zkUtils.registerProcessorAndGetId(processorData1);
+
+    ZkUtils zkUtils1 = getZkUtils();
+    zkUtils1.connect();
+    ProcessorData duplicateProcessorData = new ProcessorData(testHostName, 
testProcessId);
+    // Registration of the duplicate processor should fail.
+    expectedException.expect(SamzaException.class);
+    zkUtils1.registerProcessorAndGetId(duplicateProcessorData);
+  }
+
   @Test
   public void testPublishNewJobModel() {
     ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");

http://git-wip-us.apache.org/repos/asf/samza/blob/45931fd5/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 2d5da2b..77e2a49 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -58,6 +58,7 @@ import org.apache.samza.zk.ZkKeyBuilder;
 import org.apache.samza.zk.ZkUtils;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,6 +99,9 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
   @Rule
   public Timeout testTimeOutInMillis = new Timeout(90000);
 
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   @Override
   public void setUp() {
     super.setUp();
@@ -224,9 +228,7 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     assertEquals("2", currentJobModelVersion);
   }
 
-  // Checks enforcing property that all processors should have unique Id.
-  // Depends upon SAMZA-1302
-  // @Test(expected = Exception.class)
+  @Test
   public void shouldFailWhenNewProcessorJoinsWithSameIdAsExistingProcessor() 
throws InterruptedException {
     // Set up kafka topics.
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
@@ -253,10 +255,9 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]);
     kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
     StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, 
outputKafkaTopic, null, null, kafkaEventsConsumedLatch);
+    // Fail when the duplicate processor joins.
+    expectedException.expect(SamzaException.class);
     applicationRunner3.run(streamApp3);
-
-    // The following line should throw up by handling duplicate processorId 
registration.
-    kafkaEventsConsumedLatch.await();
   }
 
   // Depends upon SAMZA-1302

Reply via email to