Repository: incubator-reef Updated Branches: refs/heads/master 234a91a1c -> 14b6d4b59
[REEF-704] Invoke onDriverRestartContextActive in ContextRepresenters for Evaluators with Active Contexts This adds a call to DriverRestartContextActiveHandlers in ContextRepresenters and removes DriverRestartCompletedHandlers from EvaluatorMessageDispatcher. JIRA: [REEF-704](https://issues.apache.org/jira/browse/REEF-704) Pull Request: This closes #452 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/14b6d4b5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/14b6d4b5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/14b6d4b5 Branch: refs/heads/master Commit: 14b6d4b59c28b953336df693cd10948b02d3c0ad Parents: 234a91a Author: Andrew Chung <[email protected]> Authored: Tue Sep 1 11:06:27 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Tue Sep 1 11:08:27 2015 -0700 ---------------------------------------------------------------------- .../common/driver/context/ContextRepresenters.java | 15 ++++++++++++++- .../driver/evaluator/EvaluatorMessageDispatcher.java | 11 ----------- 2 files changed, 14 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/14b6d4b5/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java index 4936b9a..a1293dc 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java @@ -23,6 +23,8 @@ import 
net.jcip.annotations.ThreadSafe; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.restart.DriverRestartManager; +import org.apache.reef.driver.restart.EvaluatorRestartState; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher; import org.apache.reef.util.Optional; @@ -50,11 +52,15 @@ public final class ContextRepresenters { @GuardedBy("this") private final Set<String> contextIds = new HashSet<>(); + private final DriverRestartManager driverRestartManager; + @Inject private ContextRepresenters(final EvaluatorMessageDispatcher messageDispatcher, - final ContextFactory contextFactory) { + final ContextFactory contextFactory, + final DriverRestartManager driverRestartManager) { this.messageDispatcher = messageDispatcher; this.contextFactory = contextFactory; + this.driverRestartManager = driverRestartManager; } /** @@ -211,6 +217,13 @@ public final class ContextRepresenters { final EvaluatorContext context = contextFactory.newContext(contextID, parentID); this.addContext(context); if (notifyClientOnNewActiveContext) { + if (driverRestartManager.getEvaluatorRestartState(context.getEvaluatorId()) + == EvaluatorRestartState.REREGISTERED) { + // if restart, advance restart state and call the restart context active handlers. 
+ driverRestartManager.setEvaluatorProcessed(context.getEvaluatorId()); + this.messageDispatcher.onDriverRestartContextActive(context); + } + this.messageDispatcher.onContextActive(context); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/14b6d4b5/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java index e46b06c..aa5f43b 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java @@ -27,7 +27,6 @@ import org.apache.reef.driver.evaluator.CompletedEvaluator; import org.apache.reef.driver.evaluator.FailedEvaluator; import org.apache.reef.driver.parameters.*; import org.apache.reef.driver.task.*; -import org.apache.reef.driver.restart.DriverRestartCompleted; import org.apache.reef.runtime.common.driver.DriverExceptionHandler; import org.apache.reef.runtime.common.utils.DispatchingEStage; import org.apache.reef.tang.annotations.Parameter; @@ -116,8 +115,6 @@ public final class EvaluatorMessageDispatcher { final Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers, @Parameter(DriverRestartContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers, - @Parameter(DriverRestartCompletedHandlers.class) - final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers, @Parameter(DriverRestartFailedEvaluatorHandlers.class) final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedHandlers, @@ -126,8 +123,6 @@ public 
final class EvaluatorMessageDispatcher { final Set<EventHandler<RunningTask>> serviceDriverRestartTaskRunningHandlers, @Parameter(ServiceDriverRestartContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> serviceDriverRestartActiveContextHandlers, - @Parameter(ServiceDriverRestartCompletedHandlers.class) - final Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers, @Parameter(ServiceDriverRestartFailedEvaluatorHandlers.class) final Set<EventHandler<FailedEvaluator>> serviceDriverRestartFailedEvaluatorHandlers, @@ -179,7 +174,6 @@ public final class EvaluatorMessageDispatcher { // Application event handlers specific to a Driver restart this.driverRestartApplicationDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers); this.driverRestartApplicationDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers); - this.driverRestartApplicationDispatcher.register(DriverRestartCompleted.class, driverRestartCompletedHandlers); final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedCallbackHandlers = new HashSet<>(); for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : driverRestartEvaluatorFailedHandlers) { @@ -193,7 +187,6 @@ public final class EvaluatorMessageDispatcher { // Service event handlers specific to a Driver restart this.driverRestartServiceDispatcher.register(RunningTask.class, serviceDriverRestartTaskRunningHandlers); this.driverRestartServiceDispatcher.register(ActiveContext.class, serviceDriverRestartActiveContextHandlers); - this.driverRestartServiceDispatcher.register(DriverRestartCompleted.class, serviceDriverRestartCompletedHandlers); this.driverRestartServiceDispatcher.register(FailedEvaluator.class, serviceDriverRestartFailedEvaluatorHandlers); final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedCallbackHandlers = new HashSet<>(); @@ -273,10 +266,6 @@ public final class EvaluatorMessageDispatcher { 
this.dispatchForRestartedDriver(FailedEvaluator.class, failedEvaluator); } - public void onDriverRestartCompleted(final DriverRestartCompleted restartCompleted) { - this.dispatchForRestartedDriver(DriverRestartCompleted.class, restartCompleted); - } - boolean isEmpty() { return this.applicationDispatcher.isEmpty(); }
