SAMZA-1304: Handling duplicate stream processor registration. When a stream processor registers with the same processorId as an already existing processor in the processor group, its registration should fail.
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Navina Ramesh <nav...@apache.org>, Jagadish V <jvenk...@linkedin.com> Closes #240 from shanthoosh/standalone_duplicate_processor_fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/45931fd5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/45931fd5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/45931fd5 Branch: refs/heads/0.14.0 Commit: 45931fd5d4406f731013b39d10fbfd577d6ac6a4 Parents: ebb1b7f Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Thu Jul 13 16:19:56 2017 -0700 Committer: navina <nav...@apache.org> Committed: Thu Jul 13 16:19:56 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/samza/zk/ProcessorData.java | 18 ++- .../org/apache/samza/zk/ZkJobCoordinator.java | 10 +- .../main/java/org/apache/samza/zk/ZkUtils.java | 117 ++++++++++++++++--- .../java/org/apache/samza/zk/TestZkUtils.java | 36 +++++- .../processor/TestZkLocalApplicationRunner.java | 13 ++- 5 files changed, 169 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/45931fd5/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java index 3f4fd0b..a48a450 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java @@ -19,9 +19,12 @@ package org.apache.samza.zk; +import java.util.Objects; import org.apache.samza.SamzaException; - +/** + * Represents processor data stored in zookeeper processors node. 
+ */ public class ProcessorData { private final String processorId; private final String host; @@ -51,4 +54,17 @@ public class ProcessorData { public String getProcessorId() { return processorId; } + + @Override + public int hashCode() { + return Objects.hash(processorId, host); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + final ProcessorData other = (ProcessorData) obj; + return Objects.equals(processorId, other.processorId) && Objects.equals(host, other.host); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/45931fd5/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 94c3054..8ca26c8 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -20,8 +20,10 @@ package org.apache.samza.zk; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; @@ -162,8 +164,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { void doOnProcessorChange(List<String> processors) { // if list of processors is empty - it means we are called from 'onBecomeLeader' - // TODO: Handle empty currentProcessorIds or duplicate processorIds in the list + // TODO: Handle empty currentProcessorIds. 
List<String> currentProcessorIds = getActualProcessorIds(processors); + Set<String> uniqueProcessorIds = new HashSet<String>(currentProcessorIds); + + if (currentProcessorIds.size() != uniqueProcessorIds.size()) { + LOG.info("Processors: {} has duplicates. Not generating job model.", currentProcessorIds); + return; + } // Generate the JobModel JobModel jobModel = generateNewJobModel(currentProcessorIds); http://git-wip-us.apache.org/repos/asf/samza/blob/45931fd5/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index ecf118b..7406cf5 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -23,11 +23,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.samza.SamzaException; import org.apache.samza.job.model.JobModel; @@ -83,10 +85,6 @@ public class ZkUtils { } } - public static ZkConnection createZkConnection(String zkConnectString, int sessionTimeoutMs) { - return new ZkConnection(zkConnectString, sessionTimeoutMs); - } - ZkClient getZkClient() { return zkClient; } @@ -105,17 +103,69 @@ public class ZkUtils { * @return String representing the absolute ephemeralPath of this client in the current session */ public synchronized String registerProcessorAndGetId(final ProcessorData data) { + String processorId = data.getProcessorId(); if (ephemeralPath 
== null) { - ephemeralPath = - zkClient.createEphemeralSequential( - keyBuilder.getProcessorsPath() + "/", data.toString()); - - LOG.info("newly generated path for " + data + " is " + ephemeralPath); - return ephemeralPath; + ephemeralPath = zkClient.createEphemeralSequential(keyBuilder.getProcessorsPath() + "/", data.toString()); + LOG.info("Created ephemeral path: {} for processor: {} in zookeeper.", ephemeralPath, data); + ProcessorNode processorNode = new ProcessorNode(data, ephemeralPath); + // Determine if there are duplicate processors with this.processorId after registration. + if (!isValidRegisteredProcessor(processorNode)) { + LOG.info("Processor: {} is duplicate. Deleting zookeeper node at path: {}.", processorId, ephemeralPath); + zkClient.delete(ephemeralPath); + throw new SamzaException(String.format("Processor: %s is duplicate in the group. Registration failed.", processorId)); + } } else { - LOG.info("existing path for " + data + " is " + ephemeralPath); - return ephemeralPath; + LOG.info("Ephemeral path: {} exists for processor: {} in zookeeper.", ephemeralPath, data); } + return ephemeralPath; + } + + /** + * Determines the validity of processor registered with zookeeper. + * + * If there are multiple processors registered with same processorId, + * the processor with lexicographically smallest zookeeperPath is considered valid + * and all the remaining processors are invalid. + * + * Two processors will not have smallest zookeeperPath because of sequentialId guarantees + * of zookeeper for ephemeral nodes. + * + * @param processor to check for validity condition in processors group. + * @return true if the processor is valid. false otherwise. 
+ */ + private boolean isValidRegisteredProcessor(final ProcessorNode processor) { + String processorId = processor.getProcessorData().getProcessorId(); + List<ProcessorNode> processorNodes = getAllProcessorNodes().stream() + .filter(processorNode -> processorNode.processorData.getProcessorId().equals(processorId)) + .collect(Collectors.toList()); + // Check for duplicate processor condition(if more than one processor exist for this processorId). + if (processorNodes.size() > 1) { + // There exists more than processor for provided `processorId`. + LOG.debug("Processor nodes in zookeeper: {} for processorId: {}.", processorNodes, processorId); + // Get all ephemeral processor paths + TreeSet<String> sortedProcessorPaths = processorNodes.stream() + .map(ProcessorNode::getEphemeralPath) + .collect(Collectors.toCollection(TreeSet::new)); + // Check if smallest path is equal to this processor's ephemeralPath. + return sortedProcessorPaths.first().equals(processor.getEphemeralPath()); + } + // There're no duplicate processors. This is a valid registered processor. + return true; + } + + /** + * Fetches all the ephemeral processor nodes of a standalone job from zookeeper. + * @return a list of {@link ProcessorNode}, where each ProcessorNode represents a registered stream processor. 
+ */ + private List<ProcessorNode> getAllProcessorNodes() { + List<String> processorZNodes = getSortedActiveProcessorsZnodes(); + LOG.debug("Active ProcessorZNodes in zookeeper: {}.", processorZNodes); + return processorZNodes.stream() + .map(processorZNode -> { + String ephemeralProcessorPath = String.format("%s/%s", keyBuilder.getProcessorsPath(), processorZNode); + String data = readProcessorData(ephemeralProcessorPath); + return new ProcessorNode(new ProcessorData(data), ephemeralProcessorPath); + }).collect(Collectors.toList()); } /** @@ -321,4 +371,45 @@ public class ZkUtils { zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener); metrics.subscriptions.inc(); } + + /** + * Represents zookeeper processor node. + */ + private static class ProcessorNode { + private final ProcessorData processorData; + + // Ex: /test/processors/0000000000 + private final String ephemeralProcessorPath; + + ProcessorNode(ProcessorData processorData, String ephemeralProcessorPath) { + this.processorData = processorData; + this.ephemeralProcessorPath = ephemeralProcessorPath; + } + + ProcessorData getProcessorData() { + return processorData; + } + + String getEphemeralPath() { + return ephemeralProcessorPath; + } + + @Override + public String toString() { + return String.format("[ProcessorData: %s, ephemeralProcessorPath: %s]", processorData, ephemeralProcessorPath); + } + + @Override + public int hashCode() { + return Objects.hash(processorData, ephemeralProcessorPath); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + final ProcessorNode other = (ProcessorNode) obj; + return Objects.equals(processorData, other.processorData) && Objects.equals(ephemeralProcessorPath, other.ephemeralProcessorPath); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/45931fd5/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index a33bf03..e7a9aa2 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -37,6 +37,8 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.ExpectedException; import org.junit.Test; public class TestZkUtils { @@ -47,6 +49,10 @@ public class TestZkUtils { private static final int CONNECTION_TIMEOUT_MS = 10000; private ZkUtils zkUtils; + @Rule + // Declared public to honor junit contract. + public final ExpectedException expectedException = ExpectedException.none(); + @BeforeClass public static void setup() throws InterruptedException { zkServer = new EmbeddedZookeeper(); @@ -68,10 +74,7 @@ public class TestZkUtils { // Do nothing } - zkUtils = new ZkUtils( - KEY_BUILDER, - zkClient, - SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); + zkUtils = getZkUtils(); zkUtils.connect(); } @@ -82,6 +85,11 @@ public class TestZkUtils { zkClient.close(); } + private ZkUtils getZkUtils() { + return new ZkUtils(KEY_BUILDER, zkClient, + SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); + } + @AfterClass public static void teardown() { zkServer.teardown(); @@ -174,6 +182,26 @@ public class TestZkUtils { Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000)); } + /** + * Create two duplicate processors with same processorId. + * Second creation should fail with exception. 
+ */ + @Test + public void testRegisterProcessorAndGetIdShouldFailForDuplicateProcessorRegistration() { + final String testHostName = "localhost"; + final String testProcessId = "testProcessorId"; + ProcessorData processorData1 = new ProcessorData(testHostName, testProcessId); + // Register processor 1 which is not duplicate, this registration should succeed. + zkUtils.registerProcessorAndGetId(processorData1); + + ZkUtils zkUtils1 = getZkUtils(); + zkUtils1.connect(); + ProcessorData duplicateProcessorData = new ProcessorData(testHostName, testProcessId); + // Registration of the duplicate processor should fail. + expectedException.expect(SamzaException.class); + zkUtils1.registerProcessorAndGetId(duplicateProcessorData); + } + @Test public void testPublishNewJobModel() { ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test"); http://git-wip-us.apache.org/repos/asf/samza/blob/45931fd5/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index 2d5da2b..77e2a49 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -58,6 +58,7 @@ import org.apache.samza.zk.ZkKeyBuilder; import org.apache.samza.zk.ZkUtils; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,6 +99,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne @Rule public Timeout testTimeOutInMillis = new Timeout(90000); + @Rule + public final ExpectedException expectedException = ExpectedException.none(); 
+ @Override public void setUp() { super.setUp(); @@ -224,9 +228,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals("2", currentJobModelVersion); } - // Checks enforcing property that all processors should have unique Id. - // Depends upon SAMZA-1302 - // @Test(expected = Exception.class) + @Test public void shouldFailWhenNewProcessorJoinsWithSameIdAsExistingProcessor() throws InterruptedException { // Set up kafka topics. publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); @@ -253,10 +255,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]); kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS); StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch); + // Fail when the duplicate processor joins. + expectedException.expect(SamzaException.class); applicationRunner3.run(streamApp3); - - // The following line should throw up by handling duplicate processorId registration. - kafkaEventsConsumedLatch.await(); } // Depends upon SAMZA-1302