Repository: incubator-reef Updated Branches: refs/heads/master fd18d2f40 -> 46de9e70a
[REEF-506] Move restart determination from Evaluator to Driver This change makes sure that the Driver calls restart handlers by checking the restart status based on DriverRestartManager instead of using the protobuf message from the recovered evaluator. JIRA: [REEF-506](https://issues.apache.org/jira/browse/REEF-506) Pull Request: This closes #364 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/46de9e70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/46de9e70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/46de9e70 Branch: refs/heads/master Commit: 46de9e70a09d0dd54340ba4d5f41718959564525 Parents: fd18d2f Author: Andrew Chung <[email protected]> Authored: Tue Aug 11 13:54:36 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed Aug 12 13:59:26 2015 -0700 ---------------------------------------------------------------------- .../restart/DriverRestartManagerImpl.java | 33 +++++---- .../reef/driver/restart/DriverRestartState.java | 71 ++++++++++++++++++++ .../driver/restart/DriverRestartUtilities.java | 58 ++++++++++++++++ .../driver/context/ContextRepresenters.java | 21 +++++- .../driver/evaluator/EvaluatorManager.java | 43 ++++++------ .../common/driver/task/TaskRepresenter.java | 9 ++- 6 files changed, 194 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/46de9e70/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java index 2b07027..4ea3b56 100644 --- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java @@ -42,34 +42,40 @@ public final class DriverRestartManagerImpl implements DriverRestartManager { private final DriverRuntimeRestartManager driverRuntimeRestartManager; private final Set<String> previousEvaluators; private final Set<String> recoveredEvaluators; - - private boolean restartBegan; - private boolean restartCompleted; + private DriverRestartState state; @Inject private DriverRestartManagerImpl(final DriverRuntimeRestartManager driverRuntimeRestartManager) { this.driverRuntimeRestartManager = driverRuntimeRestartManager; - this.restartCompleted = false; - this.restartBegan = false; + this.state = DriverRestartState.NotRestarted; this.previousEvaluators = new HashSet<>(); this.recoveredEvaluators = new HashSet<>(); } @Override - public boolean isRestart() { - return driverRuntimeRestartManager.isRestart(); + public synchronized boolean isRestart() { + if (this.state.isRestart()) { + return true; + } + + if (driverRuntimeRestartManager.isRestart()) { + this.state = DriverRestartState.RestartBegan; + return true; + } + + return false; } @Override - public void onRestart() { + public synchronized void onRestart() { final EvaluatorRestartInfo evaluatorRestartInfo = driverRuntimeRestartManager.getAliveAndFailedEvaluators(); setPreviousEvaluatorIds(evaluatorRestartInfo.getAliveEvaluators()); driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators()); } @Override - public boolean isRestartCompleted() { - return this.restartCompleted; + public synchronized boolean isRestartCompleted() { + return this.state == DriverRestartState.RestartCompleted; } @Override @@ -79,8 +85,9 @@ public final class DriverRestartManagerImpl implements DriverRestartManager { @Override public synchronized void setPreviousEvaluatorIds(final 
Set<String> ids) { - if (!this.restartBegan) { + if (this.state != DriverRestartState.RestartInProgress) { previousEvaluators.addAll(ids); + this.state = DriverRestartState.RestartInProgress; } else { final String errMsg = "Should not be setting the set of expected alive evaluators more than once."; LOG.log(Level.SEVERE, errMsg); @@ -107,10 +114,10 @@ public final class DriverRestartManagerImpl implements DriverRestartManager { } if (this.recoveredEvaluators.containsAll(this.previousEvaluators)) { - this.restartCompleted = true; + this.state = DriverRestartState.RestartCompleted; } - return this.restartCompleted; + return this.state == DriverRestartState.RestartCompleted; } @Override http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/46de9e70/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java new file mode 100644 index 0000000..04c711f --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartState.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.driver.restart; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +/** + * Represents the current driver restart progress. + */ +@Private +@DriverSide +@Unstable +public enum DriverRestartState { + /** + * Driver restart is not implemented. + */ + NotImplemented, + + /** + * Driver has not begun the restart process yet. + */ + NotRestarted, + + /** + * Driver has been notified of the restart by the runtime, but has not yet + * received its set of evaluator IDs to recover. + */ + RestartBegan, + + /** + * Driver has received its set of evaluator IDs to recover. + */ + RestartInProgress, + + /** + * Driver has recovered all the evaluator IDs that it can, and the restart process is completed. + */ + RestartCompleted; + + /** + * Returns true if the restart process has begun. + */ + public boolean isRestart() { + switch (this) { + case RestartBegan: + case RestartInProgress: + case RestartCompleted: + return true; + default: + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/46de9e70/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java new file mode 100644 index 0000000..dcf753b --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartUtilities.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.driver.restart; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.util.Optional; + +/** + * A static utilities class for simplifying calls to driver restart manager. + */ +@Private +@DriverSide +@Unstable +public final class DriverRestartUtilities { + + /** + * Helper function for driver restart to determine whether an evaluator ID is from an evaluator from the + * previous application attempt. DriverRestartManager is optional here. + */ + public static boolean isRestartAndIsPreviousEvaluator(final Optional<DriverRestartManager> driverRestartManager, + final String evaluatorId) { + if (!driverRestartManager.isPresent()) { + return false; + } + + return isRestartAndIsPreviousEvaluator(driverRestartManager.get(), evaluatorId); + } + + /** + * Helper function for driver restart to determine whether an evaluator ID is from an evaluator from the + * previous application attempt. 
+ */ + public static boolean isRestartAndIsPreviousEvaluator(final DriverRestartManager driverRestartManager, + final String evaluatorId) { + return driverRestartManager.isRestart() && driverRestartManager.getPreviousEvaluatorIds().contains(evaluatorId); + } + + private DriverRestartUtilities() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/46de9e70/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java index ba72ad9..a8d538e 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java @@ -23,6 +23,8 @@ import net.jcip.annotations.ThreadSafe; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.restart.DriverRestartManager; +import org.apache.reef.driver.restart.DriverRestartUtilities; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher; import org.apache.reef.util.Optional; @@ -43,6 +45,7 @@ public final class ContextRepresenters { private final EvaluatorMessageDispatcher messageDispatcher; private final ContextFactory contextFactory; + private final Optional<DriverRestartManager> driverRestartManager; // Mutable fields @GuardedBy("this") @@ -53,8 +56,22 @@ public final class ContextRepresenters { @Inject private ContextRepresenters(final EvaluatorMessageDispatcher messageDispatcher, final ContextFactory 
contextFactory) { + this(messageDispatcher, contextFactory, Optional.<DriverRestartManager>empty()); + } + + @Inject + private ContextRepresenters(final EvaluatorMessageDispatcher messageDispatcher, + final ContextFactory contextFactory, + final DriverRestartManager driverRestartManager) { + this(messageDispatcher, contextFactory, Optional.of(driverRestartManager)); + } + + private ContextRepresenters(final EvaluatorMessageDispatcher messageDispatcher, + final ContextFactory contextFactory, + final Optional<DriverRestartManager> driverRestartManager) { this.messageDispatcher = messageDispatcher; this.contextFactory = contextFactory; + this.driverRestartManager = driverRestartManager; } /** @@ -91,7 +108,7 @@ public final class ContextRepresenters { /** * Process heartbeats from the contexts on an Evaluator. * - * @param contextStatusProto + * @param contextStatusProtos * @param notifyClientOnNewActiveContext */ public synchronized void onContextStatusMessages(final Iterable<ReefServiceProtos.ContextStatusProto> @@ -210,7 +227,7 @@ public final class ContextRepresenters { Optional.of(contextStatusProto.getParentId()) : Optional.<String>empty(); final EvaluatorContext context = contextFactory.newContext(contextID, parentID); this.addContext(context); - if (contextStatusProto.getRecovery()) { + if (DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, context.getEvaluatorId())) { // when we get a recovered active context, always notify application this.messageDispatcher.onDriverRestartContextActive(context); } else { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/46de9e70/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java index d546e59..032c0cb 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java @@ -22,7 +22,7 @@ import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.evaluator.CLRProcessFactory; import org.apache.reef.driver.restart.DriverRestartManager; -import org.apache.reef.exception.DriverFatalRuntimeException; +import org.apache.reef.driver.restart.DriverRestartUtilities; import org.apache.reef.tang.ConfigurationProvider; import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.driver.context.FailedContext; @@ -392,27 +392,21 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString(); // first message from a running evaluator trying to re-establish communications - if (evaluatorHeartbeatProto.getRecovery()) { - if(this.driverRestartManager.isPresent()) { - this.evaluatorControlHandler.setRemoteID(evaluatorRID); - this.stateManager.setRunning(); - - boolean restartCompleted = this.driverRestartManager.get().evaluatorRecovered(this.evaluatorId); - - LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", this.evaluatorId); - - if (restartCompleted) { - this.messageDispatcher.onDriverRestartCompleted(new DriverRestartCompleted(System.currentTimeMillis())); - LOG.log(Level.INFO, "All expected evaluators checked in."); - } else { - LOG.log(Level.INFO, "Expecting [{0}], [{1}] have checked in.", - new Object[]{this.driverRestartManager.get().getPreviousEvaluatorIds(), - this.driverRestartManager.get().getRecoveredEvaluatorIds()}); - } + if 
(DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, evaluatorId)) { + this.evaluatorControlHandler.setRemoteID(evaluatorRID); + this.stateManager.setRunning(); + + boolean restartCompleted = this.driverRestartManager.get().evaluatorRecovered(this.evaluatorId); + + LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", this.evaluatorId); + + if (restartCompleted) { + this.messageDispatcher.onDriverRestartCompleted(new DriverRestartCompleted(System.currentTimeMillis())); + LOG.log(Level.INFO, "All expected evaluators checked in."); } else { - final String errorMsg = "Restart configurations are not set properly. The DriverRestartManager is missing."; - LOG.log(Level.SEVERE, errorMsg); - throw new DriverFatalRuntimeException(errorMsg); + LOG.log(Level.INFO, "Expecting [{0}], [{1}] have checked in.", + new Object[]{this.driverRestartManager.get().getPreviousEvaluatorIds(), + this.driverRestartManager.get().getRecoveredEvaluatorIds()}); } } @@ -547,8 +541,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { if (taskStatusProto.getState() == ReefServiceProtos.State.INIT || taskStatusProto.getState() == ReefServiceProtos.State.FAILED || taskStatusProto.getState() == ReefServiceProtos.State.RUNNING || - taskStatusProto.getRecovery() // for task from recovered evaluators - ) { + // for task from recovered evaluators + DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, evaluatorId)) { // [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order // [REEF-289] is a related item which may fix the issue @@ -565,7 +559,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { this.contextRepresenters.getContext(taskStatusProto.getContextId()), this.messageDispatcher, this, - this.exceptionCodec)); + this.exceptionCodec, + this.driverRestartManager)); } else { throw new RuntimeException("Received a message of state " + 
taskStatusProto.getState() + ", not INIT, RUNNING, or FAILED for Task " + taskStatusProto.getTaskId() + http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/46de9e70/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java index 6bfee45..75dd24e 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java @@ -21,6 +21,8 @@ package org.apache.reef.runtime.common.driver.task; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.restart.DriverRestartManager; +import org.apache.reef.driver.restart.DriverRestartUtilities; import org.apache.reef.driver.task.FailedTask; import org.apache.reef.driver.task.RunningTask; import org.apache.reef.proto.ReefServiceProtos; @@ -47,6 +49,7 @@ public final class TaskRepresenter { private final EvaluatorManager evaluatorManager; private final ExceptionCodec exceptionCodec; private final String taskId; + private final Optional<DriverRestartManager> driverRestartManager; // Mutable state private ReefServiceProtos.State state = ReefServiceProtos.State.INIT; @@ -55,12 +58,14 @@ public final class TaskRepresenter { final EvaluatorContext context, final EvaluatorMessageDispatcher messageDispatcher, final EvaluatorManager evaluatorManager, - final ExceptionCodec exceptionCodec) { + final ExceptionCodec exceptionCodec, + final Optional<DriverRestartManager> driverRestartManager) { this.taskId = taskId; this.context = 
context; this.messageDispatcher = messageDispatcher; this.evaluatorManager = evaluatorManager; this.exceptionCodec = exceptionCodec; + this.driverRestartManager = driverRestartManager; } private static byte[] getResult(final ReefServiceProtos.TaskStatusProto taskStatusProto) { @@ -134,7 +139,7 @@ public final class TaskRepresenter { } // fire driver restart task running handler if this is a recovery heartbeat - if (taskStatusProto.getRecovery()) { + if (DriverRestartUtilities.isRestartAndIsPreviousEvaluator(driverRestartManager, evaluatorManager.getId())) { final RunningTask runningTask = new RunningTaskImpl( this.evaluatorManager, this.taskId, this.context, this); this.messageDispatcher.onDriverRestartTaskRunning(runningTask);
