Repository: samza
Updated Branches:
  refs/heads/master 6f811de30 -> a974b236a
SAMZA-1124; Job coordinator with time out

If a processor doesn't join the barrier within the timeout, the barrier is cancelled. All the processors should unsubscribe from it.

Author: Boris Shkolnik <bshko...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bor...@apache.org>
Author: navina <nav...@apache.org>

Reviewers: Xinyu Liu <xi...@linkedin.com>

Closes #77 from sborya/JobCoordinatorWithTO and squashes the following commits:

ed055dd [Boris Shkolnik] checkstyle
1468902 [Boris Shkolnik] renambed a method
579d2e7 [Boris Shkolnik] Merge branch 'JobCoordinator' into JobCoordinatorWithTO
54ab688 [Boris Shkolnik] removed JavaJobConfig.java
3f95d46 [Boris Shkolnik] checkstyle
75f9a94 [Boris Shkolnik] added test for time out
b73ba32 [Boris Shkolnik] merge + test
5d67be0 [Boris Shkolnik] removed extra empty lines
9cf3c3e [Boris Shkolnik] addressed review comments
c47031d [Boris Shkolnik] added timeout for ZkBarrierForVersionUpgrade
93a55a9 [Boris Shkolnik] use write directly in the barrier when all joined
f89a037 [Boris Shkolnik] addressed some notes
4219720 [Boris Shkolnik] added JavaJobConfig
659ae7c [Boris Shkolnik] add ZkJobCoordinatorFactory
520a083 [Boris Shkolnik] added ZKJobCoordinatorFactory
a42218e [Boris Shkolnik] checkstyle
02ca658 [Boris Shkolnik] typo
c1fd0b2 [Boris Shkolnik] merge cleanup
1c8eef4 [Boris Shkolnik] merge cleanup
a7e014a [Boris Shkolnik] merged
b4a0642 [Boris Shkolnik] typo
33bacdd [Boris Shkolnik] merge
d1582a9 [Boris Shkolnik] missed method name change
4c481a2 [Boris Shkolnik] merge
b811f3d [Boris Shkolnik] changed to private final
5d43419 [Boris Shkolnik] addressed review comments
1a0c54d [Boris Shkolnik] fixed test
e19d77b [Boris Shkolnik] renamed method
0c5edab [Boris Shkolnik] makey tryBecomeALeader async
b34f6b7 [Boris Shkolnik] added java doc
c2305b6 [Boris Shkolnik] cleanup
f83fc57 [Boris Shkolnik] merge
f8c8a6d [Boris Shkolnik] make a smaller PR for publish functionality only
251aad7 [Boris Shkolnik] removed unneeded interface for real
9892dee [Boris Shkolnik] removed unneeded interface
f20d15f [Boris Shkolnik] some updates to JobCoordinator
bd53c07 [Boris Shkolnik] deleteing already committed files
18198d1 [Boris Shkolnik] added test for zk barrier
9dba992 [Boris Shkolnik] moved the Test in to test subdir
c16d864 [Boris Shkolnik] added test
817a7b6 [Boris Shkolnik] merge complete
1e5947f [Boris Shkolnik] merged
6506b48 [Boris Shkolnik] merged with latest
4290b13 [Boris Shkolnik] merged
43eb076 [Boris Shkolnik] checkstyle errors
e0c44fe [Boris Shkolnik] merged
8e8d833 [Boris Shkolnik] merge
e59d38c [Boris Shkolnik] merge with ZkController
6a71cf6 [Boris Shkolnik] renamed the listners
6cbcf6e [Boris Shkolnik] review comments
efbee84 [Boris Shkolnik] Checkstyle errors
132300c [Boris Shkolnik] converted ZkLeaderElector.tryBecomeLeader to async method
13a05d7 [Boris Shkolnik] merge
2d59e0c [Boris Shkolnik] merge
82c819b [Boris Shkolnik] merge
7ebe9a6 [Boris Shkolnik] check style
41b2e46 [Boris Shkolnik] refactoring to match the new ZkUtils constructor
b07d63a [Boris Shkolnik] merge JavaJobConfig
bdc953b [Boris Shkolnik] merged
4301372 [Boris Shkolnik] added tests
4d48d83 [Boris Shkolnik] merge
c9bb475 [Boris Shkolnik] added tests for ZkUtils
592e9bb [Boris Shkolnik] added missing functionality for ZkControllerImpl into zkUtils and zkKeyBuilder
b473a6e [Boris Shkolnik] Added the new file ZkControllerListener.java
3412ed4 [Boris Shkolnik] Renamed ZkListener to ZkControllerListener
fabddc9 [Boris Shkolnik] merge
ad9108a [Boris Shkolnik] merge with ScheduleAfterDebounceTime
ba583d6 [Boris Shkolnik] merge
3d6b993 [Boris Shkolnik] cleaned up
fe69e70 [Boris Shkolnik] merge
7f8125b [Boris Shkolnik] Merge branch 'master' into ZkTestUtils
eaf04bb [Boris Shkolnik] added more comments
358ae6b [Boris Shkolnik] Added test and addressed review comments
9b22eb6 [Boris Shkolnik] JobModelPublish
0ef90b6 [Boris Shkolnik] Merge branch 'JobModel' into JobModelPublish
9c59048 [Boris Shkolnik] merge
017fe79 [Boris Shkolnik] added tests
5c8aa20 [Boris Shkolnik] JobModel Generation using SimpleGroupByContainerCount
2c841e1 [Boris Shkolnik] added awaitStart
eeb69ca [Boris Shkolnik] Merge branch 'ZkBarrier' into JobCoordinator
dc26bd2 [Boris Shkolnik] added BarrierForVersionUpgrade
cfdb4c7 [Boris Shkolnik] Merge branch 'ZkController' into ZkBarrier
b28ba14 [Boris Shkolnik] ZkBarrier
c8d26ba [Boris Shkolnik] merged
efc4d03 [Boris Shkolnik] ZkController
c9b3fe4 [Boris Shkolnik] Merge branch 'ScheduleAfterDebounceTime' into ZkController
f0cae7b [Boris Shkolnik] Merge branch 'LeaderElector' into ZkController
3df0def [Boris Shkolnik] ZkControllerImpl
4801613 [Boris Shkolnik] cleanup
d32045b [Boris Shkolnik] Merge branch 'ScheduleAfterDebounceTime1' into JobCoordinator
32f96b4 [Boris Shkolnik] added ScheduleAfterDebounce
ce83409 [Boris Shkolnik] cleanup
d7a7ccb [Boris Shkolnik] Merge branch 'LeaderElector' into ScheduleAfterDebounceTime
9c6b20a [Boris Shkolnik] added ZkListener
5f867c0 [Boris Shkolnik] added Apache license info
d0687b9 [Boris Shkolnik] Merge branch 'LeaderElector' into JobCoordinator
372829f [Boris Shkolnik] Merge branch 'master' into JobCoordinator
14db43d [Boris Shkolnik] added TestZkStreamProcessor - main manual test
b3b27c6 [Boris Shkolnik] Merge branch 'LeaderElector' into TestZkStreamProcessor
6649c80 [navina] Fixing ZkUtils close(). No need to close underlying connection explicitly
7f17e26 [Boris Shkolnik] ZkTestUtils
f904cd3 [Boris Shkolnik] ScheduleAfterDebounceTime
d126b10 [Boris Shkolnik] ScheduleAfterDebounceTime
63d8d60 [Boris Shkolnik] JavaJobConfig
ff15501 [Boris Shkolnik] JavaJobConfig
d20bacf [Boris Shkolnik] ZkTestUtils
737eb2f [Boris Shkolnik] ZkTestUtils
8e2d6c1 [Boris Shkolnik] add main manual test
7a47f84 [Boris Shkolnik] add main manual test
fa2186b [Boris Shkolnik] added ZkController
a0a7409 [Boris Shkolnik] Merge branch 'master' of https://github.com/navina/samza
edda60d [navina] Removing an unintended change to the grouper
6dd6b8d [navina] Adding tests for ZkLeaderElector
1734f8f [navina] Adding tests for ZkUtils
317cf16 [navina] Adding tests for ZkKeyBuilder
aaaf24e [navina] Adding EmbeddedZookeeper for testing
37c2c8b [navina] Extracting files related to LeaderElection
76b5167 [Boris Shkolnik] added new line at then end


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a974b236
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a974b236
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a974b236

Branch: refs/heads/master
Commit: a974b236a78826d993e50d8936a4cc393a97bfe4
Parents: 6f811de
Author: Boris Shkolnik <bshko...@bshkolni-ld1.linkedin.biz>
Authored: Wed Mar 8 17:16:51 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Wed Mar 8 17:16:51 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 gradle/dependency-versions.gradle               |   1 +
 .../samza/zk/BarrierForVersionUpgrade.java      |   3 +-
 .../samza/zk/ZkBarrierForVersionUpgrade.java    | 114 ++++++----
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 221 +++++++++++++++++++
 .../samza/zk/ZkJobCoordinatorFactory.java       |  57 +++++
 .../main/java/org/apache/samza/zk/ZkUtils.java  |   5 +-
 .../samza/zk/TestScheduleAfterDebounceTime.java |  26 +--
 .../zk/TestZkBarrierForVersionUpgrade.java      |  53 ++++-
 .../apache/samza/zk/TestZkLeaderElector.java    |   1 +
 .../java/org/apache/samza/zk/TestZkUtils.java   |   9 +
 11 files changed, 422 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 400a913..ea1e3c2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -160,6 +160,7 @@ project(":samza-core_$scalaVersion") {
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
     compile "com.101tec:zkclient:$zkClientVersion"
+    compile "org.apache.commons:commons-collections4:$apacheCommonsCollections4Version"
     testCompile project(":samza-api").sourceSets.test.output
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 0a8542b..20af1da 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -39,4 +39,5 @@
   commonsCollectionVersion = "3.2.1"
   httpClientVersion="4.4.1"
   commonsLang3Version="3.4"
+  apacheCommonsCollections4Version="4.0"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
index b2d80d0..2b785f0 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
@@ -32,12 +32,13 @@ public interface BarrierForVersionUpgrade {
    * @param version - for which the barrier is started.
    * @param processorsNames - list of processors available at the time of the JobModel generation.
    */
-  void startBarrier(String version, List<String> processorsNames);
+  void start(String version, List<String> processorsNames);

   /**
    * Called by the processor.
    * Updates the processor readiness to use the new version and wait on the barrier, until all other processors
    * joined.
+   * The call is async. The callback will be invoked when the barrier is reached.
    * @param version of the jobModel this barrier is protecting.
    * @param processorsName as it appears in the list of processors.
    * @param callback will be invoked, when barrier is reached.

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 3ec87b0..f7efa48 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -19,10 +19,12 @@

 package org.apache.samza.zk;

+import java.util.Arrays;
 import java.util.List;
-
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -31,6 +33,8 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
   private final ZkUtils zkUtils;
   private final ZkKeyBuilder keyBuilder;
   private final static String BARRIER_DONE = "done";
+  private final static String BARRIER_TIMED_OUT = "TIMED_OUT";
+  private final static long BARRIER_TIMED_OUT_MS = 60 * 1000;
   private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);

   private final ScheduleAfterDebounceTime debounceTimer;
@@ -45,36 +49,66 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
     this.debounceTimer = debounceTimer;
   }

+  /**
+   * set the barrier for the timer. If the timer is not achieved by the timeout - it will fail
+   * @param version for which the barrier is created
+   * @param timeout - time in ms to wait
+   */
+  public void setTimer(String version, long timeout) {
+    debounceTimer.scheduleAfterDebounceTime("VersionUpgradeTimeout", timeout, ()->timerOff(version));
+  }
+
+  protected long getBarrierTimeOutMs() {
+    return BARRIER_TIMED_OUT_MS;
+  }
+
+  private void timerOff(String version) {
+    // check if barrier has finished
+    final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    final String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+    Stat stat = new Stat();
+    String done = zkUtils.getZkClient().<String>readData(barrierDonePath, stat);
+    if (done != null && done.equals(BARRIER_DONE))
+      return; //nothing to do
+
+    while (true) {
+      try {
+        // write a new value if no one else did, if the value was changed since previous reading - retry
+        zkUtils.getZkClient().writeData(barrierDonePath, "TIMED_OUT", stat.getVersion());
+        return;
+      } catch (Exception e) {
+        // failed to write, try read/write again
+        LOG.info("Barrier timeout write failed");
+        done = zkUtils.getZkClient().<String>readData(barrierDonePath, stat);
+        if (done.equals(BARRIER_DONE))
+          return; //nothing to do
+      }
+    }
+  }
+
   @Override
-  public void startBarrier(String version, List<String> processorsNames) {
-    String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
-    String barrierDonePath = String.format("%s/barrier_done", barrierPath);
-    String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+  public void start(String version, List<String> processorsNames) {
+    final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    final String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+    final String barrierProcessors = String.format("%s/barrier_processors", barrierPath);

     zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});

-    // callback for when the barrier is reached
-    Runnable callback = new Runnable() {
-      @Override
-      public void run() {
-        LOG.info("Writing BARRIER DONE to " + barrierDonePath);
-        zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
-      }
-    };
     // subscribe for processor's list changes
     LOG.info("Subscribing for child changes at " + barrierProcessors);
     zkUtils.getZkClient().subscribeChildChanges(barrierProcessors,
-        new ZkBarrierChangeHandler(callback, processorsNames));
+        new ZkBarrierChangeHandler(version, processorsNames));
+
+    setTimer(version, getBarrierTimeOutMs());
   }

   @Override
   public void waitForBarrier(String version, String processorsName, Runnable callback) {
     // if participant makes this call it means it has already stopped the old container and got the new job model.
-    String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
-    String barrierDonePath = String.format("%s/barrier_done", barrierPath);
-    String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
-    String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName);
-
+    final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    final String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+    final String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+    final String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName);

     // update the barrier for this processor
     LOG.info("Creating a child for barrier at " + barrierProcessorThis);
@@ -88,47 +122,32 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
    * listener for the subscription.
    */
   class ZkBarrierChangeHandler implements IZkChildListener {
-    Runnable callback;
-    List<String> names;
+    private final String version;
+    private final List<String> names;

-    public ZkBarrierChangeHandler(Runnable callback, List<String> names) {
-      this.callback = callback;
+    public ZkBarrierChangeHandler(String version, List<String> names) {
+      this.version = version;
       this.names = names;
     }

     @Override
     public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
-      // Find out the event & Log
-      boolean allIn = true;
       if (currentChildren == null) {
         LOG.info("Got handleChildChange with null currentChildren");
         return;
       }
-      // debug
-      StringBuilder sb = new StringBuilder();
-      for (String child : currentChildren) {
-        sb.append(child).append(",");
-      }
-      LOG.info("list of children in the barrier = " + parentPath + ":" + sb.toString());
-      sb = new StringBuilder();
-      for (String child : names) {
-        sb.append(child).append(",");
-      }
-      LOG.info("list of children to compare against = " + parentPath + ":" + sb.toString());
+      LOG.info("list of children in the barrier = " + parentPath + ":" + Arrays.toString(currentChildren.toArray()));
+      LOG.info("list of children to compare against = " + parentPath + ":" + Arrays.toString(names.toArray()));

       // check if all the names are in
-      for (String n : names) {
-        if (!currentChildren.contains(n)) {
-          LOG.info("node " + n + " is still not in the list ");
-          allIn = false;
-          break;
-        }
-      }
-      if (allIn) {
+      if (CollectionUtils.containsAll(names, currentChildren)) {
         LOG.info("ALl nodes reached the barrier");
-        callback.run(); // all the names have registered
+        final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+        final String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+        LOG.info("Writing BARRIER DONE to " + barrierDonePath);
+        zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
       }
     }
   }
@@ -152,6 +171,11 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
       if (done.equals(BARRIER_DONE)) {
         zkUtils.unsubscribeDataChanges(barrierPathDone, this);
         debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback);
+      } else if (done.equals(BARRIER_TIMED_OUT)) {
+        // timed out
+        LOG.error("Barrier for " + dataPath + " timed out");
+        System.out.println("Barrier for " + dataPath + " timed out");
+        zkUtils.unsubscribeDataChanges(barrierPathDone, this);
       }

       // we do not need to resubscribe because, ZkClient library does it for us.
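The barrier changes above give `barrier_done` two possible writers: the child listener writes `done` once every expected processor has registered, and `timerOff` writes `TIMED_OUT` conditioned on the version it last read, retrying if another write got in first. The following is a minimal in-memory sketch of that decision logic under single-JVM assumptions; it is not Samza code and uses no ZooKeeper client. `BarrierSketch`, `VersionedNode` and `Snapshot` are hypothetical names: `VersionedNode` stands in for the znode, and `writeIfVersion` mimics a version-conditioned write by refusing to write when the version has moved on. On the membership test: in commons-collections4, `CollectionUtils.containsAll(coll1, coll2)` returns true iff every element of `coll2` is contained in `coll1`, so "all the names are in" corresponds to `containsAll(currentChildren, names)`; the sketch expresses the same check with `java.util.Collection.containsAll` as `joined.containsAll(expected)`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of the barrier decision logic; no ZooKeeper involved.
class BarrierSketch {
  static final String DONE = "done";
  static final String TIMED_OUT = "TIMED_OUT";

  // Value and version read together, like readData(path, stat) filling in a Stat.
  static final class Snapshot {
    final String value;
    final int version;

    Snapshot(String value, int version) {
      this.value = value;
      this.version = version;
    }
  }

  // Hypothetical stand-in for the barrier_done znode.
  static final class VersionedNode {
    private String value;  // null until the first write
    private int version;   // incremented on every successful write

    synchronized Snapshot read() {
      return new Snapshot(value, version);
    }

    // Unconditional write (the "all joined" path).
    synchronized void write(String newValue) {
      value = newValue;
      version++;
    }

    // Conditional write: succeeds only if nobody wrote since expectedVersion was read.
    synchronized boolean writeIfVersion(String newValue, int expectedVersion) {
      if (version != expectedVersion) {
        return false;
      }
      value = newValue;
      version++;
      return true;
    }
  }

  private final List<String> expected;
  private final Set<String> joined = new HashSet<>();
  final VersionedNode barrierDone = new VersionedNode();

  BarrierSketch(List<String> expected) {
    this.expected = expected;
  }

  // A processor registers; once every expected name is present, mark the barrier done.
  synchronized void join(String name) {
    joined.add(name);
    // "all the names are in": expected must be a subset of joined, not the reverse
    if (joined.containsAll(expected)) {
      barrierDone.write(DONE);
    }
  }

  // Timer callback: write TIMED_OUT only if DONE was not written first.
  void onTimeout() {
    while (true) {
      Snapshot s = barrierDone.read();
      if (DONE.equals(s.value)) {
        return; // barrier already reached, nothing to do
      }
      if (barrierDone.writeIfVersion(TIMED_OUT, s.version)) {
        return; // conditional write succeeded
      }
      // another writer got in between the read and the write: re-read and retry
    }
  }

  public static void main(String[] args) {
    List<String> procs = Arrays.asList("p1", "p2", "p3");

    BarrierSketch late = new BarrierSketch(procs);
    late.join("p1");
    late.join("p2");
    late.onTimeout(); // p3 has not joined before the timer fires
    System.out.println("p3 missing: " + late.barrierDone.read().value);

    BarrierSketch ok = new BarrierSketch(procs);
    ok.join("p1");
    ok.join("p2");
    ok.join("p3");
    ok.onTimeout(); // fires after completion and must leave DONE in place
    System.out.println("all joined: " + ok.barrierDone.read().value);
  }
}
```

Running `main` prints `p3 missing: TIMED_OUT` followed by `all joined: done`. Reading value and version as one snapshot before the conditional write is what keeps the timer from overwriting a `done` that lands between its read and its write.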

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
new file mode 100644
index 0000000..1d16d4a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.JobModelManager$;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JobCoordinator for stand alone processor managed via Zookeeper.
+ */
+public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
+  private static final Logger log = LoggerFactory.getLogger(ZkJobCoordinator.class);
+
+  private final ZkUtils zkUtils;
+  private final int processorId;
+  private final ZkController zkController;
+  private final SamzaContainerController containerController;
+  private final BarrierForVersionUpgrade barrier;
+  private final ScheduleAfterDebounceTime debounceTimer;
+  private final StreamMetadataCache streamMetadataCache;
+  private final ZkKeyBuilder keyBuilder;
+  private final Config config;
+
+  private JobModel newJobModel;
+  private String newJobModelVersion;  // version published in ZK (by the leader)
+  private JobModelManager jobModelManager;
+
+  public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) {
+    this.zkUtils = zkUtils;
+    this.keyBuilder = zkUtils.getKeyBuilder();
+    this.debounceTimer = debounceTimer;
+    this.processorId = processorId;
+    this.containerController = containerController;
+    this.zkController = new ZkControllerImpl(String.valueOf(processorId), zkUtils, debounceTimer, this);
+    this.config = config;
+
+
+    barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer);
+    streamMetadataCache = getStreamMetadataCache();
+  }
+
+  private StreamMetadataCache getStreamMetadataCache() {
+    // model generation - NEEDS TO BE REVIEWED
+    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
+    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
+    for (String systemName: systemConfig.getSystemNames()) {
+      String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
+      if (systemFactoryClassName == null) {
+        String msg = String.format("A stream uses system %s, which is missing from the configuration.", systemName);
+        log.error(String.format(msg));
+        throw new SamzaException(msg);
+      }
+      SystemFactory systemFactory = Util.getObj(systemFactoryClassName);
+      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
+    }
+
+    return new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
+  }
+
+  @Override
+  public void start() {
+    zkController.register();
+  }
+
+  public void cleanupZk() {
+    zkUtils.deleteRoot();
+  }
+
+  @Override
+  public void stop() {
+    zkController.stop();
+  }
+
+  @Override
+  public boolean awaitStart(long timeoutMs)
+      throws InterruptedException {
+    return containerController.awaitStart(timeoutMs);
+  }
+
+  @Override
+  public int getProcessorId() {
+    return processorId;
+  }
+
+  @Override
+  public JobModel getJobModel() {
+    return newJobModel;
+  }
+
+  //////////////////////////////////////////////// LEADER stuff ///////////////////////////
+  @Override
+  public void onBecomeLeader() {
+    log.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
+
+    List<String> emptyList = new ArrayList<>();
+
+    // actual actions to do are the same as onProcessorChange()
+    debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
+        ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> onProcessorChange(emptyList));
+  }
+
+  @Override
+  public void onProcessorChange(List<String> processorIds) {
+    log.info("ZkJobCoordinator::onProcessorChange - Processors changed! List: " + Arrays.toString(processorIds.toArray()));
+    generateNewJobModel();
+  }
+
+  @Override
+  public void onNewJobModelAvailable(final String version) {
+    newJobModelVersion = version;
+    log.info("pid=" + processorId + "new JobModel available");
+    // stop current work
+    containerController.stopContainer();
+    log.info("pid=" + processorId + "new JobModel available.Container stopped.");
+    // get the new job model
+    newJobModel = zkUtils.getJobModel(version);
+    log.info("pid=" + processorId + "new JobModel available. ver=" + version + "; jm = " + newJobModel);
+
+    String currentPath = zkUtils.getEphemeralPath();
+    String zkProcessorId = keyBuilder.parseIdFromPath(currentPath);
+
+    // update ZK and wait for all the processors to get this new version
+    barrier.waitForBarrier(version, String.valueOf(zkProcessorId), new Runnable() {
+      @Override
+      public void run() {
+        onNewJobModelConfirmed(version);
+      }
+    });
+  }
+
+  @Override
+  public void onNewJobModelConfirmed(String version) {
+    log.info("pid=" + processorId + "new version " + version + " of the job model got confirmed");
+    // get the new Model
+    JobModel jobModel = getJobModel();
+    log.info("pid=" + processorId + "got the new job model in JobModelConfirmed =" + jobModel);
+
+    // start the container with the new model
+    containerController.startContainer(jobModel.getContainers().get(processorId), jobModel.getConfig(),
+        jobModel.maxChangeLogStreamPartitions);
+  }
+
+  /**
+   * Generate new JobModel when becoming a leader or the list of processor changed.
+   */
+  private void generateNewJobModel() {
+    // get the current list of processors
+    List<String> currentProcessors = zkUtils.getSortedActiveProcessors();
+
+    // get the current version
+    String currentJMVersion  = zkUtils.getJobModelVersion();
+    String nextJMVersion;
+    if (currentJMVersion == null) {
+      log.info("pid=" + processorId + "generating first version of the model");
+      nextJMVersion = "1";
+    } else {
+      nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
+    }
+    log.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
+
+    StringBuilder sb = new StringBuilder();
+    List<Integer> containerIds = new ArrayList<>();
+    for (String processor : currentProcessors) {
+      String zkProcessorId = keyBuilder.parseIdFromPath(processor);
+      sb.append(zkProcessorId).append(",");
+      containerIds.add(Integer.valueOf(zkProcessorId));
+    }
+    log.info("generate new job model: processorsIds: " + sb.toString());
+
+    jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null,
+        containerIds);
+    JobModel jobModel = jobModelManager.jobModel();
+
+    log.info("pid=" + processorId + "Generated jobModel: " + jobModel);
+
+    // publish the new job model first
+    zkUtils.publishJobModel(nextJMVersion, jobModel);
+    log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion + ";jm=" + jobModel);
+
+    // start the barrier for the job model update
+    barrier.start(nextJMVersion, currentProcessors);
+
+    // publish new JobModel version
+    zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
+    log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
new file mode 100644
index 0000000..e211f70
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.processor.SamzaContainerController;
+
+public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
+  /**
+   * Method to instantiate an implementation of JobCoordinator
+   *
+   * @param processorId Indicates the StreamProcessor's id to which this Job Coordinator is associated with
+   * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
+   * @return An instance of IJobCoordinator
+   */
+  @Override
+  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
+    JobConfig jobConfig = new JobConfig(config);
+    String groupName = String.format("%s-%s", jobConfig.getName(), jobConfig.getJobId());
+    ZkConfig zkConfig = new ZkConfig(config);
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+    return new ZkJobCoordinator(
+        processorId,
+        config,
+        debounceTimer,
+        new ZkUtils(
+            String.valueOf(processorId),
+            new ZkKeyBuilder(groupName),
+            zkClient,
+            zkConfig.getZkConnectionTimeoutMs()
+            ),
+        containerController);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 73376b1..e8170e3 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -64,12 +64,13 @@ public class ZkUtils {
   private volatile String ephemeralPath = null;
   private final ZkKeyBuilder keyBuilder;
   private final int connectionTimeoutMs;
-  private final String processorId = "TO BE PASSED IN THE CONSTRUCTOR"; //TODO
+  private final String processorId;

-  public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
+  public ZkUtils(String processorId, ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
     this.keyBuilder = zkKeyBuilder;
     this.connectionTimeoutMs = connectionTimeoutMs;
     this.zkClient = zkClient;
+    this.processorId = processorId;
   }

   public void connect() throws ZkInterruptedException {

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index e57372f..23a8cc1 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -48,19 +48,13 @@ public class TestScheduleAfterDebounceTime {
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
     final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();

-    debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () ->
-      {
+    debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () -> {
         testObj.inc();
-      }
-    );
+      });

     // action is delayed
     Assert.assertEquals(0, i);
-    try {
-      Thread.sleep(DEBOUNCE_TIME + 10);
-    } catch (InterruptedException e) {
-      Assert.fail("Sleep was interrupted");
-    }
+    TestZkUtils.sleepMs(DEBOUNCE_TIME + 10);

     // debounce time passed
     Assert.assertEquals(1, i);
@@ -85,11 +79,8 @@ public class TestScheduleAfterDebounceTime {
       }
     );

-    try {
-      Thread.sleep(DEBOUNCE_TIME + 10);
-    } catch (InterruptedException e) {
-      Assert.fail("Sleep was interrupted");
-    }
+    TestZkUtils.sleepMs(DEBOUNCE_TIME + 10);
+

     // still should be the old value
     Assert.assertEquals(0, i);
@@ -100,11 +91,8 @@ public class TestScheduleAfterDebounceTime {
       }
     );
-    try {
-      Thread.sleep(3 * DEBOUNCE_TIME + 10);
-    } catch (InterruptedException e) {
-      Assert.fail("Sleep was interrupted");
-    }
+    TestZkUtils.sleepMs(3 * DEBOUNCE_TIME + 10);
+

     Assert.assertEquals(100, i);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index 92cb2c9..f26d4d0 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -81,7 +81,7 @@ public class TestZkBarrierForVersionUpgrade {
     }

     final Status s = new Status();
-    barrier.startBarrier(ver, processors);
+    barrier.start(ver, processors);

     barrier.waitForBarrier(ver, "p1", new Runnable() {
       @Override
@@ -117,7 +117,7 @@ public class TestZkBarrierForVersionUpgrade {
     }

     final Status s = new Status();
-    barrier.startBarrier(ver, processors);
+    barrier.start(ver, processors);

     barrier.waitForBarrier(ver, "p1", new Runnable() {
       @Override
@@ -134,13 +134,62 @@ public class TestZkBarrierForVersionUpgrade {
     });

     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 100));
+  }
+
+  @Test
+  public void testZkBarrierForVersionUpgradeWithTimeOut() {
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer) {
+      @Override
+      protected long getBarrierTimeOutMs() {
+        return 200;
+      }
+    };
+    String ver = "1";
+    List<String> processors = new ArrayList<String>();
+    processors.add("p1");
+    processors.add("p2");
+    processors.add("p3");
+
+    class Status {
+      boolean p1 = false;
+      boolean p2 = false;
+      boolean p3 = false;
+    }
+    final Status s = new Status();
+
+    barrier.start(ver, processors);
+    barrier.waitForBarrier(ver, "p1", new Runnable() {
+      @Override
+      public void run() {
+        s.p1 = true;
+      }
+    });
+
+    barrier.waitForBarrier(ver, "p2", new Runnable() {
+      @Override
+      public void run() {
+        s.p2 = true;
+      }
+    });
+
+    // this node will join "too late"
+    barrier.waitForBarrier(ver, "p3", new Runnable() {
+      @Override
+      public void run() {
+        TestZkUtils.sleepMs(300);
+        s.p3 = true;
+      }
+    });
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 400));
   }

   private ZkUtils getZkUtilsWithNewClient() {
     ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
     return new ZkUtils(
+        "1",
         KEY_BUILDER,
         ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
         CONNECTION_TIMEOUT_MS);

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index bfda464..12fa922 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -501,6 +501,7 @@ public class TestZkLeaderElector {
   private ZkUtils getZkUtilsWithNewClient() {
     ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
     return new ZkUtils(
+        "processorId1",
         KEY_BUILDER,
ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS), CONNECTION_TIMEOUT_MS); http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index 58c3ed6..913bd49 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -67,6 +67,7 @@ public class TestZkUtils { } zkUtils = new ZkUtils( + "testProcessorId", KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS); @@ -209,4 +210,12 @@ public class TestZkUtils { } return false; } + + public static void sleepMs(long delay) { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + Assert.fail("Sleep was interrupted"); + } + } }
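The new testZkBarrierForVersionUpgradeWithTimeOut test shortens the barrier timeout to 200 ms by overriding getBarrierTimeOutMs() in an anonymous subclass. It then asserts that the callbacks for p1, p2 and p3 do not all fire. The Java sketch below shows that kind of contract in memory. Every expected participant must join before the deadline. Otherwise the barrier is cancelled, participants that already joined are dropped, and late joiners are ignored.

This is not Samza's ZkBarrierForVersionUpgrade, which coordinates through ZooKeeper. The TimedBarrier and TimedBarrierDemo classes, their start()/join()/getState() methods (only loosely modelled on start/waitForBarrier), and the 10-second default timeout are hypothetical. They were chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory barrier (not Samza's ZooKeeper-based implementation):
// every expected participant must join() before the timeout fires, otherwise the
// barrier is cancelled and none of the registered callbacks run.
class TimedBarrier {
  enum State { WAITING, DONE, TIMED_OUT }

  private final Set<String> expected;
  private final Map<String, Runnable> joined = new HashMap<>();
  private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
    Thread t = new Thread(r, "barrier-timeout");
    t.setDaemon(true); // never keep the JVM alive just for the timeout
    return t;
  });
  private ScheduledFuture<?> timeoutTask;
  private State state = State.WAITING;

  TimedBarrier(List<String> participants) {
    this.expected = new HashSet<>(participants);
  }

  // Overridable so tests can shorten the timeout with an anonymous subclass.
  protected long getBarrierTimeOutMs() {
    return 10_000;
  }

  // Arms the timeout; kept out of the constructor so no overridable method is called there.
  synchronized void start() {
    timeoutTask = timer.schedule(this::onTimeout, getBarrierTimeOutMs(), TimeUnit.MILLISECONDS);
  }

  void join(String participant, Runnable onBarrierReached) {
    List<Runnable> callbacks;
    synchronized (this) {
      if (state != State.WAITING) {
        return; // too late: the barrier already completed or was cancelled
      }
      joined.put(participant, onBarrierReached);
      if (!joined.keySet().containsAll(expected)) {
        return; // still waiting for at least one participant
      }
      state = State.DONE;
      if (timeoutTask != null) {
        timeoutTask.cancel(false);
      }
      timer.shutdown();
      callbacks = new ArrayList<>(joined.values());
    }
    callbacks.forEach(Runnable::run); // run callbacks outside the lock
  }

  private synchronized void onTimeout() {
    if (state == State.WAITING) {
      state = State.TIMED_OUT;
      joined.clear(); // drop everyone who already joined: their callbacks never run
    }
    timer.shutdown();
  }

  synchronized State getState() {
    return state;
  }
}

class TimedBarrierDemo {
  public static void main(String[] args) throws InterruptedException {
    List<String> ran = Collections.synchronizedList(new ArrayList<>());
    TimedBarrier barrier = new TimedBarrier(Arrays.asList("p1", "p2", "p3")) {
      @Override
      protected long getBarrierTimeOutMs() {
        return 200; // same short timeout as the Samza test
      }
    };
    barrier.start();
    barrier.join("p1", () -> ran.add("p1"));
    barrier.join("p2", () -> ran.add("p2"));
    Thread.sleep(300);                       // p3 misses the 200 ms deadline
    barrier.join("p3", () -> ran.add("p3")); // ignored: the barrier was cancelled
    System.out.println(barrier.getState() + " " + ran);
  }
}
```

Reading the timeout through a protected method is what lets a test shrink it with an anonymous subclass, the same technique the Samza test uses. Arming the timer in start() rather than in the constructor avoids calling an overridable method while the object is still being constructed.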
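TestScheduleAfterDebounceTime now waits with the fixed-delay TestZkUtils.sleepMs helper. The barrier tests poll through TestZkUtils.testWithDelayBackOff, whose body is not shown in this diff. For comparison, here is a minimal sketch of a polling helper that re-checks a condition with doubling delays until the condition holds or a time budget runs out.

The class PollUntil, the method pollWithBackOff, and its parameter meanings (initial delay and maximum total wait, in milliseconds) are hypothetical. They are not meant to describe the arguments the tests pass to testWithDelayBackOff.

```java
import java.util.function.BooleanSupplier;

// Hypothetical test helper (not the body of TestZkUtils.testWithDelayBackOff, which this
// diff does not show): re-checks a condition with doubling delays until it holds or the
// total time spent sleeping reaches maxWaitMs.
final class PollUntil {
  private PollUntil() {
  }

  static boolean pollWithBackOff(BooleanSupplier condition, long initialDelayMs, long maxWaitMs) {
    long waited = 0;
    long delay = Math.max(1, initialDelayMs); // a zero delay would never advance 'waited'
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (waited >= maxWaitMs) {
        return false;
      }
      long sleep = Math.min(delay, maxWaitMs - waited);
      try {
        Thread.sleep(sleep);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        return condition.getAsBoolean();
      }
      waited += sleep;
      delay *= 2;
    }
  }

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    // Becomes true after roughly 50 ms, well inside the 1000 ms budget.
    System.out.println(pollWithBackOff(() -> System.currentTimeMillis() - start >= 50, 2, 1000));
    // Never true: gives up after sleeping about 100 ms in total.
    System.out.println(pollWithBackOff(() -> false, 2, 100));
  }
}
```

sleepMs fails the test through Assert.fail when it is interrupted. A helper that returns a result instead conventionally restores the thread's interrupt flag, so the caller can still observe the interruption.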