Repository: incubator-reef Updated Branches: refs/heads/master 8598d4da6 -> 9d107c906
[REEF-541] Avoid AllocatedEvaluator event on Driver restart. This addressed the issue by separating the call to AllocatedEvaluatorHandler from the EvaluatorManager constructor. JIRA: [REEF-541](https://issues.apache.org/jira/browse/REEF-541) Pull Request: This closes #337 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/9d107c90 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/9d107c90 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/9d107c90 Branch: refs/heads/master Commit: 9d107c9063902e9bc84ce45c416ed57d24bc3d6f Parents: 8598d4d Author: Andrew Chung <[email protected]> Authored: Wed Aug 5 10:52:48 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed Aug 5 11:47:24 2015 -0700 ---------------------------------------------------------------------- .../driver/evaluator/EvaluatorManager.java | 47 +++++++++++++++----- .../evaluator/EvaluatorManagerFactory.java | 18 ++++++-- .../common/driver/evaluator/Evaluators.java | 2 +- 3 files changed, 52 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9d107c90/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java index b41f290..91e9e1d 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java @@ -98,10 +98,17 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { private final ExceptionCodec exceptionCodec; private final DriverStatusManager driverStatusManager; private final EventHandlerIdlenessSource idlenessSource; + private final RemoteManager remoteManager; + private final ConfigurationSerializer configurationSerializer; + private final LoggingScopeFactory loggingScopeFactory; + private final Set<ConfigurationProvider> evaluatorConfigurationProviders; + private final JVMProcessFactory jvmProcessFactory; + private final CLRProcessFactory clrProcessFactory; // Mutable fields private Optional<TaskRepresenter> task = Optional.empty(); private boolean isResourceReleased = false; + private boolean allocationFired = false; @Inject private EvaluatorManager( @@ -142,17 +149,13 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { this.driverStatusManager = driverStatusManager; this.exceptionCodec = exceptionCodec; - final AllocatedEvaluator allocatedEvaluator = - new AllocatedEvaluatorImpl(this, - remoteManager.getMyIdentifier(), - configurationSerializer, - getJobIdentifier(), - loggingScopeFactory, - evaluatorConfigurationProviders, - jvmProcessFactory, - clrProcessFactory); - LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId); - this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator); + this.remoteManager = remoteManager; + this.configurationSerializer = configurationSerializer; + this.loggingScopeFactory = loggingScopeFactory; + this.evaluatorConfigurationProviders = evaluatorConfigurationProviders; + this.jvmProcessFactory = jvmProcessFactory; + this.clrProcessFactory = clrProcessFactory; + LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId()); } @@ -174,6 +177,28 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { return "REEF_LOCAL_RUNTIME"; } + /** + * Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once. + */ + public synchronized void fireEvaluatorAllocatedEvent() { + if (!allocationFired && stateManager.isAllocated()) { + final AllocatedEvaluator allocatedEvaluator = + new AllocatedEvaluatorImpl(this, + remoteManager.getMyIdentifier(), + configurationSerializer, + getJobIdentifier(), + loggingScopeFactory, + evaluatorConfigurationProviders, + jvmProcessFactory, + clrProcessFactory); + LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId); + messageDispatcher.onEvaluatorAllocated(allocatedEvaluator); + allocationFired = true; + } else { + LOG.log(Level.WARNING, "Evaluator allocated event fired twice."); + } + } + private static boolean isDoneOrFailedOrKilled(final ResourceStatusEvent resourceStatusEvent) { return resourceStatusEvent.getState() == ReefServiceProtos.State.DONE || resourceStatusEvent.getState() == ReefServiceProtos.State.FAILED || http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9d107c90/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java index da35145..2a0dd49 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java @@ -83,11 +83,13 @@ public final class EvaluatorManagerFactory { /** * Instantiates a new EvaluatorManager based on a resource allocation. + * Fires the EvaluatorAllocatedEvent. * * @param resourceAllocationEvent - * @return + * @return an EvaluatorManager for the newly allocated Evaluator. */ - public EvaluatorManager getNewEvaluatorManager(final ResourceAllocationEvent resourceAllocationEvent) { + public EvaluatorManager getNewEvaluatorManagerForNewlyAllocatedEvaluator( + final ResourceAllocationEvent resourceAllocationEvent) { final NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceAllocationEvent.getNodeId()); if (nodeDescriptor == null) { @@ -98,9 +100,19 @@ public final class EvaluatorManagerFactory { processFactory.newEvaluatorProcess()); LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]", resourceAllocationEvent.getIdentifier()); - return this.getNewEvaluatorManagerInstance(resourceAllocationEvent.getIdentifier(), evaluatorDescriptor); + final EvaluatorManager evaluatorManager = + getNewEvaluatorManagerInstance(resourceAllocationEvent.getIdentifier(), evaluatorDescriptor); + evaluatorManager.fireEvaluatorAllocatedEvent(); + + return evaluatorManager; } + /** + * Instantiates a new EvaluatorManager for a failed evaluator during driver restart. + * Does not fire an EvaluatorAllocatedEvent. + * @param resourceStatusEvent + * @return an EvaluatorManager for the user to call fail on. + */ public EvaluatorManager createForEvaluatorFailedDuringDriverRestart(final ResourceStatusEvent resourceStatusEvent) { if (!resourceStatusEvent.getIsFromPreviousDriver().get()) { throw new RuntimeException("Invalid resourceStatusEvent, must be status for resource from previous Driver."); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9d107c90/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java index c39844a..fe464d9 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java @@ -107,7 +107,7 @@ public final class Evaluators implements AutoCloseable { public synchronized void put( final EvaluatorManagerFactory evaluatorManagerFactory, final ResourceAllocationEvent evaluatorMsg) { - this.put(evaluatorManagerFactory.getNewEvaluatorManager(evaluatorMsg)); + this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewlyAllocatedEvaluator(evaluatorMsg)); } /**
