Repository: incubator-reef
Updated Branches:
  refs/heads/master 234a91a1c -> 14b6d4b59


[REEF-704] Invoke onDriverRestartContextActive in ContextRepresenters for 
Evaluators with Active Contexts

This adds a call to DriverRestartContextActiveHandlers in
ContextRepresenters and removes DriverRestartCompletedHandlers from
EvaluatorMessageDispatcher.

JIRA:
  [REEF-704](https://issues.apache.org/jira/browse/REEF-704)

Pull Request:
  This closes #452


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/14b6d4b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/14b6d4b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/14b6d4b5

Branch: refs/heads/master
Commit: 14b6d4b59c28b953336df693cd10948b02d3c0ad
Parents: 234a91a
Author: Andrew Chung <[email protected]>
Authored: Tue Sep 1 11:06:27 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Sep 1 11:08:27 2015 -0700

----------------------------------------------------------------------
 .../common/driver/context/ContextRepresenters.java   | 15 ++++++++++++++-
 .../driver/evaluator/EvaluatorMessageDispatcher.java | 11 -----------
 2 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/14b6d4b5/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
index 4936b9a..a1293dc 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
@@ -23,6 +23,8 @@ import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.restart.DriverRestartManager;
+import org.apache.reef.driver.restart.EvaluatorRestartState;
 import org.apache.reef.proto.ReefServiceProtos;
 import 
org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
 import org.apache.reef.util.Optional;
@@ -50,11 +52,15 @@ public final class ContextRepresenters {
   @GuardedBy("this")
   private final Set<String> contextIds = new HashSet<>();
 
+  private final DriverRestartManager driverRestartManager;
+
   @Inject
   private ContextRepresenters(final EvaluatorMessageDispatcher 
messageDispatcher,
-                              final ContextFactory contextFactory) {
+                              final ContextFactory contextFactory,
+                              final DriverRestartManager driverRestartManager) 
{
     this.messageDispatcher = messageDispatcher;
     this.contextFactory = contextFactory;
+    this.driverRestartManager = driverRestartManager;
   }
 
   /**
@@ -211,6 +217,13 @@ public final class ContextRepresenters {
     final EvaluatorContext context = contextFactory.newContext(contextID, 
parentID);
     this.addContext(context);
     if (notifyClientOnNewActiveContext) {
+      if 
(driverRestartManager.getEvaluatorRestartState(context.getEvaluatorId())
+          == EvaluatorRestartState.REREGISTERED) {
+        // if restart, advance restart state and call the restart context 
active handlers.
+        driverRestartManager.setEvaluatorProcessed(context.getEvaluatorId());
+        this.messageDispatcher.onDriverRestartContextActive(context);
+      }
+
       this.messageDispatcher.onContextActive(context);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/14b6d4b5/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index e46b06c..aa5f43b 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -27,7 +27,6 @@ import org.apache.reef.driver.evaluator.CompletedEvaluator;
 import org.apache.reef.driver.evaluator.FailedEvaluator;
 import org.apache.reef.driver.parameters.*;
 import org.apache.reef.driver.task.*;
-import org.apache.reef.driver.restart.DriverRestartCompleted;
 import org.apache.reef.runtime.common.driver.DriverExceptionHandler;
 import org.apache.reef.runtime.common.utils.DispatchingEStage;
 import org.apache.reef.tang.annotations.Parameter;
@@ -116,8 +115,6 @@ public final class EvaluatorMessageDispatcher {
       final Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers,
       @Parameter(DriverRestartContextActiveHandlers.class)
       final Set<EventHandler<ActiveContext>> 
driverRestartActiveContextHandlers,
-      @Parameter(DriverRestartCompletedHandlers.class)
-      final Set<EventHandler<DriverRestartCompleted>> 
driverRestartCompletedHandlers,
       @Parameter(DriverRestartFailedEvaluatorHandlers.class)
       final Set<EventHandler<FailedEvaluator>> 
driverRestartEvaluatorFailedHandlers,
 
@@ -126,8 +123,6 @@ public final class EvaluatorMessageDispatcher {
       final Set<EventHandler<RunningTask>> 
serviceDriverRestartTaskRunningHandlers,
       @Parameter(ServiceDriverRestartContextActiveHandlers.class)
       final Set<EventHandler<ActiveContext>> 
serviceDriverRestartActiveContextHandlers,
-      @Parameter(ServiceDriverRestartCompletedHandlers.class)
-      final Set<EventHandler<DriverRestartCompleted>> 
serviceDriverRestartCompletedHandlers,
       @Parameter(ServiceDriverRestartFailedEvaluatorHandlers.class)
       final Set<EventHandler<FailedEvaluator>> 
serviceDriverRestartFailedEvaluatorHandlers,
 
@@ -179,7 +174,6 @@ public final class EvaluatorMessageDispatcher {
     // Application event handlers specific to a Driver restart
     this.driverRestartApplicationDispatcher.register(RunningTask.class, 
driverRestartTaskRunningHandlers);
     this.driverRestartApplicationDispatcher.register(ActiveContext.class, 
driverRestartActiveContextHandlers);
-    
this.driverRestartApplicationDispatcher.register(DriverRestartCompleted.class, 
driverRestartCompletedHandlers);
 
     final Set<EventHandler<FailedEvaluator>> 
driverRestartEvaluatorFailedCallbackHandlers = new HashSet<>();
     for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : 
driverRestartEvaluatorFailedHandlers) {
@@ -193,7 +187,6 @@ public final class EvaluatorMessageDispatcher {
     // Service event handlers specific to a Driver restart
     this.driverRestartServiceDispatcher.register(RunningTask.class, 
serviceDriverRestartTaskRunningHandlers);
     this.driverRestartServiceDispatcher.register(ActiveContext.class, 
serviceDriverRestartActiveContextHandlers);
-    this.driverRestartServiceDispatcher.register(DriverRestartCompleted.class, 
serviceDriverRestartCompletedHandlers);
     this.driverRestartServiceDispatcher.register(FailedEvaluator.class, 
serviceDriverRestartFailedEvaluatorHandlers);
 
     final Set<EventHandler<CompletedEvaluator>> 
evaluatorCompletedCallbackHandlers = new HashSet<>();
@@ -273,10 +266,6 @@ public final class EvaluatorMessageDispatcher {
     this.dispatchForRestartedDriver(FailedEvaluator.class, failedEvaluator);
   }
 
-  public void onDriverRestartCompleted(final DriverRestartCompleted 
restartCompleted) {
-    this.dispatchForRestartedDriver(DriverRestartCompleted.class, 
restartCompleted);
-  }
-
   boolean isEmpty() {
     return this.applicationDispatcher.isEmpty();
   }

Reply via email to