Repository: incubator-reef Updated Branches: refs/heads/master 4b7288de8 -> 2d67da786
[REEF-616] Keep state of previous evaluators with a state machine This switches to a state machine model for evaluator statuses for driver restart. JIRA: [REEF-616](https://issues.apache.org/jira/browse/REEF-616) Pull Request: This closes #396 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2d67da78 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2d67da78 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2d67da78 Branch: refs/heads/master Commit: 2d67da786f4ec4b4dcb885ab471c78b8f91d8dbf Parents: 4b7288d Author: Andrew Chung <[email protected]> Authored: Thu Aug 20 15:32:25 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri Aug 21 11:25:13 2015 -0700 ---------------------------------------------------------------------- .../DefaultDriverRuntimeRestartMangerImpl.java | 2 +- .../driver/restart/DriverRestartManager.java | 138 ++++++++++++++----- .../reef/driver/restart/DriverRestartState.java | 7 + .../driver/restart/DriverRestartUtilities.java | 44 ------ .../driver/restart/EvaluatorRestartState.java | 106 ++++++++++++++ .../driver/context/ContextRepresenters.java | 4 +- .../evaluator/EvaluatorHeartbeatHandler.java | 34 ++++- .../driver/evaluator/EvaluatorManager.java | 55 +++++--- .../common/driver/task/TaskRepresenter.java | 7 +- 9 files changed, 282 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java index 22714c5..12934f7 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java @@ -29,7 +29,7 @@ import java.util.Set; /** * The default driver runtime restart manager that is not able to perform any restart actions. * Thus, when performing actions pertaining to restart, it is recommended to call - * {@link DriverRuntimeRestartManager#hasRestarted()} first or use static functions in {@link DriverRestartUtilities}. + * {@link DriverRuntimeRestartManager#hasRestarted()} first. */ @Private @DriverSide http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java index 54a2c54..3e1be1f 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java @@ -24,9 +24,7 @@ import org.apache.reef.annotations.audience.Private; import org.apache.reef.exception.DriverFatalRuntimeException; import javax.inject.Inject; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.util.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -40,16 +38,12 @@ import java.util.logging.Logger; public final class DriverRestartManager { private static final Logger LOG = Logger.getLogger(DriverRestartManager.class.getName()); private final DriverRuntimeRestartManager driverRuntimeRestartManager; - private final Set<String> previousEvaluators; - private final Set<String> recoveredEvaluators; - private DriverRestartState state; + private final Map<String, EvaluatorRestartState> previousEvaluators = new HashMap<>(); + private DriverRestartState state = DriverRestartState.NotRestarted; @Inject private DriverRestartManager(final DriverRuntimeRestartManager driverRuntimeRestartManager) { this.driverRuntimeRestartManager = driverRuntimeRestartManager; - this.state = DriverRestartState.NotRestarted; - this.previousEvaluators = new HashSet<>(); - this.recoveredEvaluators = new HashSet<>(); } /** @@ -58,7 +52,8 @@ public final class DriverRestartManager { * Can be already done with restart or in the process of restart. */ public synchronized boolean detectRestart() { - if (!this.state.hasRestarted() && driverRuntimeRestartManager.hasRestarted()) { + if (this.state.hasNotRestarted() && driverRuntimeRestartManager.hasRestarted()) { + // set the state machine in motion. this.state = DriverRestartState.RestartBegan; } @@ -89,31 +84,34 @@ public final class DriverRestartManager { final EvaluatorRestartInfo evaluatorRestartInfo = driverRuntimeRestartManager.getAliveAndFailedEvaluators(); setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators()); driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators()); + // TODO[REEF-560]: Call onDriverRestartCompleted() (to do in REEF-617) on a Timer. } /** - * @return whether restart is completed. + * @return The restart state of the specified evaluator. Returns {@link EvaluatorRestartState#NOT_EXPECTED} + * if the {@link DriverRestartManager} does not believe that it's an evaluator to be recovered. */ - public synchronized boolean isRestartCompleted() { - return this.state == DriverRestartState.RestartCompleted; - } + public synchronized EvaluatorRestartState getEvaluatorRestartState(final String evaluatorId) { + if (this.state.hasNotRestarted() || + !this.previousEvaluators.containsKey(evaluatorId)) { + return EvaluatorRestartState.NOT_EXPECTED; + } - /** - * @return the Evaluators expected to check in from a previous run. - */ - public synchronized Set<String> getPreviousEvaluatorIds() { - return Collections.unmodifiableSet(this.previousEvaluators); + return this.previousEvaluators.get(evaluatorId); } /** * Set the Evaluators to expect still active from a previous execution of the Driver in a restart situation. * To be called exactly once during a driver restart. * - * @param ids the evaluator IDs of the evaluators that are expected to have survived driver restart. + * @param previousEvaluatorIds the evaluator IDs of the evaluators that are expected to have survived driver restart. */ - public synchronized void setPreviousEvaluatorIds(final Set<String> ids) { - if (this.state != DriverRestartState.RestartInProgress) { - previousEvaluators.addAll(ids); + private synchronized void setPreviousEvaluatorIds(final Set<String> previousEvaluatorIds) { + if (this.state == DriverRestartState.RestartBegan) { + for (final String previousEvaluatorId : previousEvaluatorIds) { + setEvaluatorExpected(previousEvaluatorId); + } + this.state = DriverRestartState.RestartInProgress; } else { final String errMsg = "Should not be setting the set of expected alive evaluators more than once."; @@ -123,34 +121,26 @@ public final class DriverRestartManager { } /** - * @return the IDs of the Evaluators from a previous Driver that have checked in with the Driver - * in a restart situation. - */ - public synchronized Set<String> getRecoveredEvaluatorIds() { - return Collections.unmodifiableSet(this.previousEvaluators); - } - - /** * Indicate that this Driver has re-established the connection with one more Evaluator of a previous run. + * Calls the restart complete action if the latest evaluator is the last evaluator to recover. * @return true if the driver restart is completed. */ public synchronized boolean onRecoverEvaluatorIsRestartComplete(final String evaluatorId) { - if (!this.previousEvaluators.contains(evaluatorId)) { + if (!this.previousEvaluators.containsKey(evaluatorId) || + this.previousEvaluators.get(evaluatorId) == EvaluatorRestartState.NOT_EXPECTED) { final String errMsg = "Evaluator with evaluator ID " + evaluatorId + " not expected to be alive."; LOG.log(Level.SEVERE, errMsg); throw new DriverFatalRuntimeException(errMsg); } - if (!this.recoveredEvaluators.add(evaluatorId)) { + if (this.previousEvaluators.get(evaluatorId) != EvaluatorRestartState.EXPECTED) { LOG.log(Level.WARNING, "Evaluator with evaluator ID " + evaluatorId + " added to the set" + " of recovered evaluators more than once. Ignoring second add..."); + } else { + setEvaluatorReported(evaluatorId); } - if (this.recoveredEvaluators.containsAll(this.previousEvaluators)) { - this.state = DriverRestartState.RestartCompleted; - } - - return this.state == DriverRestartState.RestartCompleted; + return haveAllExpectedEvaluatorsReported(); } /** @@ -168,4 +158,76 @@ public final class DriverRestartManager { public synchronized void recordRemovedEvaluator(final String id) { driverRuntimeRestartManager.recordRemovedEvaluator(id); } + + /** + * Signals to the {@link DriverRestartManager} that an evaluator is to be expected to report back after restart. + */ + public synchronized void setEvaluatorExpected(final String evaluatorId) { + if (previousEvaluators.containsKey(evaluatorId)) { + LOG.log(Level.WARNING, "Evaluator " + evaluatorId + " is already added to the set of previous evaluators with " + + "state [" + previousEvaluators.get(evaluatorId) + "]. Ignoring..."); + return; + } + + previousEvaluators.put(evaluatorId, EvaluatorRestartState.EXPECTED); + } + + /** + * Signals to the {@link DriverRestartManager} that an evaluator has reported back after restart. + */ + public synchronized void setEvaluatorReported(final String evaluatorId) { + setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.REPORTED); + } + + /** + * Signals to the {@link DriverRestartManager} that an evaluator has had its recovery heartbeat processed. + */ + public synchronized void setEvaluatorReregistered(final String evaluatorId) { + setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.REREGISTERED); + } + + /** + * Signals to the {@link DriverRestartManager} that an evaluator has had its running task processed. + */ + public synchronized void setEvaluatorRunningTask(final String evaluatorId) { + setPreviousEvaluatorState( + evaluatorId, EvaluatorRestartState.PROCESSED); + } + + /** + * Signals to the {@link DriverRestartManager} that an expected evaluator has been expired. + */ + public synchronized void setEvaluatorExpired(final String evaluatorId) { + setPreviousEvaluatorState(evaluatorId, EvaluatorRestartState.EXPIRED); + } + + private synchronized void setPreviousEvaluatorState(final String evaluatorId, + final EvaluatorRestartState to) { + if (!previousEvaluators.containsKey(evaluatorId) || + !EvaluatorRestartState.isLegalTransition(previousEvaluators.get(evaluatorId), to)) { + throw evaluatorTransitionFailed(evaluatorId, to); + } + + previousEvaluators.put(evaluatorId, to); + } + + private synchronized DriverFatalRuntimeException evaluatorTransitionFailed(final String evaluatorId, + final EvaluatorRestartState to) { + if (!previousEvaluators.containsKey(evaluatorId)) { + return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " is not expected."); + } + + return new DriverFatalRuntimeException("Evaluator " + evaluatorId + " wants to transition to state " + + "[" + to + "], but is in the illegal state [" + previousEvaluators.get(evaluatorId) + "]."); + } + + private synchronized boolean haveAllExpectedEvaluatorsReported() { + for (final EvaluatorRestartState evaluatorRestartState : this.previousEvaluators.values()) { + if (!evaluatorRestartState.hasReported()) { + return false; + } + } + + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java index 5ffd741..a20ec4c 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java @@ -69,4 +69,11 @@ public enum DriverRestartState { public boolean hasRestarted() { return this != NotRestarted; } + + /** + * the negation of {@link #hasRestarted()}. + */ + public boolean hasNotRestarted() { + return !this.hasRestarted(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java deleted file mode 100644 index e8cdd33..0000000 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.driver.restart; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; - -/** - * A static utilities class for simplifying calls to driver restart manager. - */ -@Private -@DriverSide -@Unstable -public final class DriverRestartUtilities { - - /** - * Helper function for driver restart to determine whether an evaluator ID is from an evaluator from the - * previous application attempt. - */ - public static boolean isRestartAndIsPreviousEvaluator(final DriverRestartManager driverRestartManager, - final String evaluatorId) { - return driverRestartManager.hasRestarted() && driverRestartManager.getPreviousEvaluatorIds().contains(evaluatorId); - } - - private DriverRestartUtilities() { - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java new file mode 100644 index 0000000..4a0c540 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.driver.restart; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +/** + * The state that the evaluator is in in the driver restart process. + */ +@Private +@DriverSide +@Unstable +public enum EvaluatorRestartState { + /** + * The evaluator is not a restarted instance. Not expecting. + */ + NOT_EXPECTED, + + /** + * Have not yet heard back from an evaluator, but we are expecting it to report back. + */ + EXPECTED, + + /** + * Received the evaluator heartbeat, but have not yet processed it. + */ + REPORTED, + + /** + * The evaluator has had its recovery heartbeat processed. + */ + REREGISTERED, + + /** + * The evaluator has had its context/running task processed. + */ + PROCESSED, + + /** + * The evaluator has only contacted the driver after the expiration period. + */ + EXPIRED; + + /** + * @return true if the transition of {@link EvaluatorRestartState} is legal. + */ + public static boolean isLegalTransition(final EvaluatorRestartState from, final EvaluatorRestartState to) { + switch(from) { + case EXPECTED: + switch(to) { + case REPORTED: + return true; + default: + return false; + } + case REPORTED: + switch(to) { + case REREGISTERED: + return true; + default: + return false; + } + case REREGISTERED: + switch(to) { + case PROCESSED: + return true; + default: + return false; + } + default: + return false; + } + } + + /** + * @return true if the evaluator has heartbeated back to the driver. + */ + public boolean hasReported() { + switch(this) { + case REPORTED: + case REREGISTERED: + case PROCESSED: + return true; + default: + return false; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java index 41bdd43..8abc19d 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java @@ -24,7 +24,7 @@ import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.context.FailedContext; import org.apache.reef.driver.restart.DriverRestartManager; -import org.apache.reef.driver.restart.DriverRestartUtilities; +import org.apache.reef.driver.restart.EvaluatorRestartState; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher; import org.apache.reef.util.Optional; @@ -215,7 +215,7 @@ public final class ContextRepresenters { Optional.of(contextStatusProto.getParentId()) : Optional.<String>empty(); final EvaluatorContext context = contextFactory.newContext(contextID, parentID); this.addContext(context); - if (DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, context.getEvaluatorId())) { + if (driverRestartManager.getEvaluatorRestartState(context.getEvaluatorId()) == EvaluatorRestartState.REREGISTERED) { // when we get a recovered active context, always notify application this.messageDispatcher.onDriverRestartContextActive(context); } else { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java index 8225a0a..3d02ead 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java @@ -20,6 +20,8 @@ package org.apache.reef.runtime.common.driver.evaluator; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.restart.DriverRestartManager; +import org.apache.reef.driver.restart.EvaluatorRestartState; import org.apache.reef.proto.EvaluatorRuntimeProtocol; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.util.Optional; @@ -40,11 +42,15 @@ public final class EvaluatorHeartbeatHandler private static final Logger LOG = Logger.getLogger(EvaluatorHeartbeatHandler.class.getName()); private final Evaluators evaluators; private final EvaluatorManagerFactory evaluatorManagerFactory; + private final DriverRestartManager driverRestartManager; @Inject - EvaluatorHeartbeatHandler(final Evaluators evaluators, final EvaluatorManagerFactory evaluatorManagerFactory) { + EvaluatorHeartbeatHandler(final Evaluators evaluators, + final EvaluatorManagerFactory evaluatorManagerFactory, + final DriverRestartManager driverRestartManager) { this.evaluators = evaluators; this.evaluatorManagerFactory = evaluatorManagerFactory; + this.driverRestartManager = driverRestartManager; } @Override @@ -58,10 +64,25 @@ public final class EvaluatorHeartbeatHandler new Object[]{evaluatorId, status.getState(), heartbeat.getTimestamp(), evaluatorHeartbeatMessage.getIdentifier()}); - final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(evaluatorId); - if (evaluatorManager.isPresent()) { - evaluatorManager.get().onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage); - } else { + try { + final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(evaluatorId); + if (evaluatorManager.isPresent()) { + evaluatorManager.get().onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage); + return; + } + + if (driverRestartManager.isRestarting() && + driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPECTED) { + // TODO[REEF-617]: Create EvaluatorManager for recovered evaluator and call onEvaluatorHeartbeatMessage(). + return; + } + + if (driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.EXPIRED) { + LOG.log(Level.FINE, "Expired evaluator " + evaluatorId + " has reported back to the driver after restart."); + // TODO[REEF-617]: Create EvaluatorManager for expired evaluator and close it. + return; + } + final StringBuilder message = new StringBuilder("Contact from unknown Evaluator with identifier '"); message.append(evaluatorId); if (heartbeat.hasEvaluatorStatus()) { @@ -70,7 +91,8 @@ public final class EvaluatorHeartbeatHandler } message.append('\''); throw new RuntimeException(message.toString()); + } finally { + LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId); } - LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java index 35dcd35..f4309c8 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java @@ -23,7 +23,7 @@ import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.evaluator.CLRProcessFactory; import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders; import org.apache.reef.driver.restart.DriverRestartManager; -import org.apache.reef.driver.restart.DriverRestartUtilities; +import org.apache.reef.driver.restart.EvaluatorRestartState; import org.apache.reef.tang.ConfigurationProvider; import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.driver.context.FailedContext; @@ -323,6 +323,9 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { } } + /** + * Process an evaluator heartbeat message. + */ public void onEvaluatorHeartbeatMessage( final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) { @@ -340,31 +343,42 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp()); final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString(); - // first message from a running evaluator trying to re-establish communications - if (DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, evaluatorId)) { - this.evaluatorControlHandler.setRemoteID(evaluatorRID); - this.stateManager.setRunning(); + final EvaluatorRestartState evaluatorRestartState = driverRestartManager.getEvaluatorRestartState(evaluatorId); - final boolean restartCompleted = - this.driverRestartManager.onRecoverEvaluatorIsRestartComplete(this.evaluatorId); + /* + * First message from a running evaluator. The evaluator can be a new evaluator or be a previous evaluator + * from a separate application attempt. In the case of a previous evaluator, if the restart period has not + * yet expired, we should register it and trigger context active and task events. If the restart period has + * expired, we should return immediately after setting its remote ID in order to close it. + */ + if (this.stateManager.isSubmitted() || + evaluatorRestartState == EvaluatorRestartState.REPORTED || + evaluatorRestartState == EvaluatorRestartState.EXPIRED) { - LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", this.evaluatorId); + this.evaluatorControlHandler.setRemoteID(evaluatorRID); - if (restartCompleted) { - this.messageDispatcher.onDriverRestartCompleted(new DriverRestartCompleted(System.currentTimeMillis())); - LOG.log(Level.INFO, "All expected evaluators checked in."); - } else { - LOG.log(Level.INFO, "Expecting [{0}], [{1}] have checked in.", - new Object[]{this.driverRestartManager.getPreviousEvaluatorIds(), - this.driverRestartManager.getRecoveredEvaluatorIds()}); + if (evaluatorRestartState == EvaluatorRestartState.EXPIRED) { + // Don't do anything if evaluator has expired. Close it immediately upon exit of this method. + return; } - } - // If this is the first message from this Evaluator, register it. - if (this.stateManager.isSubmitted()) { - this.evaluatorControlHandler.setRemoteID(evaluatorRID); this.stateManager.setRunning(); LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId); + + if (evaluatorRestartState == EvaluatorRestartState.REPORTED) { + + // TODO[REEF-617]: Move evaluator recovery to EvaluatorHeartbeatHandler and reregister evaluator here. + final boolean restartCompleted = + this.driverRestartManager.onRecoverEvaluatorIsRestartComplete(this.evaluatorId); + + LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", this.evaluatorId); + + // TODO[REEF-617]: Move restart completion logic to DriverRestartManager. + if (restartCompleted) { + this.messageDispatcher.onDriverRestartCompleted(new DriverRestartCompleted(System.currentTimeMillis())); + LOG.log(Level.INFO, "All expected evaluators checked in."); + } + } } // Process the Evaluator status message @@ -491,8 +505,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { if (taskStatusProto.getState() == ReefServiceProtos.State.INIT || taskStatusProto.getState() == ReefServiceProtos.State.FAILED || taskStatusProto.getState() == ReefServiceProtos.State.RUNNING || - // for task from recovered evaluators - DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, evaluatorId)) { + driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.REREGISTERED) { // [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order // [REEF-289] is a related item which may fix the issue http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d67da78/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java index 5436494..a09532b 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java @@ -22,7 +22,7 @@ import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.driver.restart.DriverRestartManager; -import org.apache.reef.driver.restart.DriverRestartUtilities; +import org.apache.reef.driver.restart.EvaluatorRestartState; import org.apache.reef.driver.task.FailedTask; import org.apache.reef.driver.task.RunningTask; import org.apache.reef.proto.ReefServiceProtos; @@ -88,7 +88,7 @@ public final class TaskRepresenter { throw new RuntimeException("Received a message for task " + taskStatusProto.getTaskId() + " in the TaskRepresenter for Task " + this.taskId); } - if (taskStatusProto.getRecovery()) { + if (driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == EvaluatorRestartState.REREGISTERED) { // when a recovered heartbeat is received, we will take its word for it LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.", new Object[]{taskStatusProto.getState(), this.taskId}); @@ -139,9 +139,10 @@ public final class TaskRepresenter { } // fire driver restart task running handler if this is a recovery heartbeat - if (DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, evaluatorManager.getId())) { + if (driverRestartManager.getEvaluatorRestartState(evaluatorManager.getId()) == EvaluatorRestartState.REREGISTERED) { final RunningTask runningTask = new RunningTaskImpl( this.evaluatorManager, this.taskId, this.context, this); + this.driverRestartManager.setEvaluatorRunningTask(evaluatorManager.getId()); this.messageDispatcher.onDriverRestartTaskRunning(runningTask); }
