Repository: incubator-reef
Updated Branches:
refs/heads/master a7b655790 -> 2a09c826a
[REEF-561] Move restart functions from DriverStatusManager to
DriverRestartManager
This addressed the issue by
* Splitting the original `DriverRestartManager` into
`DriverRestartManager` and `DriverRuntimeRestartManager`.
* The implementation `DriverRestartManagerImpl` is responsible for
non-runtime dependent code, while the interface
`DriverRuntimeRestartManager` is responsible for runtime-dependent
restart methods.
* Moving restart related functionality from the `DriverStatusManager`
to `DriverRestartManager` and `DriverRuntimeRestartManager`.
* Additionally corrected the behavior of `RuntimeClock` on failure so that
it propagates the exception.
JIRA:
[REEF-561](https://issues.apache.org/jira/browse/REEF-561)
Pull Request:
This closes #340
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2a09c826
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2a09c826
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2a09c826
Branch: refs/heads/master
Commit: 2a09c826a06ac24ee85b99f3267150e543e65b46
Parents: a7b6557
Author: Andrew Chung <[email protected]>
Authored: Wed Aug 5 14:49:25 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Aug 6 15:20:37 2015 -0700
----------------------------------------------------------------------
.../driver/restart/DriverRestartManager.java | 54 +++--
.../restart/DriverRestartManagerImpl.java | 111 +++++++++
.../restart/DriverRuntimeRestartManager.java | 66 +++++
...atorPreservingEvaluatorAllocatedHandler.java | 4 +-
.../driver/restart/EvaluatorRestartInfo.java | 56 +++++
.../DriverRuntimeRestartConfiguration.java | 5 +-
.../common/driver/DriverStartHandler.java | 13 +-
.../common/driver/DriverStatusManager.java | 67 ------
.../driver/evaluator/EvaluatorManager.java | 112 +++++++--
.../driver/YarnDriverRestartConfiguration.java | 4 +-
.../yarn/driver/YarnDriverRestartManager.java | 238 -------------------
.../driver/YarnDriverRuntimeRestartManager.java | 231 ++++++++++++++++++
.../reef/wake/time/runtime/RuntimeClock.java | 6 +-
13 files changed, 605 insertions(+), 362 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index c60740f..99bd7b0 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -21,43 +21,67 @@ package org.apache.reef.driver.restart;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.annotations.audience.RuntimeAuthor;
/**
- * Classes implementing this interface are in charge of recording evaluator
- * changes as they are allocated as well as recovering Evaluators and
- * discovering which evaluators are lost on the event of a driver restart.
+ * The manager that handles aspects of driver restart such as determining
whether the driver is in
+ * restart mode, what to do on restart, whether restart is completed, and
others.
*/
@DriverSide
@Private
-@RuntimeAuthor
@Unstable
public interface DriverRestartManager {
/**
- * Determines whether or not the driver has been restarted.
+ * @return Whether or not the driver instance is a restarted instance.
*/
boolean isRestart();
/**
- * This function has a few jobs crucial jobs to enable restart:
- * 1. Recover the list of evaluators that are reported to be alive by the
Resource Manager.
- * 2. Make necessary operations to inform relevant runtime components about
evaluators that are alive
- * with the set of evaluator IDs recovered in step 1.
- * 3. Make necessary operations to inform relevant runtime components about
evaluators that have failed
- * during the driver restart period.
+ * Recovers the list of alive and failed evaluators and inform about
evaluator failures
+ * based on the specific runtime. Also sets the expected amount of
evaluators to report back
+ * as alive to the job driver.
*/
void onRestart();
/**
- * Records the evaluators when it is allocated.
+ * Indicate that the Driver restart is complete. It is meant to be called
exactly once during a restart and never
+ * during the ininital launch of a Driver.
+ */
+ void setRestartCompleted();
+
+ /**
+ * @return the number of Evaluators expected to check in from a previous run.
+ */
+ int getNumPreviousContainers();
+
+
+ /**
+ * Set the number of containers to expect still active from a previous
execution of the Driver in a restart situation.
+ * To be called exactly once during a driver restart.
+ *
+ * @param num
+ */
+ void setNumPreviousContainers(final int num);
+
+ /**
+ * @return the number of Evaluators from a previous Driver that have checked
in with the Driver
+ * in a restart situation.
+ */
+ int getNumRecoveredContainers();
+
+ /**
+ * Indicate that this Driver has re-established the connection with one more
Evaluator of a previous run.
+ */
+ void oneContainerRecovered();
+
+ /**
+ * Records the evaluators when it is allocated. The implementation depends
on the runtime.
* @param id The evaluator ID of the allocated evaluator.
*/
void recordAllocatedEvaluator(final String id);
-
/**
- * Records a removed evaluator into the evaluator log.
+ * Records a removed evaluator into the evaluator log. The implementation
depends on the runtime.
* @param id The evaluator ID of the removed evaluator.
*/
void recordRemovedEvaluator(final String id);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
new file mode 100644
index 0000000..0d7a75a
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManagerImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The implementation of DriverRestartManager. A few methods here are proxy
methods for
+ * the DriverRuntimeRestartManager that depends on the runtime implementation.
+ */
+@DriverSide
+@Private
+@Unstable
+public final class DriverRestartManagerImpl implements DriverRestartManager {
+ private static final Logger LOG =
Logger.getLogger(DriverRestartManagerImpl.class.getName());
+ private final DriverRuntimeRestartManager driverRuntimeRestartManager;
+
+ private boolean restartCompleted;
+ private int numPreviousContainers;
+ private int numRecoveredContainers;
+
+ @Inject
+ private DriverRestartManagerImpl(final DriverRuntimeRestartManager
driverRuntimeRestartManager) {
+ this.driverRuntimeRestartManager = driverRuntimeRestartManager;
+ this.restartCompleted = false;
+ this.numPreviousContainers = -1;
+ this.numRecoveredContainers = 0;
+ }
+
+ @Override
+ public boolean isRestart() {
+ return driverRuntimeRestartManager.isRestart();
+ }
+
+ @Override
+ public void onRestart() {
+ final EvaluatorRestartInfo evaluatorRestartInfo =
driverRuntimeRestartManager.getAliveAndFailedEvaluators();
+ setNumPreviousContainers(evaluatorRestartInfo.getAliveEvaluators().size());
+
driverRuntimeRestartManager.informAboutEvaluatorFailures(evaluatorRestartInfo.getFailedEvaluators());
+ }
+
+ @Override
+ public synchronized void setRestartCompleted() {
+ if (this.restartCompleted) {
+ LOG.log(Level.WARNING, "Calling setRestartCompleted more than once.");
+ } else {
+ this.restartCompleted = true;
+ }
+ }
+
+ @Override
+ public synchronized int getNumPreviousContainers() {
+ return this.numPreviousContainers;
+ }
+
+ @Override
+ public synchronized void setNumPreviousContainers(final int num) {
+ if (this.numPreviousContainers >= 0) {
+ throw new IllegalStateException("Attempting to set the number of
expected containers left " +
+ "from a previous container more than once.");
+ } else {
+ this.numPreviousContainers = num;
+ }
+ }
+
+ @Override
+ public synchronized int getNumRecoveredContainers() {
+ return this.numRecoveredContainers;
+ }
+
+ @Override
+ public synchronized void oneContainerRecovered() {
+ this.numRecoveredContainers += 1;
+ if (this.numRecoveredContainers > this.numPreviousContainers) {
+ throw new IllegalStateException("Reconnected to" +
+ this.numRecoveredContainers + "Evaluators while only expecting " +
this.numPreviousContainers);
+ }
+ }
+
+ @Override
+ public void recordAllocatedEvaluator(final String id) {
+ driverRuntimeRestartManager.recordAllocatedEvaluator(id);
+ }
+
+ @Override
+ public void recordRemovedEvaluator(final String id) {
+ driverRuntimeRestartManager.recordRemovedEvaluator(id);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
new file mode 100644
index 0000000..c581e93
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+
+import java.util.Set;
+
+/**
+ * Classes implementing this interface are in charge of recording evaluator
+ * changes as they are allocated as well as recovering Evaluators and
+ * discovering which evaluators are lost on the event of a driver restart.
+ */
+@DriverSide
+@Private
+@RuntimeAuthor
+@Unstable
+public interface DriverRuntimeRestartManager {
+ /**
+ * Determines whether or not the driver has been restarted.
+ */
+ boolean isRestart();
+
+ /**
+ * Records the evaluators when it is allocated.
+ * @param id The evaluator ID of the allocated evaluator.
+ */
+ void recordAllocatedEvaluator(final String id);
+
+ /**
+ * Records a removed evaluator into the evaluator log.
+ * @param id The evaluator ID of the removed evaluator.
+ */
+ void recordRemovedEvaluator(final String id);
+
+ /**
+ * Gets the sets of alive and failed evaluators based on the runtime
implementation.
+ * @return EvaluatorRestartInfo, which encapsulates the alive and failed set
of evaluator IDs.
+ */
+ EvaluatorRestartInfo getAliveAndFailedEvaluators();
+
+ /**
+ * Informs the necessary components about failed evaluators. The
implementation is runtime dependent.
+ * @param failedEvaluatorIds The set of evaluator IDs of evaluators that
failed during restart.
+ */
+ void informAboutEvaluatorFailures(final Set<String> failedEvaluatorIds);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
index ca9321f..e30d695 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
@@ -24,7 +24,7 @@ import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
/**
- * Records allocated evaluators for recovery on driver restart by using a
DriverRestartManager.
+ * Records allocated evaluators for recovery on driver restart by using a
DriverRuntimeRestartManager.
*/
public final class EvaluatorPreservingEvaluatorAllocatedHandler implements
EventHandler<AllocatedEvaluator> {
private final DriverRestartManager driverRestartManager;
@@ -35,7 +35,7 @@ public final class
EvaluatorPreservingEvaluatorAllocatedHandler implements Event
}
/**
- * Records the allocatedEvaluator ID with the DriverRestartManager.
+ * Records the allocatedEvaluator ID with the DriverRuntimeRestartManager.
* @param value the allocated evaluator event.
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
new file mode 100644
index 0000000..10beb78
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * The encapsulating class for alive and failed evaluators on driver restart.
+ */
+@Private
+@DriverSide
+@Unstable
+public final class EvaluatorRestartInfo {
+ private final Set<String> aliveEvaluators;
+ private final Set<String> failedEvaluators;
+
+ public EvaluatorRestartInfo(final Set<String> aliveEvaluators, final
Set<String> failedEvaluators) {
+ this.aliveEvaluators = Collections.unmodifiableSet(aliveEvaluators);
+ this.failedEvaluators = Collections.unmodifiableSet(failedEvaluators);
+ }
+
+ /**
+ * @return the set of evaluator IDs for alive evaluators on driver restart.
The returned set is unmodifiable.
+ */
+ public Set<String> getAliveEvaluators() {
+ return this.aliveEvaluators;
+ }
+
+ /**
+ * @return the set of evaluator IDs for faiuled evaluators on driver
restart. The returned set is unmodifiable.
+ */
+ public Set<String> getFailedEvaluators() {
+ return this.failedEvaluators;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
index 70d0c9b..695ac8a 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
@@ -23,9 +23,7 @@ import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.parameters.ServiceEvaluatorAllocatedHandlers;
import org.apache.reef.driver.parameters.ServiceEvaluatorCompletedHandlers;
import org.apache.reef.driver.parameters.ServiceEvaluatorFailedHandlers;
-import
org.apache.reef.driver.restart.EvaluatorPreservingEvaluatorAllocatedHandler;
-import
org.apache.reef.driver.restart.EvaluatorPreservingEvaluatorCompletedHandler;
-import
org.apache.reef.driver.restart.EvaluatorPreservingEvaluatorFailedHandler;
+import org.apache.reef.driver.restart.*;
import org.apache.reef.tang.formats.*;
/**
@@ -40,6 +38,7 @@ public final class DriverRuntimeRestartConfiguration extends
ConfigurationModule
}
public static final ConfigurationModule CONF = new
DriverRuntimeRestartConfiguration()
+ .bindImplementation(DriverRestartManager.class,
DriverRestartManagerImpl.class)
.bindSetEntry(ServiceEvaluatorAllocatedHandlers.class,
EvaluatorPreservingEvaluatorAllocatedHandler.class)
.bindSetEntry(ServiceEvaluatorFailedHandlers.class,
EvaluatorPreservingEvaluatorFailedHandler.class)
.bindSetEntry(ServiceEvaluatorCompletedHandlers.class,
EvaluatorPreservingEvaluatorCompletedHandler.class)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index bfd52a3..2811497 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -50,10 +50,9 @@ public final class DriverStartHandler implements
EventHandler<StartTime> {
final Set<EventHandler<StartTime>> restartHandlers,
@Parameter(ServiceDriverRestartedHandlers.class)
final Set<EventHandler<StartTime>> serviceRestartHandlers,
- final DriverRestartManager driverRestartManager,
- final DriverStatusManager driverStatusManager) {
+ final DriverRestartManager driverRestartManager) {
this(startHandler, Optional.of(restartHandlers),
Optional.of(serviceRestartHandlers),
- Optional.of(driverRestartManager), driverStatusManager);
+ Optional.of(driverRestartManager));
LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers
[{0}], RestartHandlers [{1}]," +
"and ServiceRestartHandlers [{2}], with a restart manager.",
new String[] {this.startHandlers.toString(),
this.restartHandlers.toString(),
@@ -62,10 +61,9 @@ public final class DriverStartHandler implements
EventHandler<StartTime> {
@Inject
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
- final Set<EventHandler<StartTime>> startHandlers,
- final DriverStatusManager driverStatusManager) {
+ final Set<EventHandler<StartTime>> startHandlers) {
this(startHandlers, Optional.<Set<EventHandler<StartTime>>>empty(),
- Optional.<Set<EventHandler<StartTime>>>empty(),
Optional.<DriverRestartManager>empty(), driverStatusManager);
+ Optional.<Set<EventHandler<StartTime>>>empty(),
Optional.<DriverRestartManager>empty());
LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers
[{0}] and no restart.",
this.startHandlers.toString());
}
@@ -73,8 +71,7 @@ public final class DriverStartHandler implements
EventHandler<StartTime> {
private DriverStartHandler(final Set<EventHandler<StartTime>> startHandler,
final Optional<Set<EventHandler<StartTime>>>
restartHandlers,
final Optional<Set<EventHandler<StartTime>>>
serviceRestartHandlers,
- final Optional<DriverRestartManager>
driverRestartManager,
- final DriverStatusManager driverStatusManager) {
+ final Optional<DriverRestartManager>
driverRestartManager) {
this.startHandlers = startHandler;
this.restartHandlers = restartHandlers;
this.serviceRestartHandlers = serviceRestartHandlers;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
index 0c913de..4fbf618 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
@@ -43,9 +43,6 @@ public final class DriverStatusManager {
private DriverStatus driverStatus = DriverStatus.PRE_INIT;
private Optional<Throwable> shutdownCause = Optional.empty();
private boolean driverTerminationHasBeenCommunicatedToClient = false;
- private boolean restartCompleted = false;
- private int numPreviousContainers = -1;
- private int numRecoveredContainers = 0;
/**
@@ -204,70 +201,6 @@ public final class DriverStatusManager {
}
}
- /**
- * Indicate that the Driver restart is complete. It is meant to be called
exactly once during a restart and never
- * during the ininital launch of a Driver.
- */
- public synchronized void setRestartCompleted() {
- if (!this.isDriverRestart()) {
- throw new IllegalStateException("setRestartCompleted() called in a
Driver that is not, in fact, restarted.");
- } else if (this.restartCompleted) {
- LOG.log(Level.WARNING, "Calling setRestartCompleted more than once.");
- } else {
- this.restartCompleted = true;
- }
- }
-
- /**
- * @return the number of Evaluators expected to check in from a previous run.
- */
- public synchronized int getNumPreviousContainers() {
- return this.numPreviousContainers;
- }
-
- /**
- * Set the number of containers to expect still active from a previous
execution of the Driver in a restart situation.
- * To be called exactly once during a driver restart.
- *
- * @param num
- */
- public synchronized void setNumPreviousContainers(final int num) {
- if (this.numPreviousContainers >= 0) {
- throw new IllegalStateException("Attempting to set the number of
expected containers left " +
- "from a previous container more than once.");
- } else {
- this.numPreviousContainers = num;
- }
- }
-
- /**
- * @return the number of Evaluators from a previous Driver that have checked
in with the Driver
- * in a restart situation.
- */
- public synchronized int getNumRecoveredContainers() {
- return this.numRecoveredContainers;
- }
-
- /**
- * Indicate that this Driver has re-established the connection with one more
Evaluator of a previous run.
- */
- public synchronized void oneContainerRecovered() {
- this.numRecoveredContainers += 1;
- if (this.numRecoveredContainers > this.numPreviousContainers) {
- throw new IllegalStateException("Reconnected to" +
- this.numRecoveredContainers +
- "Evaluators while only expecting " +
- this.numPreviousContainers);
- }
- }
-
- /**
- * @return true if the Driver is a restarted driver of an earlier attempt.
- */
- private synchronized boolean isDriverRestart() {
- return this.getNumPreviousContainers() > 0;
- }
-
public synchronized boolean isShuttingDownOrFailing() {
return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus)
|| DriverStatus.FAILING.equals(this.driverStatus);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 91e9e1d..e73cfef 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -21,6 +21,8 @@ package org.apache.reef.runtime.common.driver.evaluator;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.evaluator.CLRProcessFactory;
+import org.apache.reef.driver.restart.DriverRestartManager;
+import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.tang.ConfigurationProvider;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.FailedContext;
@@ -35,7 +37,6 @@ import org.apache.reef.io.naming.Identifiable;
import org.apache.reef.proto.EvaluatorRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.DriverRestartCompleted;
-import org.apache.reef.runtime.common.driver.DriverStatusManager;
import org.apache.reef.driver.evaluator.EvaluatorProcess;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl;
@@ -96,7 +97,6 @@ public final class EvaluatorManager implements Identifiable,
AutoCloseable {
private final ContextControlHandler contextControlHandler;
private final EvaluatorStatusManager stateManager;
private final ExceptionCodec exceptionCodec;
- private final DriverStatusManager driverStatusManager;
private final EventHandlerIdlenessSource idlenessSource;
private final RemoteManager remoteManager;
private final ConfigurationSerializer configurationSerializer;
@@ -104,6 +104,7 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
private final JVMProcessFactory jvmProcessFactory;
private final CLRProcessFactory clrProcessFactory;
+ private final Optional<DriverRestartManager> driverRestartManager;
// Mutable fields
private Optional<TaskRepresenter> task = Optional.empty();
@@ -124,15 +125,68 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
final EvaluatorControlHandler evaluatorControlHandler,
final ContextControlHandler contextControlHandler,
final EvaluatorStatusManager stateManager,
- final DriverStatusManager driverStatusManager,
final ExceptionCodec exceptionCodec,
final EventHandlerIdlenessSource idlenessSource,
final LoggingScopeFactory loggingScopeFactory,
@Parameter(EvaluatorConfigurationProviders.class)
final Set<ConfigurationProvider> evaluatorConfigurationProviders,
- // TODO: Eventually remove the factories when they are removed from
AllocatedEvaluatorImpl
+ final JVMProcessFactory jvmProcessFactory,
+ final CLRProcessFactory clrProcessFactory,
+ final DriverRestartManager driverRestartManager) {
+ this(clock, remoteManager, resourceReleaseHandler, resourceLaunchHandler,
evaluatorId, evaluatorDescriptor,
+ contextRepresenters, configurationSerializer, messageDispatcher,
evaluatorControlHandler,
+ contextControlHandler, stateManager, exceptionCodec, idlenessSource,
loggingScopeFactory,
+ evaluatorConfigurationProviders, jvmProcessFactory, clrProcessFactory,
Optional.of(driverRestartManager));
+ }
+
+ @Inject
+ private EvaluatorManager(
+ final Clock clock,
+ final RemoteManager remoteManager,
+ final ResourceReleaseHandler resourceReleaseHandler,
+ final ResourceLaunchHandler resourceLaunchHandler,
+ @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
+ @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl
evaluatorDescriptor,
+ final ContextRepresenters contextRepresenters,
+ final ConfigurationSerializer configurationSerializer,
+ final EvaluatorMessageDispatcher messageDispatcher,
+ final EvaluatorControlHandler evaluatorControlHandler,
+ final ContextControlHandler contextControlHandler,
+ final EvaluatorStatusManager stateManager,
+ final ExceptionCodec exceptionCodec,
+ final EventHandlerIdlenessSource idlenessSource,
+ final LoggingScopeFactory loggingScopeFactory,
+ @Parameter(EvaluatorConfigurationProviders.class)
+ final Set<ConfigurationProvider> evaluatorConfigurationProviders,
final JVMProcessFactory jvmProcessFactory,
final CLRProcessFactory clrProcessFactory) {
+ this(clock, remoteManager, resourceReleaseHandler, resourceLaunchHandler,
evaluatorId, evaluatorDescriptor,
+ contextRepresenters, configurationSerializer, messageDispatcher,
evaluatorControlHandler,
+ contextControlHandler, stateManager, exceptionCodec, idlenessSource,
loggingScopeFactory,
+ evaluatorConfigurationProviders, jvmProcessFactory, clrProcessFactory,
Optional.<DriverRestartManager>empty());
+ }
+
+ private EvaluatorManager(
+ final Clock clock,
+ final RemoteManager remoteManager,
+ final ResourceReleaseHandler resourceReleaseHandler,
+ final ResourceLaunchHandler resourceLaunchHandler,
+ final String evaluatorId,
+ final EvaluatorDescriptorImpl evaluatorDescriptor,
+ final ContextRepresenters contextRepresenters,
+ final ConfigurationSerializer configurationSerializer,
+ final EvaluatorMessageDispatcher messageDispatcher,
+ final EvaluatorControlHandler evaluatorControlHandler,
+ final ContextControlHandler contextControlHandler,
+ final EvaluatorStatusManager stateManager,
+ final ExceptionCodec exceptionCodec,
+ final EventHandlerIdlenessSource idlenessSource,
+ final LoggingScopeFactory loggingScopeFactory,
+ final Set<ConfigurationProvider> evaluatorConfigurationProviders,
+ // TODO: Eventually remove the factories when they are removed from
AllocatedEvaluatorImpl
+ final JVMProcessFactory jvmProcessFactory,
+ final CLRProcessFactory clrProcessFactory,
+ final Optional<DriverRestartManager> driverRestartManager) {
this.contextRepresenters = contextRepresenters;
this.idlenessSource = idlenessSource;
LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator:
{0}", evaluatorId);
@@ -146,7 +200,6 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
this.evaluatorControlHandler = evaluatorControlHandler;
this.contextControlHandler = contextControlHandler;
this.stateManager = stateManager;
- this.driverStatusManager = driverStatusManager;
this.exceptionCodec = exceptionCodec;
this.remoteManager = remoteManager;
@@ -155,6 +208,7 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
this.jvmProcessFactory = jvmProcessFactory;
this.clrProcessFactory = clrProcessFactory;
+ this.driverRestartManager = driverRestartManager;
LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator:
[{0}]", this.getId());
}
@@ -195,7 +249,7 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
allocationFired = true;
} else {
- LOG.log(Level.WARNING, "Evaluator allocated event fired twice.");
+ LOG.log(Level.WARNING, "Evaluator allocated event fired more than
once.");
}
}
@@ -339,27 +393,33 @@ public final class EvaluatorManager implements
Identifiable, AutoCloseable {
// first message from a running evaluator trying to re-establish
communications
if (evaluatorHeartbeatProto.getRecovery()) {
- this.evaluatorControlHandler.setRemoteID(evaluatorRID);
- this.stateManager.setRunning();
-
- this.driverStatusManager.oneContainerRecovered();
- final int numRecoveredContainers =
this.driverStatusManager.getNumRecoveredContainers();
-
- LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.",
this.evaluatorId);
- final int expectedEvaluatorsNumber =
this.driverStatusManager.getNumPreviousContainers();
-
- if (numRecoveredContainers > expectedEvaluatorsNumber) {
- LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators,
but [{1}] evaluators have checked in.",
- new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
- throw new RuntimeException("More then expected number of evaluators
are checking in during recovery.");
- } else if (numRecoveredContainers == expectedEvaluatorsNumber) {
- LOG.log(Level.INFO, "All [{0}] expected evaluators have checked in.
Recovery completed.",
- expectedEvaluatorsNumber);
- this.driverStatusManager.setRestartCompleted();
- this.messageDispatcher.onDriverRestartCompleted(new
DriverRestartCompleted(System.currentTimeMillis()));
+ if(this.driverRestartManager.isPresent()) {
+ this.evaluatorControlHandler.setRemoteID(evaluatorRID);
+ this.stateManager.setRunning();
+
+ this.driverRestartManager.get().oneContainerRecovered();
+ final int numRecoveredContainers =
this.driverRestartManager.get().getNumRecoveredContainers();
+
+ LOG.log(Level.FINE, "Received recovery heartbeat from evaluator
{0}.", this.evaluatorId);
+ final int expectedEvaluatorsNumber =
this.driverRestartManager.get().getNumPreviousContainers();
+
+ if (numRecoveredContainers > expectedEvaluatorsNumber) {
+ LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators,
but [{1}] evaluators have checked in.",
+ new Object[]{expectedEvaluatorsNumber,
numRecoveredContainers});
+ throw new RuntimeException("More then expected number of
evaluators are checking in during recovery.");
+ } else if (numRecoveredContainers == expectedEvaluatorsNumber) {
+ LOG.log(Level.INFO, "All [{0}] expected evaluators have checked
in. Recovery completed.",
+ expectedEvaluatorsNumber);
+ this.driverRestartManager.get().setRestartCompleted();
+ this.messageDispatcher.onDriverRestartCompleted(new
DriverRestartCompleted(System.currentTimeMillis()));
+ } else {
+ LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}]
evaluators have checked in.",
+ new Object[]{expectedEvaluatorsNumber,
numRecoveredContainers});
+ }
} else {
- LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}]
evaluators have checked in.",
- new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
+ final String errorMsg = "Restart configurations are not set
properly. The DriverRestartManager is missing.";
+ LOG.log(Level.SEVERE, errorMsg);
+ throw new DriverFatalRuntimeException(errorMsg);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
index 5f10bf8..1b563a2 100644
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
@@ -22,7 +22,7 @@ import org.apache.reef.annotations.Provided;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Public;
-import org.apache.reef.driver.restart.DriverRestartManager;
+import org.apache.reef.driver.restart.DriverRuntimeRestartManager;
import org.apache.reef.runtime.common.driver.DriverRuntimeRestartConfiguration;
import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
@@ -49,7 +49,7 @@ public final class YarnDriverRestartConfiguration extends
ConfigurationModuleBui
*/
public static final ConfigurationModule CONF = new
YarnDriverRestartConfiguration()
.bindNamedParameter(YarnEvaluatorPreserver.class, EVALUATOR_PRESERVER)
- .bindImplementation(DriverRestartManager.class,
YarnDriverRestartManager.class)
+ .bindImplementation(DriverRuntimeRestartManager.class,
YarnDriverRuntimeRestartManager.class)
.merge(DriverRuntimeRestartConfiguration.CONF)
.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
deleted file mode 100644
index 43920e2..0000000
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.runtime.yarn.driver;
-
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.annotations.audience.RuntimeAuthor;
-import org.apache.reef.driver.restart.DriverRestartManager;
-import org.apache.reef.proto.ReefServiceProtos;
-import org.apache.reef.runtime.common.driver.DriverStatusManager;
-import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
-import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
-import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
-import org.apache.reef.tang.annotations.Parameter;
-
-import javax.inject.Inject;
-import java.util.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * The implementation of restart manager for YARN. Handles evaluator
preservation as well
- * as evaluator recovery on YARN.
- */
-@DriverSide
-@RuntimeAuthor
-@Private
-@Unstable
-public final class YarnDriverRestartManager implements DriverRestartManager {
-
- private static final Logger LOG =
Logger.getLogger(YarnDriverRestartManager.class.getName());
-
- private final EvaluatorPreserver evaluatorPreserver;
- private final ApplicationMasterRegistration registration;
- private final DriverStatusManager driverStatusManager;
- private final REEFEventHandlers reefEventHandlers;
- private Set<Container> previousContainers;
-
- @Inject
- private YarnDriverRestartManager(@Parameter(YarnEvaluatorPreserver.class)
- final EvaluatorPreserver evaluatorPreserver,
- final REEFEventHandlers reefEventHandlers,
- final ApplicationMasterRegistration registration,
- final DriverStatusManager driverStatusManager){
- this.registration = registration;
- this.evaluatorPreserver = evaluatorPreserver;
- this.driverStatusManager = driverStatusManager;
- this.reefEventHandlers = reefEventHandlers;
- this.previousContainers = null;
- }
-
- /**
- * Determines whether the application master has been restarted based on the
container ID environment
- * variable provided by YARN. If that fails, determine whether the
application master is a restart
- * based on the number of previous containers reported by YARN.
- * @return true if the application master is a restarted instance, false
otherwise.
- */
- @Override
- public boolean isRestart() {
- final String containerIdString = getContainerIdString();
-
- if (containerIdString == null) {
- // container id should always be set in the env by the framework
- LOG.log(Level.WARNING, "Container ID is null, determining restart based
on previous containers.");
- return this.isRestartByPreviousContainers();
- }
-
- final ApplicationAttemptId appAttemptID =
getAppAttemptId(containerIdString);
-
- if (appAttemptID == null) {
- LOG.log(Level.WARNING, "applicationAttempt ID is null, determining
restart based on previous containers.");
- return this.isRestartByPreviousContainers();
- }
-
- LOG.log(Level.FINE, "Application attempt: " + appAttemptID.getAttemptId());
-
- return appAttemptID.getAttemptId() > 1;
- }
-
- private static String getContainerIdString() {
- try {
- return
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.key());
- } catch (Exception e) {
- LOG.log(Level.WARNING, "Unable to get the container ID from the
environment, exception " +
- e + " was thrown.");
- return null;
- }
- }
-
- private static ApplicationAttemptId getAppAttemptId(final String
containerIdString) {
- try {
- final ContainerId containerId =
ConverterUtils.toContainerId(containerIdString);
- return containerId.getApplicationAttemptId();
- } catch (Exception e) {
- LOG.log(Level.WARNING, "Unable to get the applicationAttempt ID from the
environment, exception " +
- e + " was thrown.");
- return null;
- }
- }
-
- /**
- * Initializes the list of previous containers and determine whether or not
this is an instance of restart
- * based on information reported by the RM.
- * @return true if previous containers is not empty.
- */
- private boolean isRestartByPreviousContainers() {
- this.initializeListOfPreviousContainers();
- return !this.previousContainers.isEmpty();
- }
-
- /**
- * Initializes the list of previous containers as reported by YARN.
- */
- private synchronized void initializeListOfPreviousContainers() {
- if (this.previousContainers == null) {
- this.previousContainers = new
HashSet<>(this.registration.getRegistration().getContainersFromPreviousAttempts());
-
- // If it's still null, create an empty list to indicate that it's not a
restart.
- if (this.previousContainers == null) {
- this.previousContainers = new HashSet<>();
- }
- }
- }
-
- @Override
- public void onRestart() {
- final Set<String> recoveredEvaluators = new HashSet<>();
- final Set<String> failedEvaluators = new HashSet<>();
-
- this.initializeListOfPreviousContainers();
-
- if (this.previousContainers != null && !this.previousContainers.isEmpty())
{
- LOG.log(Level.INFO, "Driver restarted, with {0} previous containers",
this.previousContainers.size());
- final Set<String> expectedContainers =
this.evaluatorPreserver.recoverEvaluators();
-
- final int numExpectedContainers = expectedContainers.size();
- final int numPreviousContainers = this.previousContainers.size();
- if (numExpectedContainers > numPreviousContainers) {
- // we expected more containers to be alive, some containers must have
died during driver restart
- LOG.log(Level.WARNING, "Expected {0} containers while only {1} are
still alive",
- new Object[]{numExpectedContainers, numPreviousContainers});
- final Set<String> previousContainersIds = new HashSet<>();
- for (final Container container : this.previousContainers) {
- previousContainersIds.add(container.getId().toString());
- }
- for (final String expectedContainerId : expectedContainers) {
- if (!previousContainersIds.contains(expectedContainerId)) {
-
this.evaluatorPreserver.recordRemovedEvaluator(expectedContainerId);
- LOG.log(Level.WARNING, "Expected container [{0}] not alive, must
have failed during driver restart.",
- expectedContainerId);
- failedEvaluators.add(expectedContainerId);
- }
- }
- }
- if (numExpectedContainers < numPreviousContainers) {
- // somehow we have more alive evaluators, this should not happen
- throw new RuntimeException("Expected only [" + numExpectedContainers +
"] containers " +
- "but resource manager believe that [" + numPreviousContainers + "]
are outstanding for driver.");
- }
-
- // numExpectedContainers == numPreviousContainers
- for (final Container container : this.previousContainers) {
- LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
- if (!expectedContainers.contains(container.getId().toString())) {
- throw new RuntimeException("Not expecting container " +
container.getId().toString());
- }
-
- recoveredEvaluators.add(container.getId().toString());
- }
- }
-
- this.informAboutEvaluatorAlive(recoveredEvaluators);
- this.informAboutEvaluatorFailures(failedEvaluators);
- }
-
- /**
- * Informs the driver status manager about the number of evaluators to wait
for to reinitiate contact
- * with the driver.
- * TODO [REEF-559]: Tighten previous evaluator ID checks by using entire set
of evaluator IDs.
- * @param evaluatorIds The set of evaluator IDs of evaluators expected to be
alive.
- */
- private void informAboutEvaluatorAlive(final Set<String> evaluatorIds) {
- // We will wait for these evaluators to contact us, so we do not need to
record the entire container information.
- this.driverStatusManager.setNumPreviousContainers(evaluatorIds.size());
- }
-
- /**
- * Generate failure events for evaluators that cannot be recovered.
- * @param evaluatorIds The set of evaluator IDs of evaluators that have
failed on restart.
- */
- private void informAboutEvaluatorFailures(final Set<String> evaluatorIds) {
- for (String evaluatorId : evaluatorIds) {
- LOG.log(Level.WARNING, "Container [" + evaluatorId +
- "] has failed during driver restart process, FailedEvaluatorHandler
will be triggered, but " +
- "no additional evaluator can be requested due to YARN-2433.");
- // trigger a failed evaluator event
-
this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
- .setIdentifier(evaluatorId)
- .setState(ReefServiceProtos.State.FAILED)
- .setExitCode(1)
- .setDiagnostics("Container [" + evaluatorId + "] failed during
driver restart process.")
- .setIsFromPreviousDriver(true)
- .build());
- }
- }
-
- @Override
- public void recordAllocatedEvaluator(final String id) {
- this.evaluatorPreserver.recordAllocatedEvaluator(id);
- }
-
- @Override
- public void recordRemovedEvaluator(final String id) {
- this.evaluatorPreserver.recordRemovedEvaluator(id);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
new file mode 100644
index 0000000..466fec2
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.driver.restart.DriverRuntimeRestartManager;
+import org.apache.reef.driver.restart.EvaluatorRestartInfo;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
+import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The implementation of restart manager for YARN. Handles evaluator
preservation as well
+ * as evaluator recovery on YARN.
+ */
+@DriverSide
+@RuntimeAuthor
+@Private
+@Unstable
+public final class YarnDriverRuntimeRestartManager implements
DriverRuntimeRestartManager {
+
+ private static final Logger LOG =
Logger.getLogger(YarnDriverRuntimeRestartManager.class.getName());
+
+ private final EvaluatorPreserver evaluatorPreserver;
+ private final ApplicationMasterRegistration registration;
+ private final REEFEventHandlers reefEventHandlers;
+ private Set<Container> previousContainers;
+
+ @Inject
+ private
YarnDriverRuntimeRestartManager(@Parameter(YarnEvaluatorPreserver.class)
+ final EvaluatorPreserver
evaluatorPreserver,
+ final REEFEventHandlers
reefEventHandlers,
+ final ApplicationMasterRegistration
registration){
+ this.registration = registration;
+ this.evaluatorPreserver = evaluatorPreserver;
+ this.reefEventHandlers = reefEventHandlers;
+ this.previousContainers = null;
+ }
+
+ /**
+ * Determines whether the application master has been restarted based on the
container ID environment
+ * variable provided by YARN. If that fails, determine whether the
application master is a restart
+ * based on the number of previous containers reported by YARN.
+ * @return true if the application master is a restarted instance, false
otherwise.
+ */
+ @Override
+ public boolean isRestart() {
+ final String containerIdString = getContainerIdString();
+
+ if (containerIdString == null) {
+ // container id should always be set in the env by the framework
+ LOG.log(Level.WARNING, "Container ID is null, determining restart based
on previous containers.");
+ return this.isRestartByPreviousContainers();
+ }
+
+ final ApplicationAttemptId appAttemptID =
getAppAttemptId(containerIdString);
+
+ if (appAttemptID == null) {
+ LOG.log(Level.WARNING, "applicationAttempt ID is null, determining
restart based on previous containers.");
+ return this.isRestartByPreviousContainers();
+ }
+
+ LOG.log(Level.FINE, "Application attempt: " + appAttemptID.getAttemptId());
+
+ return appAttemptID.getAttemptId() > 1;
+ }
+
+ private static String getContainerIdString() {
+ try {
+ return
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.key());
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Unable to get the container ID from the
environment, exception " +
+ e + " was thrown.");
+ return null;
+ }
+ }
+
+ private static ApplicationAttemptId getAppAttemptId(final String
containerIdString) {
+ try {
+ final ContainerId containerId =
ConverterUtils.toContainerId(containerIdString);
+ return containerId.getApplicationAttemptId();
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Unable to get the applicationAttempt ID from the
environment, exception " +
+ e + " was thrown.");
+ return null;
+ }
+ }
+
+ /**
+ * Initializes the list of previous containers and determines whether or not
this is an instance of restart
+ * based on information reported by the RM.
+ * @return true if previous containers is not empty.
+ */
+ private boolean isRestartByPreviousContainers() {
+ this.initializeListOfPreviousContainers();
+ return !this.previousContainers.isEmpty();
+ }
+
+ /**
+ * Initializes the list of previous containers as reported by YARN.
+ */
+ private synchronized void initializeListOfPreviousContainers() {
+ if (this.previousContainers == null) {
+ this.previousContainers = new
HashSet<>(this.registration.getRegistration().getContainersFromPreviousAttempts());
+
+ // If it's still null, create an empty list to indicate that it's not a
restart.
+ if (this.previousContainers == null) {
+ this.previousContainers = new HashSet<>();
+ }
+ }
+ }
+
+ @Override
+ public void recordAllocatedEvaluator(final String id) {
+ this.evaluatorPreserver.recordAllocatedEvaluator(id);
+ }
+
+ @Override
+ public void recordRemovedEvaluator(final String id) {
+ this.evaluatorPreserver.recordRemovedEvaluator(id);
+ }
+
+ /**
+ * Used by DriverRestartManager. Gets the list of previous containers from
the resource manager,
+ * compares that list to the YarnDriverRuntimeRestartManager's own list
based on the evaluator preserver,
+ * and determines which evaluators are alive and which have failed during
restart.
+ * @return EvaluatorRestartInfo, the object encapsulating alive and failed
evaluator IDs.
+ */
+ @Override
+ public EvaluatorRestartInfo getAliveAndFailedEvaluators() {
+ final Set<String> recoveredEvaluators = new HashSet<>();
+ final Set<String> failedEvaluators = new HashSet<>();
+
+ this.initializeListOfPreviousContainers();
+
+ if (this.previousContainers != null && !this.previousContainers.isEmpty())
{
+ LOG.log(Level.INFO, "Driver restarted, with {0} previous containers",
this.previousContainers.size());
+ final Set<String> expectedContainers =
this.evaluatorPreserver.recoverEvaluators();
+
+ final int numExpectedContainers = expectedContainers.size();
+ final int numPreviousContainers = this.previousContainers.size();
+ if (numExpectedContainers > numPreviousContainers) {
+ // we expected more containers to be alive, some containers must have
died during driver restart
+ LOG.log(Level.WARNING, "Expected {0} containers while only {1} are
still alive",
+ new Object[]{numExpectedContainers, numPreviousContainers});
+ final Set<String> previousContainersIds = new HashSet<>();
+ for (final Container container : this.previousContainers) {
+ previousContainersIds.add(container.getId().toString());
+ }
+ for (final String expectedContainerId : expectedContainers) {
+ if (!previousContainersIds.contains(expectedContainerId)) {
+
this.evaluatorPreserver.recordRemovedEvaluator(expectedContainerId);
+ LOG.log(Level.WARNING, "Expected container [{0}] not alive, must
have failed during driver restart.",
+ expectedContainerId);
+ failedEvaluators.add(expectedContainerId);
+ }
+ }
+ }
+ if (numExpectedContainers < numPreviousContainers) {
+ // somehow we have more alive evaluators, this should not happen
+ throw new RuntimeException("Expected only [" + numExpectedContainers +
"] containers " +
+ "but resource manager believe that [" + numPreviousContainers + "]
are outstanding for driver.");
+ }
+
+ // numExpectedContainers == numPreviousContainers
+ for (final Container container : this.previousContainers) {
+ LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
+ if (!expectedContainers.contains(container.getId().toString())) {
+ throw new RuntimeException("Not expecting container " +
container.getId().toString());
+ }
+
+ recoveredEvaluators.add(container.getId().toString());
+ }
+ }
+
+ return new EvaluatorRestartInfo(recoveredEvaluators, failedEvaluators);
+ }
+
+ /**
+ * Calls the appropriate handler via REEFEventHandlers, which is a runtime
specific implementation
+ * of the YARN runtime.
+ * @param evaluatorIds the set of evaluator IDs of failed evaluators during
restart.
+ */
+ @Override
+ public void informAboutEvaluatorFailures(final Set<String> evaluatorIds) {
+ for (String evaluatorId : evaluatorIds) {
+ LOG.log(Level.WARNING, "Container [" + evaluatorId +
+ "] has failed during driver restart process, FailedEvaluatorHandler
will be triggered, but " +
+ "no additional evaluator can be requested due to YARN-2433.");
+ // trigger a failed evaluator event
+
this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
+ .setIdentifier(evaluatorId)
+ .setState(ReefServiceProtos.State.FAILED)
+ .setExitCode(1)
+ .setDiagnostics("Container [" + evaluatorId + "] failed during
driver restart process.")
+ .setIsFromPreviousDriver(true)
+ .build());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a09c826/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
index acd0c48..01f6744 100644
---
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
+++
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
@@ -251,7 +251,11 @@ public final class RuntimeClock implements Clock {
// waiting interrupted - return to loop
}
}
- this.handlers.onNext(new RuntimeStop(this.timer.getCurrent()));
+ if (this.stoppedOnException == null) {
+ this.handlers.onNext(new RuntimeStop(this.timer.getCurrent()));
+ } else {
+ this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(),
this.stoppedOnException));
+ }
} catch (final Exception e) {
e.printStackTrace();
this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e));