Repository: incubator-reef
Updated Branches:
  refs/heads/master 8598d4da6 -> 9d107c906


[REEF-541] Avoid AllocatedEvaluator event on Driver restart.

This addresses the issue by moving the firing of the AllocatedEvaluator event
out of the EvaluatorManager constructor and into a separate method.

JIRA:
  [REEF-541](https://issues.apache.org/jira/browse/REEF-541)

Pull Request:
  This closes #337


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/9d107c90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/9d107c90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/9d107c90

Branch: refs/heads/master
Commit: 9d107c9063902e9bc84ce45c416ed57d24bc3d6f
Parents: 8598d4d
Author: Andrew Chung <[email protected]>
Authored: Wed Aug 5 10:52:48 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Aug 5 11:47:24 2015 -0700

----------------------------------------------------------------------
 .../driver/evaluator/EvaluatorManager.java      | 47 +++++++++++++++-----
 .../evaluator/EvaluatorManagerFactory.java      | 18 ++++++--
 .../common/driver/evaluator/Evaluators.java     |  2 +-
 3 files changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9d107c90/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index b41f290..91e9e1d 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -98,10 +98,17 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
   private final ExceptionCodec exceptionCodec;
   private final DriverStatusManager driverStatusManager;
   private final EventHandlerIdlenessSource idlenessSource;
+  private final RemoteManager remoteManager;
+  private final ConfigurationSerializer configurationSerializer;
+  private final LoggingScopeFactory loggingScopeFactory;
+  private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
+  private final JVMProcessFactory jvmProcessFactory;
+  private final CLRProcessFactory clrProcessFactory;
 
   // Mutable fields
   private Optional<TaskRepresenter> task = Optional.empty();
   private boolean isResourceReleased = false;
+  private boolean allocationFired = false;
 
   @Inject
   private EvaluatorManager(
@@ -142,17 +149,13 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
     this.driverStatusManager = driverStatusManager;
     this.exceptionCodec = exceptionCodec;
 
-    final AllocatedEvaluator allocatedEvaluator =
-        new AllocatedEvaluatorImpl(this,
-            remoteManager.getMyIdentifier(),
-            configurationSerializer,
-            getJobIdentifier(),
-            loggingScopeFactory,
-            evaluatorConfigurationProviders,
-            jvmProcessFactory,
-            clrProcessFactory);
-    LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with 
ID [{0}]", evaluatorId);
-    this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
+    this.remoteManager = remoteManager;
+    this.configurationSerializer = configurationSerializer;
+    this.loggingScopeFactory = loggingScopeFactory;
+    this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
+    this.jvmProcessFactory = jvmProcessFactory;
+    this.clrProcessFactory = clrProcessFactory;
+
     LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: 
[{0}]", this.getId());
   }
 
@@ -174,6 +177,28 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
     return "REEF_LOCAL_RUNTIME";
   }
 
+  /**
+   * Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once.
+   */
+  public synchronized void fireEvaluatorAllocatedEvent() {
+    if (!allocationFired && stateManager.isAllocated()) {
+      final AllocatedEvaluator allocatedEvaluator =
+          new AllocatedEvaluatorImpl(this,
+              remoteManager.getMyIdentifier(),
+              configurationSerializer,
+              getJobIdentifier(),
+              loggingScopeFactory,
+              evaluatorConfigurationProviders,
+              jvmProcessFactory,
+              clrProcessFactory);
+      LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator 
with ID [{0}]", evaluatorId);
+      messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
+      allocationFired = true;
+    } else {
+      LOG.log(Level.WARNING, "Evaluator allocated event fired twice.");
+    }
+  }
+
   private static boolean isDoneOrFailedOrKilled(final ResourceStatusEvent 
resourceStatusEvent) {
     return resourceStatusEvent.getState() == ReefServiceProtos.State.DONE ||
         resourceStatusEvent.getState() == ReefServiceProtos.State.FAILED ||

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9d107c90/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
index da35145..2a0dd49 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
@@ -83,11 +83,13 @@ public final class EvaluatorManagerFactory {
 
   /**
    * Instantiates a new EvaluatorManager based on a resource allocation.
+   * Fires the EvaluatorAllocatedEvent.
    *
    * @param resourceAllocationEvent
-   * @return
+   * @return an EvaluatorManager for the newly allocated Evaluator.
    */
-  public EvaluatorManager getNewEvaluatorManager(final ResourceAllocationEvent 
resourceAllocationEvent) {
+  public EvaluatorManager getNewEvaluatorManagerForNewlyAllocatedEvaluator(
+      final ResourceAllocationEvent resourceAllocationEvent) {
     final NodeDescriptor nodeDescriptor = 
this.resourceCatalog.getNode(resourceAllocationEvent.getNodeId());
 
     if (nodeDescriptor == null) {
@@ -98,9 +100,19 @@ public final class EvaluatorManagerFactory {
         processFactory.newEvaluatorProcess());
 
     LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]", 
resourceAllocationEvent.getIdentifier());
-    return 
this.getNewEvaluatorManagerInstance(resourceAllocationEvent.getIdentifier(), 
evaluatorDescriptor);
+    final EvaluatorManager evaluatorManager =
+        
getNewEvaluatorManagerInstance(resourceAllocationEvent.getIdentifier(), 
evaluatorDescriptor);
+    evaluatorManager.fireEvaluatorAllocatedEvent();
+
+    return evaluatorManager;
   }
 
+  /**
+   * Instantiates a new EvaluatorManager for a failed evaluator during driver 
restart.
+   * Does not fire an EvaluatorAllocatedEvent.
+   * @param resourceStatusEvent
+   * @return an EvaluatorManager for the user to call fail on.
+   */
   public EvaluatorManager createForEvaluatorFailedDuringDriverRestart(final 
ResourceStatusEvent resourceStatusEvent) {
     if (!resourceStatusEvent.getIsFromPreviousDriver().get()) {
       throw new RuntimeException("Invalid resourceStatusEvent, must be status 
for resource from previous Driver.");

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9d107c90/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
index c39844a..fe464d9 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
@@ -107,7 +107,7 @@ public final class Evaluators implements AutoCloseable {
   public synchronized void put(
       final EvaluatorManagerFactory evaluatorManagerFactory,
       final ResourceAllocationEvent evaluatorMsg) {
-    this.put(evaluatorManagerFactory.getNewEvaluatorManager(evaluatorMsg));
+    
this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewlyAllocatedEvaluator(evaluatorMsg));
   }
 
   /**

Reply via email to