Repository: samza Updated Branches: refs/heads/master 12968cfb6 -> d7a071b34
SAMZA-1664: ZkJobCoordinator stability fixes. Issues fixed: * Handle job coordinator shutdown gracefully in case of unclean container shutdowns. * Fix the zookeeper session handling logic. * Fix the forever retry timeout in ZkClient re-connect. Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Xinyu Liu <xinyuliu...@gmail.com> Closes #476 from shanthoosh/pullInK2Changes Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d7a071b3 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d7a071b3 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d7a071b3 Branch: refs/heads/master Commit: d7a071b346a2a6d31e90fc038bd30c7ba8cb7ef1 Parents: 12968cf Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Tue Apr 17 16:02:16 2018 -0700 Committer: xiliu <xi...@linkedin.com> Committed: Tue Apr 17 16:02:16 2018 -0700 ---------------------------------------------------------------------- .../samza/zk/ScheduleAfterDebounceTime.java | 35 +++++-- .../org/apache/samza/zk/ZkJobCoordinator.java | 100 +++++++++++++++---- .../samza/zk/TestScheduleAfterDebounceTime.java | 53 ---------- 3 files changed, 102 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/d7a071b3/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java index b53d245..f6f2dc9 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java @@ -20,12 +20,12 @@ package org.apache.samza.zk; import com.google.common.util.concurrent.ThreadFactoryBuilder; 
+import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import org.slf4j.Logger; @@ -55,7 +55,8 @@ public class ScheduleAfterDebounceTime { /** * A map from actionName to {@link ScheduledFuture} of task scheduled for execution. */ - private final Map<String, ScheduledFuture> futureHandles = new HashMap<>(); + private final Map<String, ScheduledFuture> futureHandles = new ConcurrentHashMap<>(); + private boolean isShuttingDown; public ScheduleAfterDebounceTime() { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(DEBOUNCE_THREAD_NAME_FORMAT).setDaemon(true).build(); @@ -93,12 +94,24 @@ public class ScheduleAfterDebounceTime { * and all pending enqueued tasks will be cancelled. */ public synchronized void stopScheduler() { - LOG.info("Stopping Scheduler"); + if (isShuttingDown) { + LOG.debug("Debounce timer shutdown is already in progress!"); + return; + } + + isShuttingDown = true; + LOG.info("Shutting down debounce timer!"); - scheduledExecutorService.shutdownNow(); + // changing it back to use shutdown instead to prevent interruptions on the active task + scheduledExecutorService.shutdown(); + + // should clear out the future handles as well + futureHandles.keySet() + .forEach(this::tryCancelScheduledAction); + } - // Clear the existing future handles. 
- futureHandles.clear(); + public synchronized void cancelAction(String action) { + this.tryCancelScheduledAction(action); } /** @@ -144,22 +157,22 @@ public class ScheduleAfterDebounceTime { } else { LOG.debug("Action: {} completed successfully.", actionName); } - } catch (Throwable t) { - LOG.error("Execution of action: {} failed.", actionName, t); - doCleanUpOnTaskException(t); + } catch (Throwable throwable) { + LOG.error("Execution of action: {} failed.", actionName, throwable); + doCleanUpOnTaskException(throwable); } }; } /** - * Handler method to invoke on a throwable during an scheduled task execution and which + * Handler method to invoke on an exception during a scheduled task execution and which performs * the following operations in sequential order. * <ul> * <li> Stop the scheduler. If the task execution fails or a task is interrupted, scheduler will not accept/execute any new tasks.</li> * <li> Invokes the onError handler method if taskCallback is defined.</li> * </ul> * - * @param throwable the throwable that happened during task execution. + * @param throwable the exception that happened during task execution. */ private void doCleanUpOnTaskException(Throwable throwable) { stopScheduler(); http://git-wip-us.apache.org/repos/asf/samza/blob/d7a071b3/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 407291a..0e0f815 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -58,6 +58,8 @@ import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.zookeeper.Watcher.Event.KeeperState.*; + /** * JobCoordinator for stand alone processor managed via Zookeeper. 
*/ @@ -97,6 +99,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { private JobModel newJobModel; private int debounceTimeMs; private boolean hasCreatedStreams = false; + private boolean initiatedShutdown = false; private String cachedJobModelVersion = null; private Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>(); @@ -121,7 +124,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId); debounceTimer = new ScheduleAfterDebounceTime(); debounceTimer.setScheduledTaskCallback(throwable -> { - LOG.error("Received exception from in JobCoordinator Processing!", throwable); + LOG.error("Received exception in debounce timer! Stopping the job coordinator", throwable); stop(); }); systemAdmins = new SystemAdmins(config); @@ -137,19 +140,50 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { @Override public synchronized void stop() { - if (coordinatorListener != null) { - coordinatorListener.onJobModelExpired(); + // Make the shutdown idempotent + if (initiatedShutdown) { + LOG.debug("Job Coordinator shutdown is already in progress!"); + return; } - //Setting the isLeader metric to false when the stream processor shuts down because it does not remain the leader anymore + + LOG.info("Shutting down Job Coordinator..."); + initiatedShutdown = true; + boolean shutdownSuccessful = false; + + // Notify the metrics about abandoning the leadership. Moving it up the chain in the shutdown sequence so that + // in case of unclean shutdown, we get notified about lack of leader and we can set up some alerts around the absence of leader. 
metrics.isLeader.set(false); - debounceTimer.stopScheduler(); - zkController.stop(); - shutdownMetrics(); - if (coordinatorListener != null) { - coordinatorListener.onCoordinatorStop(); + try { + // todo: what does it mean for coordinator listener to be null? why not have it part of constructor? + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + } + + debounceTimer.stopScheduler(); + + LOG.debug("Shutting down ZkController."); + zkController.stop(); + + LOG.debug("Shutting down system admins."); + systemAdmins.stop(); + + LOG.debug("Shutting down metrics."); + shutdownMetrics(); + + if (coordinatorListener != null) { + coordinatorListener.onCoordinatorStop(); + } + + shutdownSuccessful = true; + } catch (Throwable t) { + LOG.error("Encountered errors during job coordinator stop.", t); + if (coordinatorListener != null) { + coordinatorListener.onCoordinatorFailure(t); + } + } finally { + LOG.info("Job Coordinator shutdown finished with ShutdownComplete=" + shutdownSuccessful); } - systemAdmins.stop(); } private void startMetrics() { @@ -380,7 +414,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { } } - /// listener to handle session expiration + /// listener to handle ZK state change events class ZkSessionStateChangedListener implements IZkStateListener { private static final String ZK_SESSION_ERROR = "ZK_SESSION_ERROR"; @@ -388,19 +422,41 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { @Override public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { - if (state == Watcher.Event.KeeperState.Expired) { - // if the session has expired it means that all the registration's ephemeral nodes are gone. - LOG.warn("Got session expired event for processor=" + processorId); + switch (state) { + case Expired: + // if the session has expired it means that all the registration's ephemeral nodes are gone. 
+ LOG.warn("Got " + state.toString() + " event for processor=" + processorId + ". Stopping the container and unregister the processor node."); - // increase generation of the ZK connection. All the callbacks from the previous generation will be ignored. - zkUtils.incGeneration(); + // increase generation of the ZK session. All the callbacks from the previous generation will be ignored. + zkUtils.incGeneration(); - if (coordinatorListener != null) { - coordinatorListener.onJobModelExpired(); - } - // reset all the values that might have been from the previous session (e.g ephemeral node path) - zkUtils.unregister(); + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + } + // reset all the values that might have been from the previous session (e.g ephemeral node path) + zkUtils.unregister(); + return; + case Disconnected: + // the connection to ZK is lost, but the session (and its ephemeral nodes) may still be alive until the session timeout elapses. + LOG.warn("Got " + state.toString() + " event for processor=" + processorId + ". Scheduling a coordinator stop."); + + // If the connection is not restored within the ZK session timeout, the process is considered dead. + debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, new ZkConfig(config).getZkSessionTimeoutMs(), () -> stop()); + return; + case AuthFailed: + case NoSyncConnected: + case Unknown: + LOG.warn("Got unexpected failure event " + state.toString() + " for processor=" + processorId + ". Stopping the job coordinator."); + debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop()); + return; + case SyncConnected: + LOG.info("Got syncconnected event for processor=" + processorId + "."); + debounceTimer.cancelAction(ZK_SESSION_ERROR); + return; + default: + // received one of the remaining states, e.g. ConnectedReadOnly or SaslAuthenticated (SyncConnected is handled above). NoOp + LOG.info("Got ZK event " + state.toString() + " for processor=" + processorId + ". 
Continue"); } } @@ -416,7 +472,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { @Override public void handleSessionEstablishmentError(Throwable error) throws Exception { - // this means we cannot connect to zookeeper + // this means we cannot connect to zookeeper to establish a session LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error); debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop()); } http://git-wip-us.apache.org/repos/asf/samza/blob/d7a071b3/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java index 697833b..7f687d7 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java @@ -121,57 +121,4 @@ public class TestScheduleAfterDebounceTime { Assert.assertEquals(RuntimeException.class, taskCallbackException[0].getClass()); scheduledQueue.stopScheduler(); } - - /** - * Validates if the interrupted exception triggered by ExecutorService is handled by ScheduleAfterDebounceTime. - */ - @Test - public void testStopSchedulerInvokesRegisteredCallback() throws InterruptedException { - final CountDownLatch hasTaskCallbackCompleted = new CountDownLatch(1); - final CountDownLatch hasThreadStarted = new CountDownLatch(1); - final CountDownLatch isSchedulerShutdownTriggered = new CountDownLatch(1); - - /** - * Declaring this as an array to record the value inside the lambda. 
- */ - final Throwable[] taskCallbackException = new Exception[1]; - - ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(); - scheduledQueue.setScheduledTaskCallback(throwable -> { - /** - * Assertion failures in callback doesn't fail the test. - * Record the received exception here and assert outside - * the callback. - */ - taskCallbackException[0] = throwable; - hasTaskCallbackCompleted.countDown(); - }); - - scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME , () -> { - hasThreadStarted.countDown(); - try { - LOG.debug("Waiting for the scheduler shutdown trigger."); - isSchedulerShutdownTriggered.await(); - } catch (InterruptedException e) { - /** - * Don't swallow the exception and restore the interrupt status. - * Expect the ScheduleDebounceTime to handle this interrupt - * and invoke ScheduledTaskCallback. - */ - Thread.currentThread().interrupt(); - } - }); - - // Wait for the task to run. - hasThreadStarted.await(); - - // Shutdown the scheduler and update relevant state. - scheduledQueue.stopScheduler(); - isSchedulerShutdownTriggered.countDown(); - - hasTaskCallbackCompleted.await(); - - // Assert on exception thrown. - Assert.assertEquals(InterruptedException.class, taskCallbackException[0].getClass()); - } }