Repository: samza
Updated Branches:
  refs/heads/master 11afae3fd -> 187ec5f2f


SAMZA-1718: Simplify management of Zookeeper coordination state

1. Currently coordination related state is spread across several Zookeeper 
classes. There are also back-and-forth flows that exist between the 
ZkJobCoordinator, ZkControllerImpl, ZkControllerListener and ZkLeaderElector. 
This PR removes unnecessary interfaces (and their implementation classes), 
simplifies state management and unifies state in the ZkJobCoordinator class.

2. Clearly defined life-cycle hooks on events:
- Protocol validations happen once during the lifecycle of a StreamProcessor 
(instead of each new session)
- New subscriptions to listeners happen at each new Zk session

Author: Jagadish <jvenkatra...@linkedin.com>

Reviewers: Prateek M<pmahe...@linkedin.com>

Closes #525 from vjagadish/zk-simplify


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/187ec5f2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/187ec5f2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/187ec5f2

Branch: refs/heads/master
Commit: 187ec5f2fc81bab63bef5de28865ea267a4400fc
Parents: 11afae3
Author: Jagadish <jvenkatra...@linkedin.com>
Authored: Fri Jun 22 18:06:51 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Fri Jun 22 18:06:51 2018 -0700

----------------------------------------------------------------------
 .../samza/zk/ZkBarrierForVersionUpgrade.java    |  19 +-
 .../java/org/apache/samza/zk/ZkController.java  |  39 -----
 .../org/apache/samza/zk/ZkControllerImpl.java   | 163 ------------------
 .../apache/samza/zk/ZkControllerListener.java   |  37 ----
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 172 ++++++++++++-------
 .../org/apache/samza/zk/ZkLeaderElector.java    |  17 +-
 .../main/java/org/apache/samza/zk/ZkUtils.java  |  70 +++++---
 .../apache/samza/zk/TestZkJobCoordinator.java   |  19 +-
 .../java/org/apache/samza/zk/TestZkUtils.java   |  13 +-
 9 files changed, 189 insertions(+), 360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index a2ed823..d0f1caf 100644
--- 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -168,7 +168,7 @@ public class ZkBarrierForVersionUpgrade {
    * Listener for changes to the list of participants. It is meant to be 
subscribed only by the creator of the barrier
    * node. It checks to see when the barrier is ready to be marked as 
completed.
    */
-  class ZkBarrierChangeHandler extends ZkUtils.GenIZkChildListener {
+  class ZkBarrierChangeHandler extends ZkUtils.GenerationAwareZkChildListener {
     private static final String ACTION_NAME = "ZkBarrierChangeHandler";
 
     private final String barrierVersion;
@@ -181,10 +181,7 @@ public class ZkBarrierForVersionUpgrade {
     }
 
     @Override
-    public void handleChildChange(String barrierParticipantPath, List<String> 
participantIds) {
-      if (notAValidEvent()) {
-        return;
-      }
+    public void doHandleChildChange(String barrierParticipantPath, 
List<String> participantIds) {
       if (participantIds == null) {
         LOG.info("Received notification with null participants for barrier: 
{}. Ignoring it.", barrierParticipantPath);
         return;
@@ -216,7 +213,7 @@ public class ZkBarrierForVersionUpgrade {
    * Barrier state values are either DONE or TIMED_OUT. It only registers to 
receive on valid state change notification.
    * Once a valid state change notification is received, it will un-subscribe 
from further notifications.
    */
-  class ZkBarrierReachedHandler extends ZkUtils.GenIZkDataListener {
+  class ZkBarrierReachedHandler extends ZkUtils.GenerationAwareZkDataListener {
     private final String barrierStatePath;
     private final String barrierVersion;
 
@@ -227,10 +224,8 @@ public class ZkBarrierForVersionUpgrade {
     }
 
     @Override
-    public void handleDataChange(String dataPath, Object data) {
+    public void doHandleDataChange(String dataPath, Object data) {
       LOG.info(String.format("Received barrierState change notification for 
barrier version: %s from zkNode: %s with data: %s.", barrierVersion, dataPath, 
data));
-      if (notAValidEvent())
-        return;
 
       State barrierState = (State) data;
       List<State> expectedBarrierStates = ImmutableList.of(State.DONE, 
State.TIMED_OUT);
@@ -244,10 +239,8 @@ public class ZkBarrierForVersionUpgrade {
     }
 
     @Override
-    public void handleDataDeleted(String dataPath) {
-      LOG.warn("barrier done node got deleted at " + dataPath);
-      if (notAValidEvent())
-        return;
+    public void doHandleDataDeleted(String path) {
+      LOG.warn("Data deleted in path: " + path + " barrierVersion: " + 
barrierVersion);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
deleted file mode 100644
index de2e473..0000000
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package org.apache.samza.zk;
-
-/**
- * Api to the functionality provided by ZK
- *
- * Api for JC to ZK communication
- */
-public interface ZkController {
-  void register();
-  boolean isLeader();
-  void stop();
-
-  // Leader
-  /**
-   * Allows the {@link ZkJobCoordinator} to subscribe to changes to Zk nodes 
in the processors subtree
-   * Typically, the leader is interested in such notifications.
-   */
-  void subscribeToProcessorChange();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
deleted file mode 100644
index 87d7177..0000000
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.zk;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.LeaderElector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class ZkControllerImpl implements ZkController {
-  private static final Logger LOG = 
LoggerFactory.getLogger(ZkControllerImpl.class);
-
-  private final String processorId;
-  private final ZkUtils zkUtils;
-  private final ZkControllerListener zkControllerListener;
-  private final LeaderElector zkLeaderElector;
-
-  public ZkControllerImpl(String processorId, ZkUtils zkUtils,
-      ZkControllerListener zkControllerListener, LeaderElector 
zkLeaderElector) {
-    this.processorId = processorId;
-    this.zkUtils = zkUtils;
-    this.zkControllerListener = zkControllerListener;
-    this.zkLeaderElector = zkLeaderElector;
-
-    init();
-  }
-
-  private void init() {
-    ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
-
-    zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), 
keyBuilder.getJobModelVersionPath(), keyBuilder
-            .getJobModelPathPrefix()});
-  }
-
-  @Override
-  public void register() {
-    // TODO - make a loop here with some number of attempts.
-    // possibly split into two method - becomeLeader() and becomeParticipant()
-    zkLeaderElector.tryBecomeLeader();
-
-    // make sure we are connection to a job that uses the same ZK 
communication protocol version.
-    try {
-      zkUtils.validateZkVersion();
-    } catch (SamzaException e) {
-      // IMPORTANT: Mismatch of the version, means we are trying to join a 
job, started by processors with different version.
-      // If there are no processors running, this is the place to do the 
migration to the new
-      // ZK structure.
-      // If some processors are running, then this processor should fail with 
an error to tell the user to stop all
-      // the processors before upgrading to this new version.
-      // TODO migration here
-      // for now we just rethrow the exception
-      throw e;
-    }
-
-
-    // subscribe to JobModel version updates
-    zkUtils.subscribeToJobModelVersionChange(new 
ZkJobModelVersionChangeHandler(zkUtils));
-  }
-
-  @Override
-  public boolean isLeader() {
-    return zkLeaderElector.amILeader();
-  }
-
-  @Override
-  public void stop() {
-    try {
-      if (isLeader()) {
-        zkLeaderElector.resignLeadership();
-      }
-    } finally {
-      zkUtils.deleteProcessorNode(processorId);
-      zkUtils.close();
-    }
-  }
-
-  @Override
-  public void subscribeToProcessorChange() {
-    zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler(zkUtils));
-  }
-
-  // Only by Leader
-  class ProcessorChangeHandler extends ZkUtils.GenIZkChildListener {
-
-    public ProcessorChangeHandler(ZkUtils zkUtils) {
-      super(zkUtils, "ProcessorChangeHandler");
-    }
-
-    /**
-     * Called when the children of the given path changed.
-     *
-     * @param parentPath    The parent path
-     * @param currentChildren The children or null if the root node (parent 
path) was deleted.
-     * @throws Exception
-     */
-    @Override
-    public void handleChildChange(String parentPath, List<String> 
currentChildren)
-        throws Exception {
-      if (notAValidEvent())
-        return;
-
-      if (currentChildren == null) {
-        // this may happen only in case of exception in ZK. It happens if the 
zkNode has been deleted.
-        // So the notification will pass 'null' as the list of children. 
Exception should be visible in the logs.
-        // It makes no sense to pass it further down.
-        LOG.error("handleChildChange on path " + parentPath + " was invoked 
with NULL list of children");
-        return;
-      }
-      LOG.info(
-          "ZkControllerImpl::ProcessorChangeHandler::handleChildChange - Path: 
" + parentPath +
-              "  Current Children: " + currentChildren);
-      zkControllerListener.onProcessorChange(currentChildren);
-
-    }
-  }
-
-  class ZkJobModelVersionChangeHandler extends ZkUtils.GenIZkDataListener {
-
-    public ZkJobModelVersionChangeHandler(ZkUtils zkUtils) {
-      super(zkUtils, "ZkJobModelVersionChangeHandler");
-    }
-    /**
-     * Called when there is a change to the data in JobModel version path
-     * To the subscribers, it signifies that a new version of JobModel is 
available.
-     */
-    @Override
-    public void handleDataChange(String dataPath, Object data) throws 
Exception {
-      if (notAValidEvent())
-        return;
-
-      LOG.info("pid=" + processorId + ". Got notification on version update 
change. path=" + dataPath + "; data="
-          + data);
-      zkControllerListener.onNewJobModelAvailable((String) data);
-    }
-
-    @Override
-    public void handleDataDeleted(String dataPath) throws Exception {
-      if (notAValidEvent())
-        return;
-
-      throw new SamzaException("version update path has been deleted!");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
deleted file mode 100644
index af4d56c..0000000
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.zk;
-
-import java.util.List;
-
-/**
- * Interface to listen for notifications from the {@link ZkController}
- */
-public interface ZkControllerListener {
-  /**
-   * ZkController observes the ZkTree for changes to group membership of 
processors and notifies the listener
-   *
-   * @param processorIds List of current znodes that are in the processing 
group
-   */
-  void onProcessorChange(List<String> processorIds);
-
-  void onNewJobModelAvailable(String version); // start job model update (stop 
current work)
-  void onNewJobModelConfirmed(String version); // start new work according to 
the new model
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 23fb3b0..6d85c66 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -19,6 +19,7 @@
 package org.apache.samza.zk;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.HashMap;
@@ -62,7 +63,7 @@ import org.slf4j.LoggerFactory;
 /**
  * JobCoordinator for stand alone processor managed via Zookeeper.
  */
-public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
+public class ZkJobCoordinator implements JobCoordinator {
   private static final Logger LOG = 
LoggerFactory.getLogger(ZkJobCoordinator.class);
   // TODO: MetadataCache timeout has to be 0 for the leader so that it can 
always have the latest information associated
   // with locality. Since host-affinity is not yet implemented, this can be 
fixed as part of SAMZA-1197
@@ -84,7 +85,6 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
 
   private final ZkUtils zkUtils;
   private final String processorId;
-  private final ZkController zkController;
 
   private final Config config;
   private final ZkBarrierForVersionUpgrade barrier;
@@ -117,7 +117,6 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     zkUtils.getZkClient().subscribeStateChanges(new 
ZkSessionStateChangedListener());
     leaderElector = new ZkLeaderElector(processorId, zkUtils);
     leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
-    this.zkController = new ZkControllerImpl(processorId, zkUtils, this, 
leaderElector);
     this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
     this.reporters = MetricsReporterLoader.getMetricsReporters(new 
MetricsConfig(config), processorId);
     debounceTimer = new ScheduleAfterDebounceTime(processorId);
@@ -132,9 +131,15 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
 
   @Override
   public void start() {
+    ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
+    zkUtils.validateZkVersion();
+    zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), 
keyBuilder.getJobModelVersionPath(), keyBuilder
+        .getJobModelPathPrefix()});
+
     startMetrics();
     systemAdmins.start();
-    zkController.register();
+    leaderElector.tryBecomeLeader();
+    zkUtils.subscribeToJobModelVersionChange(new 
ZkJobModelVersionChangeHandler(zkUtils));
   }
 
   @Override
@@ -157,8 +162,16 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
 
         debounceTimer.stopScheduler();
 
-        LOG.debug("Shutting down ZkController.");
-        zkController.stop();
+        if (leaderElector.amILeader()) {
+          LOG.info("Resigning leadership for processorId: " + processorId);
+          leaderElector.resignLeadership();
+        }
+
+        LOG.info("Shutting down ZkUtils.");
+        // close zk connection
+        if (zkUtils != null) {
+          zkUtils.close();
+        }
 
         LOG.debug("Shutting down system admins.");
         systemAdmins.stop();
@@ -212,11 +225,14 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     return processorId;
   }
 
-  //////////////////////////////////////////////// LEADER stuff 
///////////////////////////
-  @Override
+  /*
+   * The leader handles notifications for two types of events:
+   *   1. Changes to the current set of processors in the group.
+   *   2. Changes to the set of participants who have subscribed the the 
barrier
+   */
   public void onProcessorChange(List<String> processors) {
     if (leaderElector.amILeader()) {
-      LOG.info("ZkJobCoordinator::onProcessorChange - list of processors 
changed! List size=" + processors.size());
+      LOG.info("ZkJobCoordinator::onProcessorChange - list of processors 
changed. List size=" + processors.size());
       debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 
debounceTimeMs, () -> doOnProcessorChange(processors));
     }
   }
@@ -267,42 +283,6 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0, () -> 
zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
   }
 
-  @Override
-  public void onNewJobModelAvailable(final String version) {
-    debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () ->
-      {
-        LOG.info("pid=" + processorId + ": new JobModel available");
-        // get the new job model from ZK
-        newJobModel = zkUtils.getJobModel(version);
-        LOG.info("pid=" + processorId + ": new JobModel available. ver=" + 
version + "; jm = " + newJobModel);
-
-        if (!newJobModel.getContainers().containsKey(processorId)) {
-          LOG.info("New JobModel does not contain pid={}. Stopping this 
processor. New JobModel: {}",
-              processorId, newJobModel);
-          stop();
-        } else {
-          // stop current work
-          if (coordinatorListener != null) {
-            coordinatorListener.onJobModelExpired();
-          }
-          // update ZK and wait for all the processors to get this new version
-          barrier.join(version, processorId);
-        }
-      });
-  }
-
-  @Override
-  public void onNewJobModelConfirmed(String version) {
-    LOG.info("pid=" + processorId + "new version " + version + " of the job 
model got confirmed");
-    // get the new Model
-    JobModel jobModel = getJobModel();
-
-    // start the container with the new model
-    if (coordinatorListener != null) {
-      coordinatorListener.onNewJobModel(processorId, jobModel);
-    }
-  }
-
   private String createProcessorId(Config config) {
     // TODO: This check to be removed after 0.13+
     ApplicationConfig appConfig = new ApplicationConfig(config);
@@ -319,17 +299,6 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     }
   }
 
-  private List<String> getActualProcessorIds(List<String> processors) {
-    if (processors.size() > 0) {
-      // we should use this list
-      // but it needs to be converted into PIDs, which is part of the data
-      return zkUtils.getActiveProcessorsIDs(processors);
-    } else {
-      // get the current list of processors
-      return zkUtils.getSortedActiveProcessorsIDs();
-    }
-  }
-
   /**
    * Generate new JobModel when becoming a leader or the list of processor 
changed.
    */
@@ -354,11 +323,10 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
   class LeaderElectorListenerImpl implements LeaderElectorListener {
     @Override
     public void onBecomingLeader() {
-      LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
+      LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader");
       metrics.isLeader.set(true);
-      zkController.subscribeToProcessorChange();
-      debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 
debounceTimeMs, () ->
-        {
+      zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler(zkUtils));
+      debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 
debounceTimeMs, () -> {
           // actual actions to do are the same as onProcessorChange
           doOnProcessorChange(new ArrayList<>());
         });
@@ -386,7 +354,16 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
       metrics.barrierStateChange.inc();
       metrics.singleBarrierRebalancingTime.update(System.nanoTime() - 
startTime);
       if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
-        debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> 
onNewJobModelConfirmed(version));
+        debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> {
+            LOG.info("pid=" + processorId + "new version " + version + " of 
the job model got confirmed");
+
+            // read the new Model
+            JobModel jobModel = getJobModel();
+            // start the container with the new model
+            if (coordinatorListener != null) {
+              coordinatorListener.onNewJobModel(processorId, jobModel);
+            }
+          });
       } else {
         if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
           // no-op for non-leaders
@@ -394,8 +371,7 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
           LOG.warn("Barrier for version " + version + " timed out.");
           if (leaderElector.amILeader()) {
             LOG.info("Leader will schedule a new job model generation");
-            debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 
debounceTimeMs, () ->
-              {
+            debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 
debounceTimeMs, () -> {
                 // actual actions to do are the same as onProcessorChange
                 doOnProcessorChange(new ArrayList<>());
               });
@@ -412,6 +388,73 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     }
   }
 
+  class ProcessorChangeHandler extends ZkUtils.GenerationAwareZkChildListener {
+
+    public ProcessorChangeHandler(ZkUtils zkUtils) {
+      super(zkUtils, "ProcessorChangeHandler");
+    }
+
+    /**
+     * Called when the children of the given path changed.
+     *
+     * @param parentPath      The parent path
+     * @param currentChildren The children or null if the root node (parent 
path) was deleted.
+     * @throws Exception
+     */
+    @Override
+    public void doHandleChildChange(String parentPath, List<String> 
currentChildren)
+        throws Exception {
+      if (currentChildren == null) {
+        LOG.info("handleChildChange on path " + parentPath + " was invoked 
with NULL list of children");
+      } else {
+        LOG.info("ProcessorChangeHandler::handleChildChange - Path: {} Current 
Children: {} ", parentPath, currentChildren);
+        onProcessorChange(currentChildren);
+      }
+    }
+  }
+
+  class ZkJobModelVersionChangeHandler extends 
ZkUtils.GenerationAwareZkDataListener {
+
+    public ZkJobModelVersionChangeHandler(ZkUtils zkUtils) {
+      super(zkUtils, "ZkJobModelVersionChangeHandler");
+    }
+
+    /**
+     * Invoked when there is a change to the JobModelVersion z-node. It 
signifies that a new JobModel version is available.
+     */
+    @Override
+    public void doHandleDataChange(String dataPath, Object data) {
+      debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () 
-> {
+          String jobModelVersion = (String) data;
+
+          LOG.info("Got a notification for new JobModel version. Path = {} 
Version = {}", dataPath, data);
+
+          newJobModel = zkUtils.getJobModel(jobModelVersion);
+          LOG.info("pid=" + processorId + ": new JobModel is available. 
Version =" + jobModelVersion + "; JobModel = " + newJobModel);
+
+          if (!newJobModel.getContainers().containsKey(processorId)) {
+            LOG.info("New JobModel does not contain pid={}. Stopping this 
processor. New JobModel: {}",
+                processorId, newJobModel);
+            stop();
+          } else {
+            // stop current work
+            if (coordinatorListener != null) {
+              coordinatorListener.onJobModelExpired();
+            }
+            // update ZK and wait for all the processors to get this new 
version
+            barrier.join(jobModelVersion, processorId);
+          }
+        });
+    }
+
+    @Override
+    public void doHandleDataDeleted(String dataPath) {
+      LOG.warn("JobModel version z-node has been deleted. Shutting down the 
coordinator" + dataPath);
+      debounceTimer.scheduleAfterDebounceTime("JOB_MODEL_VERSION_DELETED", 0,  
() -> stop());
+    }
+  }
+
+
   /// listener to handle ZK state change events
   @VisibleForTesting
   class ZkSessionStateChangedListener implements IZkStateListener {
@@ -479,7 +522,8 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
       LOG.info("Got new session created event for processor=" + processorId);
       debounceTimer.cancelAllScheduledActions();
       LOG.info("register zk controller for the new session");
-      zkController.register();
+      leaderElector.tryBecomeLeader();
+      zkUtils.subscribeToJobModelVersionChange(new 
ZkJobModelVersionChangeHandler(zkUtils));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index c9ee1f0..9171d9d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -170,26 +170,19 @@ public class ZkLeaderElector implements LeaderElector {
     return String.format("[Processor-%s] %s", processorIdStr, logMessage);
   }
 
-  // Only by non-leaders
-  class PreviousProcessorChangeListener extends ZkUtils.GenIZkDataListener {
+  class PreviousProcessorChangeListener extends 
ZkUtils.GenerationAwareZkDataListener {
 
     public PreviousProcessorChangeListener(ZkUtils zkUtils) {
       super(zkUtils, "PreviousProcessorChangeListener");
     }
     @Override
-    public void handleDataChange(String dataPath, Object data) throws 
Exception {
-      LOG.debug("Data change on path: " + dataPath + " Data: " + data);
-      if (notAValidEvent())
-        return;
+    public void doHandleDataChange(String dataPath, Object data) {
+      LOG.info("Data change on path: {} for data: {}", dataPath, data);
     }
 
     @Override
-    public void handleDataDeleted(String dataPath)
-        throws Exception {
-      LOG.info(zLog("Data deleted on path " + dataPath + ". Predecessor went 
away. So, trying to become leader again..."));
-      if (notAValidEvent()) {
-        return;
-      }
+    public void doHandleDataDeleted(String dataPath) {
+      LOG.info(zLog("Data deleted on path " + dataPath + ". Predecessor went 
away. So, trying to become leader."));
       tryBecomeLeader();
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index fead167..4d325c5 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -341,60 +341,86 @@ public class ZkUtils {
   }
 
   /**
-   * Generation enforcing zk listener abstract class.
-   * It helps listeners, which extend it, to notAValidEvent old generation events.
-   * We cannot use 'sessionId' for this because it is not available through ZkClient (at leaste without reflection)
+   * A generation-aware {@link IZkChildListener} that only responds to events that occur in the current-generation.
+   * Each generation is identified by a generation-id which is scoped to the currently active Zk session and
+   * is incremented each time a session expires. This ensures that events corresponding to the previous generation
+   * are not acted on.
    */
-  public abstract static class GenIZkChildListener implements IZkChildListener {
+  public abstract static class GenerationAwareZkChildListener implements IZkChildListener {
     private final int generation;
     private final ZkUtils zkUtils;
     private final String listenerName;
 
-    public GenIZkChildListener(ZkUtils zkUtils, String listenerName) {
+    public GenerationAwareZkChildListener(ZkUtils zkUtils, String listenerName) {
       generation = zkUtils.getGeneration();
       this.zkUtils = zkUtils;
       this.listenerName = listenerName;
     }
 
-    protected boolean notAValidEvent() {
-      int curGeneration = zkUtils.getGeneration();
-      if (curGeneration != generation) {
-        LOG.warn("SKIPPING handleDataChanged for " + listenerName +
-            " from wrong generation. current generation=" + curGeneration + "; callback generation= " + generation);
-        return true;
+    @Override
+    public void handleChildChange(String barrierParticipantPath, List<String> participantIds) throws Exception {
+      int currentGeneration = zkUtils.getGeneration();
+      if (currentGeneration != generation) {
+        LOG.warn(String.format("Skipping handleChildChange for %s from wrong generation. Current generation: %s; " +
+            "Callback generation: %s", listenerName, currentGeneration, generation));
+        return;
       }
-      return false;
+      doHandleChildChange(barrierParticipantPath, participantIds);
     }
+
+    public abstract void doHandleChildChange(String path, List<String> children) throws Exception;
   }
 
-  public abstract static class GenIZkDataListener implements IZkDataListener {
+  /**
+   * A generation-aware {@link IZkDataListener} that only responds to events that occur in the current-generation.
+   * Each generation is identified by a generation-id which is scoped to the currently active Zk session and
+   * is incremented each time a session expires. This ensures that events corresponding to the previous generation
+   * are not acted on.
+   */
+  public abstract static class GenerationAwareZkDataListener implements IZkDataListener {
     private final int generation;
     private final ZkUtils zkUtils;
     private final String listenerName;
 
-    public GenIZkDataListener(ZkUtils zkUtils, String listenerName) {
+    public GenerationAwareZkDataListener(ZkUtils zkUtils, String listenerName) {
       generation = zkUtils.getGeneration();
       this.zkUtils = zkUtils;
       this.listenerName = listenerName;
     }
 
-    protected boolean notAValidEvent() {
-      int curGeneration = zkUtils.getGeneration();
-      if (curGeneration != generation) {
-        LOG.warn("SKIPPING handleDataChanged for " + listenerName +
-            " from wrong generation. curGen=" + curGeneration + "; cb gen= " + generation);
-        return true;
+    @Override
+    public void handleDataChange(String path, Object data) {
+      if (!isValid()) {
+        LOG.warn(String.format("Skipping handleDataChange for %s from wrong generation. Current generation: %s; " +
+            "Callback generation: %s", listenerName, zkUtils.getGeneration(), generation));
+      } else {
+        doHandleDataChange(path, data);
+      }
+    }
+
+    public void handleDataDeleted(String dataPath) throws Exception {
+      if (!isValid()) {
+        LOG.warn(String.format("Skipping handleDataDeleted for %s from wrong generation. Current generation: %s; " +
+            "Callback generation: %s", listenerName, zkUtils.getGeneration(), generation));
+      } else {
+        doHandleDataDeleted(dataPath);
       }
-      return false;
     }
 
+    public abstract void doHandleDataChange(String path, Object data);
+
+    public abstract void doHandleDataDeleted(String path);
+
+    private boolean isValid() {
+      return generation == zkUtils.getGeneration();
+    }
   }
 
   /**
     * subscribe for changes of JobModel version
     * @param dataListener describe this
     */
-  public void subscribeToJobModelVersionChange(GenIZkDataListener dataListener) {
+  public void subscribeToJobModelVersionChange(GenerationAwareZkDataListener dataListener) {
     LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
     zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
     metrics.subscriptions.inc();

http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index c8367fb..50d6a42 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -19,6 +19,9 @@
 package org.apache.samza.zk;
 
 import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.model.JobModel;
@@ -27,7 +30,10 @@ import org.apache.samza.zk.ZkJobCoordinator.ZkSessionStateChangedListener;
 import org.apache.zookeeper.Watcher;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import static junit.framework.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
 
@@ -36,9 +42,10 @@ public class TestZkJobCoordinator {
   private static final String TEST_JOB_MODEL_VERSION = "1";
 
   @Test
-  public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() {
+  public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() throws Exception {
     ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
     ZkClient mockZkClient = Mockito.mock(ZkClient.class);
+    CountDownLatch jcShutdownLatch = new CountDownLatch(1);
     when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
 
     ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
@@ -47,9 +54,17 @@ public class TestZkJobCoordinator {
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
 
     ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
-    zkJobCoordinator.onNewJobModelAvailable(TEST_JOB_MODEL_VERSION);
+    doAnswer(new Answer<Void>() {
+      public Void answer(InvocationOnMock invocation) {
+        jcShutdownLatch.countDown();
+        return null;
+      }
+    }).when(zkJobCoordinator).stop();
 
+    final ZkJobCoordinator.ZkJobModelVersionChangeHandler zkJobModelVersionChangeHandler = zkJobCoordinator.new ZkJobModelVersionChangeHandler(zkUtils);
+    zkJobModelVersionChangeHandler.doHandleDataChange("path", TEST_JOB_MODEL_VERSION);
     verify(zkJobCoordinator, Mockito.atMost(1)).stop();
+    assertTrue("Timed out waiting for JobCoordinator to stop", jcShutdownLatch.await(1, TimeUnit.MINUTES));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index e49dc13..19a05a6 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -145,23 +145,21 @@ public class TestZkUtils {
   public void testZKProtocolVersion() {
     // first time connect, version should be set to ZkUtils.ZK_PROTOCOL_VERSION
     ZkLeaderElector le = new ZkLeaderElector("1", zkUtils);
-    ZkControllerImpl zkController = new ZkControllerImpl("1", zkUtils, null, le);
-    zkController.register();
+    zkUtils.validateZkVersion();
+
     String root = zkUtils.getKeyBuilder().getRootPath();
     String ver = (String) zkUtils.getZkClient().readData(root);
     Assert.assertEquals(ZkUtils.ZK_PROTOCOL_VERSION, ver);
 
     // do it again (in case original value was null
-    zkController = new ZkControllerImpl("1", zkUtils, null, le);
-    zkController.register();
+    zkUtils.validateZkVersion();
     ver = (String) zkUtils.getZkClient().readData(root);
     Assert.assertEquals(ZkUtils.ZK_PROTOCOL_VERSION, ver);
 
     // now negative case
     zkUtils.getZkClient().writeData(root, "2.0");
     try {
-      zkController = new ZkControllerImpl("1", zkUtils, null, le);
-      zkController.register();
+      zkUtils.validateZkVersion();
       Assert.fail("Expected to fail because of version mismatch 2.0 vs 1.0");
     } catch (SamzaException e) {
       // expected
@@ -178,8 +176,7 @@ public class TestZkUtils {
     }
 
     try {
-      zkController = new ZkControllerImpl("1", zkUtils, null, le);
-      zkController.register();
+      zkUtils.validateZkVersion();
       Assert.fail("Expected to fail because of version mismatch 2.0 vs 3.0");
     } catch (SamzaException e) {
       // expected

Reply via email to