Repository: incubator-reef
Updated Branches:
refs/heads/master 9c820fd55 -> 0181e2cca
[REEF-483] Make Evaluator logging configurable
This addressed the issue by
* Creating `DriverRestartManager` interface and implementation
`YarnDriverRestartManager`.
* Binding ServiceHandlers for the DriverRestartManager to record Evaluators.
* Creating the `EvaluatorPreserver` interface and implementing it in
`DFSEvaluatorPreserver`.
* Improving object usage of evaluator preservation.
JIRA:
[REEF-483](https://issues.apache.org/jira/browse/REEF-483)
Pull Request:
This closes #324
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/0181e2cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/0181e2cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/0181e2cc
Branch: refs/heads/master
Commit: 0181e2ccaa697ef766455770d0bba4e7023cad31
Parents: 9c820fd
Author: Andrew Chung <[email protected]>
Authored: Thu Jul 30 10:42:20 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Aug 3 20:05:50 2015 -0700
----------------------------------------------------------------------
.../reef/client/DriverRestartConfiguration.java | 16 +-
.../reef/client/DriverServiceConfiguration.java | 6 +
.../FailDriverOnEvaluatorLogErrors.java | 31 ++
.../driver/restart/DriverRestartManager.java | 64 +++++
...atorPreservingEvaluatorAllocatedHandler.java | 45 +++
...atorPreservingEvaluatorCompletedHandler.java | 46 +++
...aluatorPreservingEvaluatorFailedHandler.java | 46 +++
.../reef/driver/restart/package-info.java | 22 ++
.../DriverRuntimeRestartConfiguration.java | 13 +-
.../common/driver/DriverStartHandler.java | 64 +++--
.../common/driver/EvaluatorPreserver.java | 51 ++++
.../yarn/driver/YarnContainerManager.java | 281 +++----------------
.../driver/YarnDriverRestartConfiguration.java | 14 +-
.../yarn/driver/YarnDriverRestartManager.java | 164 +++++++++++
.../parameters/YarnEvaluatorPreserver.java | 34 +++
.../restart/DFSEvaluatorLogAppendWriter.java | 76 +++++
.../restart/DFSEvaluatorLogOverwriteWriter.java | 104 +++++++
.../driver/restart/DFSEvaluatorLogWriter.java | 39 +++
.../driver/restart/DFSEvaluatorPreserver.java | 230 +++++++++++++++
.../yarn/driver/restart/package-info.java | 22 ++
20 files changed, 1102 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
index 9973fad..225bd41 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
@@ -19,18 +19,17 @@
package org.apache.reef.client;
import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Public;
import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.parameters.DriverRestartCompletedHandlers;
-import org.apache.reef.driver.parameters.DriverRestartContextActiveHandlers;
-import org.apache.reef.driver.parameters.DriverRestartHandler;
-import org.apache.reef.driver.parameters.DriverRestartTaskRunningHandlers;
+import org.apache.reef.driver.parameters.*;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.runtime.common.DriverRestartCompleted;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
import org.apache.reef.tang.formats.OptionalImpl;
+import org.apache.reef.tang.formats.OptionalParameter;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.event.StartTime;
@@ -41,6 +40,7 @@ import org.apache.reef.wake.time.event.StartTime;
@Public
@ClientSide
@Provided
+@Unstable
public final class DriverRestartConfiguration extends
ConfigurationModuleBuilder {
/**
* This event is fired in place of the ON_DRIVER_STARTED when the Driver is
in fact restarted after failure.
@@ -63,7 +63,15 @@ public final class DriverRestartConfiguration extends
ConfigurationModuleBuilder
public static final OptionalImpl<EventHandler<DriverRestartCompleted>>
ON_DRIVER_RESTART_COMPLETED =
new OptionalImpl<>();
+ /**
+ * Parameter to determine whether the driver should fail or continue if
there are evaluator
+ * preservation log failures. Defaults to false.
+ */
+ public static final OptionalParameter<Boolean>
FAIL_DRIVER_ON_EVALUATOR_LOG_ERROR =
+ new OptionalParameter<>();
+
public static final ConfigurationModule CONF = new
DriverRestartConfiguration()
+ .bindNamedParameter(FailDriverOnEvaluatorLogErrors.class,
FAIL_DRIVER_ON_EVALUATOR_LOG_ERROR)
.bindSetEntry(DriverRestartHandler.class, ON_DRIVER_RESTARTED)
.bindSetEntry(DriverRestartTaskRunningHandlers.class,
ON_DRIVER_RESTART_TASK_RUNNING)
.bindSetEntry(DriverRestartContextActiveHandlers.class,
ON_DRIVER_RESTART_CONTEXT_ACTIVE)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
index d320f2f..4fe0452 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverServiceConfiguration.java
@@ -75,6 +75,11 @@ public final class DriverServiceConfiguration extends
ConfigurationModuleBuilder
public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED
= new RequiredImpl<>();
/**
+ * The event handler invoked right after the driver restarts.
+ */
+ public static final OptionalImpl<EventHandler<StartTime>>
ON_DRIVER_RESTARTED = new OptionalImpl<>();
+
+ /**
* The event handler invoked right before the driver shuts down. Defaults to
ignore.
*/
public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP =
new OptionalImpl<>();
@@ -161,6 +166,7 @@ public final class DriverServiceConfiguration extends
ConfigurationModuleBuilder
// Start and stop events are the same handlers for applications and
services.
.bindSetEntry(Clock.StartHandler.class, ON_DRIVER_STARTED)
.bindSetEntry(Clock.StopHandler.class, ON_DRIVER_STOP)
+ .bindSetEntry(ServiceDriverRestartedHandlers.class, ON_DRIVER_RESTARTED)
// Evaluator handlers
.bindSetEntry(ServiceEvaluatorAllocatedHandlers.class,
ON_EVALUATOR_ALLOCATED)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/FailDriverOnEvaluatorLogErrors.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/FailDriverOnEvaluatorLogErrors.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/FailDriverOnEvaluatorLogErrors.java
new file mode 100644
index 0000000..08c5a29
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/FailDriverOnEvaluatorLogErrors.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Whether to fail driver on Evaluator preservation log errors.
+ */
+@NamedParameter(doc = "Handler for FailedContext", default_value = "false")
+public final class FailDriverOnEvaluatorLogErrors implements Name<Boolean> {
+ private FailDriverOnEvaluatorLogErrors() {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
new file mode 100644
index 0000000..c60740f
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+
+/**
+ * Classes implementing this interface are in charge of recording evaluator
+ * changes as they are allocated as well as recovering Evaluators and
+ * discovering which evaluators are lost on the event of a driver restart.
+ */
+@DriverSide
+@Private
+@RuntimeAuthor
+@Unstable
+public interface DriverRestartManager {
+
+ /**
+ * Determines whether or not the driver has been restarted.
+ */
+ boolean isRestart();
+
+ /**
+ * This function has a few crucial jobs to enable restart:
+ * 1. Recover the list of evaluators that are reported to be alive by the
Resource Manager.
+ * 2. Make necessary operations to inform relevant runtime components about
evaluators that are alive
+ * with the set of evaluator IDs recovered in step 1.
+ * 3. Make necessary operations to inform relevant runtime components about
evaluators that have failed
+ * during the driver restart period.
+ */
+ void onRestart();
+
+ /**
+ * Records the evaluator when it is allocated.
+ * @param id The evaluator ID of the allocated evaluator.
+ */
+ void recordAllocatedEvaluator(final String id);
+
+
+ /**
+ * Records a removed evaluator into the evaluator log.
+ * @param id The evaluator ID of the removed evaluator.
+ */
+ void recordRemovedEvaluator(final String id);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
new file mode 100644
index 0000000..ca9321f
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorAllocatedHandler.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Records allocated evaluators for recovery on driver restart by using a
DriverRestartManager.
+ */
+public final class EvaluatorPreservingEvaluatorAllocatedHandler implements
EventHandler<AllocatedEvaluator> {
+ private final DriverRestartManager driverRestartManager;
+
+ @Inject
+ private EvaluatorPreservingEvaluatorAllocatedHandler(final
DriverRestartManager driverRestartManager) {
+ this.driverRestartManager = driverRestartManager;
+ }
+
+ /**
+ * Records the allocatedEvaluator ID with the DriverRestartManager.
+ * @param value the allocated evaluator event.
+ */
+ @Override
+ public void onNext(final AllocatedEvaluator value) {
+ this.driverRestartManager.recordAllocatedEvaluator(value.getId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorCompletedHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorCompletedHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorCompletedHandler.java
new file mode 100644
index 0000000..96536c3
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorCompletedHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Removes an evaluator from the evaluators to restart after it has completed.
+ */
+public final class EvaluatorPreservingEvaluatorCompletedHandler implements
EventHandler<CompletedEvaluator> {
+ private final DriverRestartManager driverRestartManager;
+
+ @Inject
+ private EvaluatorPreservingEvaluatorCompletedHandler(final
DriverRestartManager driverRestartManager) {
+ this.driverRestartManager = driverRestartManager;
+ }
+
+ /**
+ * Removes the completed evaluator from the list of evaluators to recover
+ * once it is completed.
+ * @param value The completed evaluator event.
+ */
+ @Override
+ public void onNext(final CompletedEvaluator value) {
+ this.driverRestartManager.recordRemovedEvaluator(value.getId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorFailedHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorFailedHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorFailedHandler.java
new file mode 100644
index 0000000..0c55759
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorPreservingEvaluatorFailedHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.restart;
+
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Removes an evaluator from the evaluators to restart after it has failed.
+ */
+public final class EvaluatorPreservingEvaluatorFailedHandler implements
EventHandler<FailedEvaluator> {
+ private final DriverRestartManager driverRestartManager;
+
+ @Inject
+ private EvaluatorPreservingEvaluatorFailedHandler(final DriverRestartManager
driverRestartManager) {
+ this.driverRestartManager = driverRestartManager;
+ }
+
+ /**
+ * Removes the failed evaluator from the list of evaluators to recover
+ * once it has failed.
+ * @param value The failed evaluator event.
+ */
+ @Override
+ public void onNext(final FailedEvaluator value) {
+ this.driverRestartManager.recordRemovedEvaluator(value.getId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/package-info.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/package-info.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/package-info.java
new file mode 100644
index 0000000..bf68f7c
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This package provides restart event service handlers.
+ */
+package org.apache.reef.driver.restart;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
index 1f0a527..70d0c9b 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeRestartConfiguration.java
@@ -20,6 +20,12 @@ package org.apache.reef.runtime.common.driver;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.ServiceEvaluatorAllocatedHandlers;
+import org.apache.reef.driver.parameters.ServiceEvaluatorCompletedHandlers;
+import org.apache.reef.driver.parameters.ServiceEvaluatorFailedHandlers;
+import
org.apache.reef.driver.restart.EvaluatorPreservingEvaluatorAllocatedHandler;
+import
org.apache.reef.driver.restart.EvaluatorPreservingEvaluatorCompletedHandler;
+import
org.apache.reef.driver.restart.EvaluatorPreservingEvaluatorFailedHandler;
import org.apache.reef.tang.formats.*;
/**
@@ -33,6 +39,9 @@ public final class DriverRuntimeRestartConfiguration extends
ConfigurationModule
private DriverRuntimeRestartConfiguration() {
}
- // TODO: bind service handlers in REEF-483
- public static final ConfigurationModule CONF = new
DriverRuntimeRestartConfiguration().build();
+ public static final ConfigurationModule CONF = new
DriverRuntimeRestartConfiguration()
+ .bindSetEntry(ServiceEvaluatorAllocatedHandlers.class,
EvaluatorPreservingEvaluatorAllocatedHandler.class)
+ .bindSetEntry(ServiceEvaluatorFailedHandlers.class,
EvaluatorPreservingEvaluatorFailedHandler.class)
+ .bindSetEntry(ServiceEvaluatorCompletedHandlers.class,
EvaluatorPreservingEvaluatorCompletedHandler.class)
+ .build();
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index 19c2e7a..bfd52a3 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -19,6 +19,8 @@
package org.apache.reef.runtime.common.driver;
import org.apache.reef.driver.parameters.DriverRestartHandler;
+import org.apache.reef.driver.parameters.ServiceDriverRestartedHandlers;
+import org.apache.reef.driver.restart.DriverRestartManager;
import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.Optional;
@@ -31,38 +33,54 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * This is bound to the start event of the clock and dispatches it to the
approriate application code.
+ * This is bound to the start event of the clock and dispatches it to the
appropriate application code.
*/
public final class DriverStartHandler implements EventHandler<StartTime> {
private static final Logger LOG =
Logger.getLogger(DriverStartHandler.class.getName());
private final Set<EventHandler<StartTime>> startHandlers;
private final Optional<Set<EventHandler<StartTime>>> restartHandlers;
- private final DriverStatusManager driverStatusManager;
+ private final Optional<Set<EventHandler<StartTime>>> serviceRestartHandlers;
+ private final Optional<DriverRestartManager> driverRestartManager;
@Inject
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
final Set<EventHandler<StartTime>> startHandler,
- @Parameter(DriverRestartHandler.class) final
Set<EventHandler<StartTime>> restartHandlers,
+ @Parameter(DriverRestartHandler.class)
+ final Set<EventHandler<StartTime>> restartHandlers,
+ @Parameter(ServiceDriverRestartedHandlers.class)
+ final Set<EventHandler<StartTime>> serviceRestartHandlers,
+ final DriverRestartManager driverRestartManager,
final DriverStatusManager driverStatusManager) {
- this.startHandlers = startHandler;
- this.restartHandlers = Optional.of(restartHandlers);
- this.driverStatusManager = driverStatusManager;
- LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandler
[{0}] and RestartHandler [{1}]",
- new String[]{this.startHandlers.toString(),
this.restartHandlers.toString()});
+ this(startHandler, Optional.of(restartHandlers),
Optional.of(serviceRestartHandlers),
+ Optional.of(driverRestartManager), driverStatusManager);
+ LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers
[{0}], RestartHandlers [{1}]," +
+ "and ServiceRestartHandlers [{2}], with a restart manager.",
+ new String[] {this.startHandlers.toString(),
this.restartHandlers.toString(),
+ this.serviceRestartHandlers.toString()});
}
@Inject
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
- final Set<EventHandler<StartTime>> startHandler,
+ final Set<EventHandler<StartTime>> startHandlers,
final DriverStatusManager driverStatusManager) {
- this.startHandlers = startHandler;
- this.restartHandlers = Optional.empty();
- this.driverStatusManager = driverStatusManager;
- LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandler
[{0}] and no RestartHandler",
+ this(startHandlers, Optional.<Set<EventHandler<StartTime>>>empty(),
+ Optional.<Set<EventHandler<StartTime>>>empty(),
Optional.<DriverRestartManager>empty(), driverStatusManager);
+ LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers
[{0}] and no restart.",
this.startHandlers.toString());
}
+ private DriverStartHandler(final Set<EventHandler<StartTime>> startHandler,
+ final Optional<Set<EventHandler<StartTime>>>
restartHandlers,
+ final Optional<Set<EventHandler<StartTime>>>
serviceRestartHandlers,
+ final Optional<DriverRestartManager>
driverRestartManager,
+ final DriverStatusManager driverStatusManager) {
+ this.startHandlers = startHandler;
+ this.restartHandlers = restartHandlers;
+ this.serviceRestartHandlers = serviceRestartHandlers;
+ this.driverRestartManager = driverRestartManager;
+ }
+
@Override
public void onNext(final StartTime startTime) {
if (isRestart()) {
@@ -73,10 +91,18 @@ public final class DriverStartHandler implements
EventHandler<StartTime> {
}
private void onRestart(final StartTime startTime) {
- if (restartHandlers.isPresent()) {
- for (EventHandler<StartTime> restartHandler :
this.restartHandlers.get()) {
+ if (this.driverRestartManager.isPresent() &&
this.restartHandlers.isPresent()) {
+ for (EventHandler<StartTime> serviceRestartHandler :
this.serviceRestartHandlers.get()) {
+ serviceRestartHandler.onNext(startTime);
+ }
+
+ for (EventHandler<StartTime> restartHandler :
this.restartHandlers.get()){
restartHandler.onNext(startTime);
}
+
+ // This can only be called after calling client restart handlers because
REEF.NET
+ // JobDriver requires making this call to set up the InterOp handlers.
+ this.driverRestartManager.get().onRestart();
} else {
throw new DriverFatalRuntimeException("Driver restart happened, but no
ON_DRIVER_RESTART handler is bound.");
}
@@ -89,9 +115,13 @@ public final class DriverStartHandler implements
EventHandler<StartTime> {
}
/**
- * @return true, if the Driver is in fact being restarted.
+ * @return true, if the configurations enable restart and the Driver is in
fact being restarted.
*/
private boolean isRestart() {
- return this.driverStatusManager.getNumPreviousContainers() > 0;
+ if (this.driverRestartManager.isPresent()) {
+ return this.driverRestartManager.get().isRestart();
+ }
+
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorPreserver.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorPreserver.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorPreserver.java
new file mode 100644
index 0000000..5c51d3c
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorPreserver.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+
+import java.util.Set;
+
+/**
+ * An interface to preserve evaluators across driver restarts.
+ */
+@DriverSide
+@Private
+@RuntimeAuthor
+@Unstable
+public interface EvaluatorPreserver {
+ /**
+ * Called on driver restart when evaluators are to be recovered.
+ */
+ Set<String> recoverEvaluators();
+
+ /**
+ * Called when an evaluator is to be preserved.
+ */
+ void recordAllocatedEvaluator(final String id);
+
+ /**
+ * Called when an evaluator is to be removed.
+ * @param id
+ */
+ void recordRemovedEvaluator(final String id);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index 0e7d0e7..af460d8 100644
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -20,10 +20,6 @@ package org.apache.reef.runtime.yarn.driver;
import com.google.protobuf.ByteString;
import org.apache.commons.collections.ListUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -36,13 +32,11 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.DriverStatusManager;
-import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
import
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
import
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
-import org.apache.reef.runtime.yarn.util.YarnTypes;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.remote.Encoder;
@@ -64,10 +58,6 @@ final class YarnContainerManager
private static final String RUNTIME_NAME = "YARN";
- private static final String ADD_FLAG = "+";
-
- private static final String REMOVE_FLAG = "-";
-
private final YarnClient yarnClient = YarnClient.createYarnClient();
private final Queue<AMRMClient.ContainerRequest> requestsBeforeSentToRM =
new ConcurrentLinkedQueue<>();
@@ -133,7 +123,7 @@ final class YarnContainerManager
new Object[]{id, containers.size(),
this.containerRequestCounter.get()});
for (final Container container : containers) {
- handleNewContainer(container, false);
+ handleNewContainer(container);
}
LOG.log(Level.FINE, "TIME: Processed Containers {0}", id);
@@ -222,7 +212,6 @@ final class YarnContainerManager
LOG.log(Level.FINE, "Release container: {0}", containerId);
final Container container = this.containers.removeAndGet(containerId);
this.resourceManager.releaseAssignedContainer(container.getId());
- logContainerRemoval(container.getId().toString());
updateRuntimeStatus();
}
@@ -252,16 +241,6 @@ final class YarnContainerManager
LOG.log(Level.WARNING, "Unable to register application master.", e);
onRuntimeError(e);
}
-
- // TODO: this is currently being developed on a hacked 2.4.0 bits, should
be 2.4.1
- final String minVersionToGetPreviousContainer = "2.4.0";
-
- // when supported, obtain the list of the containers previously allocated,
and write info to driver folder
- if (YarnTypes.isAtOrAfterVersion(minVersionToGetPreviousContainer)) {
- LOG.log(Level.FINEST, "Hadoop version is {0} or after with support to
retain previous containers, " +
- "processing previous containers.", minVersionToGetPreviousContainer);
- processPreviousContainers();
- }
}
void onStop(final Throwable exception) {
@@ -320,48 +299,6 @@ final class YarnContainerManager
this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build());
}
- private void processPreviousContainers() {
- final List<Container> previousContainers =
this.registration.getRegistration().getContainersFromPreviousAttempts();
- if (previousContainers != null && !previousContainers.isEmpty()) {
- LOG.log(Level.INFO, "Driver restarted, with {0} previous containers",
previousContainers.size());
-
this.driverStatusManager.setNumPreviousContainers(previousContainers.size());
- final Set<String> expectedContainers =
getExpectedContainersFromLogReplay();
- final int numExpectedContainers = expectedContainers.size();
- final int numPreviousContainers = previousContainers.size();
- if (numExpectedContainers > numPreviousContainers) {
- // we expected more containers to be alive, some containers must have
died during driver restart
- LOG.log(Level.WARNING, "Expected {0} containers while only {1} are
still alive",
- new Object[]{numExpectedContainers, numPreviousContainers});
- final Set<String> previousContainersIds = new HashSet<>();
- for (final Container container : previousContainers) {
- previousContainersIds.add(container.getId().toString());
- }
- for (final String expectedContainerId : expectedContainers) {
- if (!previousContainersIds.contains(expectedContainerId)) {
- logContainerRemoval(expectedContainerId);
- LOG.log(Level.WARNING, "Expected container [{0}] not alive, must
have failed during driver restart.",
- expectedContainerId);
- informAboutConatinerFailureDuringRestart(expectedContainerId);
- }
- }
- }
- if (numExpectedContainers < numPreviousContainers) {
- // somehow we have more alive evaluators, this should not happen
- throw new RuntimeException("Expected only [" + numExpectedContainers +
"] containers " +
- "but resource manager believe that [" + numPreviousContainers + "]
are outstanding for driver.");
- }
-
- // numExpectedContainers == numPreviousContainers
- for (final Container container : previousContainers) {
- LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
- if (!expectedContainers.contains(container.getId().toString())) {
- throw new DriverFatalRuntimeException("Not expecting container " +
container.getId().toString());
- }
- handleNewContainer(container, true);
- }
- }
- }
-
/**
* Handles container status reports. Calls come from YARN.
*
@@ -394,7 +331,6 @@ final class YarnContainerManager
status.setExitCode(value.getExitStatus());
// remove the completed container (can be either done/killed/failed)
from book keeping
this.containers.removeAndGet(containerId);
- logContainerRemoval(containerId);
break;
default:
LOG.info("Container running");
@@ -426,53 +362,48 @@ final class YarnContainerManager
*
* @param container newly allocated
*/
- private void handleNewContainer(final Container container, final boolean
isRecoveredContainer) {
+ private void handleNewContainer(final Container container) {
LOG.log(Level.FINE, "allocated container: id[ {0} ]", container.getId());
- // recovered container is not new allocation, it is just checking back
from previous driver failover
- if (!isRecoveredContainer) {
- synchronized (this) {
- if (matchContainerWithPendingRequest(container)) {
- final AMRMClient.ContainerRequest matchedRequest =
this.requestsAfterSentToRM.peek();
- this.containerRequestCounter.decrement();
- this.containers.add(container);
-
- LOG.log(Level.FINEST, "{0} matched with {1}", new
Object[]{container.toString(), matchedRequest.toString()});
-
- // Due to the bug YARN-314 and the workings of AMRMCClientAsync,
when x-priority m-capacity zero-container
- // request and x-priority n-capacity nonzero-container request are
sent together, where m > n, RM ignores
- // the latter.
- // Therefore it is necessary avoid sending zero-container request,
even it means getting extra containers.
- // It is okay to send nonzero m-capacity and n-capacity request
together since bigger containers
- // can be matched.
- // TODO: revisit this when implementing locality-strictness (i.e. a
specific rack request can be ignored)
- if (this.requestsAfterSentToRM.size() > 1) {
- try {
- this.resourceManager.removeContainerRequest(matchedRequest);
- } catch (final Exception e) {
- LOG.log(Level.WARNING, "Nothing to remove from Async AMRM
client's queue, " +
- "removal attempt failed with exception", e);
- }
+ synchronized (this) {
+ if (matchContainerWithPendingRequest(container)) {
+ final AMRMClient.ContainerRequest matchedRequest =
this.requestsAfterSentToRM.peek();
+ this.containerRequestCounter.decrement();
+ this.containers.add(container);
+
+ LOG.log(Level.FINEST, "{0} matched with {1}", new
Object[]{container.toString(), matchedRequest.toString()});
+
+ // Due to the bug YARN-314 and the workings of AMRMCClientAsync, when
x-priority m-capacity zero-container
+ // request and x-priority n-capacity nonzero-container request are
sent together, where m > n, RM ignores
+ // the latter.
+ // Therefore it is necessary avoid sending zero-container request,
even it means getting extra containers.
+ // It is okay to send nonzero m-capacity and n-capacity request
together since bigger containers
+ // can be matched.
+ // TODO: revisit this when implementing locality-strictness (i.e. a
specific rack request can be ignored)
+ if (this.requestsAfterSentToRM.size() > 1) {
+ try {
+ this.resourceManager.removeContainerRequest(matchedRequest);
+ } catch (final Exception e) {
+ LOG.log(Level.WARNING, "Nothing to remove from Async AMRM client's
queue, " +
+ "removal attempt failed with exception", e);
}
-
- this.requestsAfterSentToRM.remove();
- doHomogeneousRequests();
-
- LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core
number = {1}",
- new Object[]{container.getResource().getMemory(),
container.getResource().getVirtualCores()});
-
this.reefEventHandlers.onResourceAllocation(ResourceAllocationEventImpl.newBuilder()
- .setIdentifier(container.getId().toString())
- .setNodeId(container.getNodeId().toString())
- .setResourceMemory(container.getResource().getMemory())
- .setVirtualCores(container.getResource().getVirtualCores())
- .build());
- // we only add this to Container log after the Container has been
registered as an REEF Evaluator.
- logContainerAddition(container.getId().toString());
- this.updateRuntimeStatus();
- } else {
- LOG.log(Level.WARNING, "Got an extra container {0} that doesn't
match, releasing...", container.getId());
- this.resourceManager.releaseAssignedContainer(container.getId());
}
+
+ this.requestsAfterSentToRM.remove();
+ doHomogeneousRequests();
+
+ LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number
= {1}",
+ new Object[]{container.getResource().getMemory(),
container.getResource().getVirtualCores()});
+
this.reefEventHandlers.onResourceAllocation(ResourceAllocationEventImpl.newBuilder()
+ .setIdentifier(container.getId().toString())
+ .setNodeId(container.getNodeId().toString())
+ .setResourceMemory(container.getResource().getMemory())
+ .setVirtualCores(container.getResource().getVirtualCores())
+ .build());
+ this.updateRuntimeStatus();
+ } else {
+ LOG.log(Level.WARNING, "Got an extra container {0} that doesn't match,
releasing...", container.getId());
+ this.resourceManager.releaseAssignedContainer(container.getId());
}
}
}
@@ -564,138 +495,4 @@ final class YarnContainerManager
this.reefEventHandlers.onRuntimeStatus(runtimeStatusBuilder.build());
}
-
- private Set<String> getExpectedContainersFromLogReplay() {
- final org.apache.hadoop.conf.Configuration config = new
org.apache.hadoop.conf.Configuration();
- config.setBoolean("dfs.support.append", true);
- config.setBoolean("dfs.support.broken.append", true);
- final Set<String> expectedContainers = new HashSet<>();
- try {
- final FileSystem fs = FileSystem.get(config);
- final Path path = new Path(getChangeLogLocation());
- if (!fs.exists(path)) {
- // empty set
- return expectedContainers;
- } else {
- final BufferedReader br = new BufferedReader(new
InputStreamReader(fs.open(path)));
- String line = br.readLine();
- while (line != null) {
- if (line.startsWith(ADD_FLAG)) {
- final String containerId = line.substring(ADD_FLAG.length());
- if (expectedContainers.contains(containerId)) {
- throw new DriverFatalRuntimeException("Duplicated add container
record found in the " +
- "change log for container " + containerId);
- }
- expectedContainers.add(containerId);
- } else if (line.startsWith(REMOVE_FLAG)) {
- final String containerId = line.substring(REMOVE_FLAG.length());
- if (!expectedContainers.contains(containerId)) {
- throw new DriverFatalRuntimeException("Change log includes
record that try to " +
- "remove non-exist or duplicate remove record for container +
" + containerId);
- }
- expectedContainers.remove(containerId);
- }
- line = br.readLine();
- }
- br.close();
- }
- } catch (final IOException e) {
- throw new DriverFatalRuntimeException("Cannot read from log file", e);
- }
- return expectedContainers;
- }
-
- private void informAboutConatinerFailureDuringRestart(final String
containerId) {
- LOG.log(Level.WARNING, "Container [" + containerId +
- "] has failed during driver restart process, FailedEvaluatorHandler
will be triggered, but " +
- "no additional evaluator can be requested due to YARN-2433.");
- // trigger a failed evaluator event
-
this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
- .setIdentifier(containerId)
- .setState(ReefServiceProtos.State.FAILED)
- .setExitCode(1)
- .setDiagnostics("Container [" + containerId + "] failed during driver
restart process.")
- .setIsFromPreviousDriver(true)
- .build());
- }
-
- private synchronized void writeToEvaluatorLog(final String entry) throws
IOException {
- final org.apache.hadoop.conf.Configuration config = new
org.apache.hadoop.conf.Configuration();
- config.setBoolean("dfs.support.append", true);
- config.setBoolean("dfs.support.broken.append", true);
- final FileSystem fs = getFileSystemInstance();
- final Path path = new Path(getChangeLogLocation());
- final boolean appendToLog = fs.exists(path);
-
- try (
- final BufferedWriter bw = appendToLog ?
- new BufferedWriter(new OutputStreamWriter(fs.append(path))) :
- new BufferedWriter(new OutputStreamWriter(fs.create(path)));
- ) {
- bw.write(entry);
- } catch (final IOException e) {
- if (appendToLog) {
- LOG.log(Level.FINE, "Unable to add an entry to the Evaluator log.
Attempting append by delete and recreate", e);
- appendByDeleteAndCreate(fs, path, entry);
- }
- }
- }
-
- private FileSystem getFileSystemInstance() throws IOException {
- final org.apache.hadoop.conf.Configuration config = new
org.apache.hadoop.conf.Configuration();
- config.setBoolean("dfs.support.append", true);
- config.setBoolean("dfs.support.broken.append", true);
- return FileSystem.get(config);
- }
-
- /**
- * For certain HDFS implementation, the append operation may not be
supported (e.g., Azure blob - wasb)
- * in this case, we will emulate the append operation by reading the
content, appending entry at the end,
- * then recreating the file with appended content.
- *
- * @throws java.io.IOException when the file can't be written.
- */
-
- private void appendByDeleteAndCreate(final FileSystem fs, final Path path,
final String appendEntry)
- throws IOException {
- final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-
- try (final InputStream inputStream = fs.open(path)) {
- IOUtils.copyBytes(inputStream, outputStream, 4096, true);
- }
-
- final String newContent = outputStream.toString() + appendEntry;
- fs.delete(path, true);
-
- try (final FSDataOutputStream newOutput = fs.create(path);
- final InputStream newInput = new
ByteArrayInputStream(newContent.getBytes())) {
- IOUtils.copyBytes(newInput, newOutput, 4096, true);
- }
-
- }
-
- private String getChangeLogLocation() {
- return "/ReefApplications/" + EvaluatorManager.getJobIdentifier() +
"/evaluatorsChangesLog";
- }
-
- private void logContainerAddition(final String containerId) {
- final String entry = ADD_FLAG + containerId + System.lineSeparator();
- logContainerChange(entry);
- }
-
- private void logContainerRemoval(final String containerId) {
- final String entry = REMOVE_FLAG + containerId + System.lineSeparator();
- logContainerChange(entry);
- }
-
- private void logContainerChange(final String entry) {
- try {
- writeToEvaluatorLog(entry);
- } catch (final IOException e) {
- final String errorMsg = "Unable to log the change of container [" +
entry +
- "] to the container log. Driver restart won't work properly.";
- LOG.log(Level.WARNING, errorMsg, e);
- throw new DriverFatalRuntimeException(errorMsg);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
index 7c11fec..5f10bf8 100644
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
@@ -19,11 +19,16 @@
package org.apache.reef.runtime.yarn.driver;
import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.driver.restart.DriverRestartManager;
import org.apache.reef.runtime.common.driver.DriverRuntimeRestartConfiguration;
+import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalImpl;
/**
* Use this ConfigurationModule to configure YARN-specific Restart options for
the driver.
@@ -32,12 +37,19 @@ import
org.apache.reef.tang.formats.ConfigurationModuleBuilder;
@ClientSide
@Public
@Provided
+@Unstable
public final class YarnDriverRestartConfiguration extends
ConfigurationModuleBuilder {
/**
+ * The Evaluator Preserver implementation used for YARN. Defaults to
DFSEvaluatorPreserver.
+ */
+ public static final OptionalImpl<EvaluatorPreserver> EVALUATOR_PRESERVER =
new OptionalImpl<>();
+
+ /**
* This event is fired in place of the ON_DRIVER_STARTED when the Driver is
in fact restarted after failure.
*/
- // TODO: Bind runtime-specific restart logic for REEF-483.
public static final ConfigurationModule CONF = new
YarnDriverRestartConfiguration()
+ .bindNamedParameter(YarnEvaluatorPreserver.class, EVALUATOR_PRESERVER)
+ .bindImplementation(DriverRestartManager.class,
YarnDriverRestartManager.class)
.merge(DriverRuntimeRestartConfiguration.CONF)
.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
new file mode 100644
index 0000000..d6be373
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartManager.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.driver.restart.DriverRestartManager;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
+import org.apache.reef.runtime.yarn.driver.parameters.YarnEvaluatorPreserver;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The implementation of restart manager for YARN. Handles evaluator preservation as well
+ * as evaluator recovery on YARN.
+ *
+ * Evaluator bookkeeping is delegated to the configured {@link EvaluatorPreserver}; the containers
+ * that survived the previous application attempt are read from the
+ * {@link ApplicationMasterRegistration}.
+ */
+@DriverSide
+@RuntimeAuthor
+@Private
+@Unstable
+public final class YarnDriverRestartManager implements DriverRestartManager {
+
+  private static final Logger LOG = Logger.getLogger(YarnDriverRestartManager.class.getName());
+
+  private final EvaluatorPreserver evaluatorPreserver;
+  private final ApplicationMasterRegistration registration;
+  private final DriverStatusManager driverStatusManager;
+  private final REEFEventHandlers reefEventHandlers;
+
+  /**
+   * Containers reported by YARN from previous attempts. Stays null until first queried through
+   * {@link #getPreviousContainers()}.
+   */
+  private List<Container> previousContainers;
+
+  @Inject
+  private YarnDriverRestartManager(@Parameter(YarnEvaluatorPreserver.class)
+                                   final EvaluatorPreserver evaluatorPreserver,
+                                   final REEFEventHandlers reefEventHandlers,
+                                   final ApplicationMasterRegistration registration,
+                                   final DriverStatusManager driverStatusManager) {
+    this.registration = registration;
+    this.evaluatorPreserver = evaluatorPreserver;
+    this.driverStatusManager = driverStatusManager;
+    this.reefEventHandlers = reefEventHandlers;
+    this.previousContainers = null;
+  }
+
+  /**
+   * @return true if the application master registration reports at least one container from
+   * previous attempts, false otherwise.
+   */
+  @Override
+  public boolean isRestart() {
+    // TODO [REEF-508]: Determine restart based on environment variable as provided by YARN.
+    return !this.getPreviousContainers().isEmpty();
+  }
+
+  /**
+   * Reconciles the evaluators recorded by the {@link EvaluatorPreserver} with the containers that
+   * YARN reports from previous attempts. Evaluators that were expected but are no longer reported
+   * are recorded as removed and announced as failed; the number of surviving evaluators is handed
+   * to the {@link DriverStatusManager}.
+   *
+   * @throws RuntimeException if YARN reports more containers than were expected, or reports a
+   * container that was not expected.
+   */
+  @Override
+  public void onRestart() {
+    final Set<String> recoveredEvaluators = new HashSet<>();
+    final Set<String> failedEvaluators = new HashSet<>();
+
+    // Resolve the previous containers here as well, so that recovery does not silently do
+    // nothing when onRestart() is invoked without a preceding isRestart() call.
+    final List<Container> previous = this.getPreviousContainers();
+
+    if (!previous.isEmpty()) {
+      LOG.log(Level.INFO, "Driver restarted, with {0} previous containers", previous.size());
+      final Set<String> expectedContainers = this.evaluatorPreserver.recoverEvaluators();
+
+      final int numExpectedContainers = expectedContainers.size();
+      final int numPreviousContainers = previous.size();
+      if (numExpectedContainers > numPreviousContainers) {
+        // we expected more containers to be alive, some containers must have died during driver restart
+        LOG.log(Level.WARNING, "Expected {0} containers while only {1} are still alive",
+            new Object[]{numExpectedContainers, numPreviousContainers});
+        final Set<String> previousContainersIds = new HashSet<>();
+        for (final Container container : previous) {
+          previousContainersIds.add(container.getId().toString());
+        }
+        for (final String expectedContainerId : expectedContainers) {
+          if (!previousContainersIds.contains(expectedContainerId)) {
+            this.evaluatorPreserver.recordRemovedEvaluator(expectedContainerId);
+            LOG.log(Level.WARNING, "Expected container [{0}] not alive, must have failed during driver restart.",
+                expectedContainerId);
+            failedEvaluators.add(expectedContainerId);
+          }
+        }
+      }
+      if (numExpectedContainers < numPreviousContainers) {
+        // somehow we have more alive evaluators, this should not happen
+        throw new RuntimeException("Expected only [" + numExpectedContainers + "] containers " +
+            "but resource manager believe that [" + numPreviousContainers + "] are outstanding for driver.");
+      }
+
+      // Every container reported by YARN must be one that was recorded as allocated.
+      for (final Container container : previous) {
+        LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
+        if (!expectedContainers.contains(container.getId().toString())) {
+          throw new RuntimeException("Not expecting container " + container.getId().toString());
+        }
+
+        recoveredEvaluators.add(container.getId().toString());
+      }
+    }
+
+    this.informAboutEvaluatorAlive(recoveredEvaluators);
+    this.informAboutEvaluatorFailures(failedEvaluators);
+  }
+
+  /**
+   * Fetches the containers of previous attempts from the registration on first use and caches
+   * them. A null answer from the registration is replaced by an empty list.
+   *
+   * @return the (possibly empty, never null) list of containers from previous attempts.
+   */
+  private List<Container> getPreviousContainers() {
+    if (this.previousContainers == null) {
+      this.previousContainers = this.registration.getRegistration().getContainersFromPreviousAttempts();
+
+      // If it's still null, create an empty list to indicate that it's not a restart.
+      if (this.previousContainers == null) {
+        this.previousContainers = new ArrayList<>();
+      }
+    }
+
+    return this.previousContainers;
+  }
+
+  /**
+   * Tells the driver status manager how many evaluators from the previous attempt are alive.
+   *
+   * @param evaluatorIds the IDs of the recovered evaluators.
+   */
+  private void informAboutEvaluatorAlive(final Set<String> evaluatorIds) {
+    // We will wait for these evaluators to contact us, so we do not need to record the entire container information.
+    this.driverStatusManager.setNumPreviousContainers(evaluatorIds.size());
+  }
+
+  /**
+   * Emits a FAILED resource status, flagged as coming from the previous driver, for each evaluator.
+   *
+   * @param evaluatorIds the IDs of the evaluators that did not survive the driver restart.
+   */
+  private void informAboutEvaluatorFailures(final Set<String> evaluatorIds) {
+    for (final String evaluatorId : evaluatorIds) {
+      LOG.log(Level.WARNING, "Container [{0}] has failed during driver restart process, " +
+          "FailedEvaluatorHandler will be triggered, but " +
+          "no additional evaluator can be requested due to YARN-2433.", evaluatorId);
+      // trigger a failed evaluator event
+      this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder()
+          .setIdentifier(evaluatorId)
+          .setState(ReefServiceProtos.State.FAILED)
+          .setExitCode(1)
+          .setDiagnostics("Container [" + evaluatorId + "] failed during driver restart process.")
+          .setIsFromPreviousDriver(true)
+          .build());
+    }
+  }
+
+  /**
+   * Forwards the allocation of an evaluator to the {@link EvaluatorPreserver}.
+   *
+   * @param id the ID of the allocated evaluator.
+   */
+  @Override
+  public void recordAllocatedEvaluator(final String id) {
+    this.evaluatorPreserver.recordAllocatedEvaluator(id);
+  }
+
+  /**
+   * Forwards the removal of an evaluator to the {@link EvaluatorPreserver}.
+   *
+   * @param id the ID of the removed evaluator.
+   */
+  @Override
+  public void recordRemovedEvaluator(final String id) {
+    this.evaluatorPreserver.recordRemovedEvaluator(id);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/YarnEvaluatorPreserver.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/YarnEvaluatorPreserver.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/YarnEvaluatorPreserver.java
new file mode 100644
index 0000000..9b07f0b
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/YarnEvaluatorPreserver.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.parameters;
+
+import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import org.apache.reef.runtime.yarn.driver.restart.DFSEvaluatorPreserver;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter selecting the {@link EvaluatorPreserver} implementation to use on YARN.
+ * When no binding is supplied, {@link DFSEvaluatorPreserver} is used (see {@code default_class}).
+ * The class only serves as a configuration name; its constructor is private.
+ */
+@NamedParameter(doc = "The Evaluator Preserver to use on YARN, defaults to
DFS.",
+ default_class = DFSEvaluatorPreserver.class)
+public final class YarnEvaluatorPreserver implements Name<EvaluatorPreserver> {
+ private YarnEvaluatorPreserver() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
new file mode 100644
index 0000000..b88f4a0
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.annotations.audience.Private;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+/**
+ * A {@link DFSEvaluatorLogWriter} that uses the regular append operation of the file system.
+ * dfs.support.append should be true.
+ */
+@Private
+public final class DFSEvaluatorLogAppendWriter implements DFSEvaluatorLogWriter {
+
+  private final FileSystem fileSystem;
+
+  private final Path changelogPath;
+
+  private boolean fsClosed = false;
+
+  DFSEvaluatorLogAppendWriter(final FileSystem fileSystem, final Path changelogPath) {
+    this.fileSystem = fileSystem;
+    this.changelogPath = changelogPath;
+  }
+
+  /**
+   * Adds one formatted entry (addition or removal of an Evaluator ID) to the DFS evaluator log.
+   * The log file is created on the first write and appended to on every later write.
+   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information).
+   * @throws IOException if the log cannot be opened or written.
+   */
+  @Override
+  public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
+    try (final BufferedWriter logWriter = this.openLogWriter()) {
+      logWriter.write(formattedEntry);
+    }
+  }
+
+  /**
+   * Opens the changelog for writing: in append mode when the file already exists, otherwise by
+   * creating it.
+   * @return a buffered writer on the changelog.
+   * @throws IOException if the existence check, append or create call fails.
+   */
+  private BufferedWriter openLogWriter() throws IOException {
+    if (this.fileSystem.exists(this.changelogPath)) {
+      return new BufferedWriter(new OutputStreamWriter(this.fileSystem.append(this.changelogPath)));
+    }
+    return new BufferedWriter(new OutputStreamWriter(this.fileSystem.create(this.changelogPath)));
+  }
+
+  /**
+   * Closes the FileSystem. Calls after the first successful close are no-ops.
+   * @throws Exception if closing the FileSystem fails.
+   */
+  @Override
+  public synchronized void close() throws Exception {
+    if (this.fileSystem == null || this.fsClosed) {
+      return;
+    }
+    this.fileSystem.close();
+    this.fsClosed = true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
new file mode 100644
index 0000000..e461b45
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.reef.annotations.audience.Private;
+
+import java.io.*;
+
+/**
+ * The DFS evaluator logger for FileSystems that do not support append; it emulates append by
+ * rewriting the entire log on every write.
+ * dfs.support.append should be false.
+ */
+@Private
+public final class DFSEvaluatorLogOverwriteWriter implements DFSEvaluatorLogWriter {
+
+  /** Buffer size, in bytes, used when copying the existing log into memory. */
+  private static final int COPY_BUFFER_SIZE = 4096;
+
+  private final FileSystem fileSystem;
+
+  private final Path changelogPath;
+
+  private boolean fsClosed = false;
+
+  /**
+   * Creates a log writer bound to the given FileSystem and change log location.
+   * @param fileSystem The FileSystem on which the evaluator change log lives.
+   * @param changelogPath The path of the evaluator change log on that FileSystem.
+   */
+  DFSEvaluatorLogOverwriteWriter(final FileSystem fileSystem, final Path changelogPath) {
+    this.fileSystem = fileSystem;
+    this.changelogPath = changelogPath;
+  }
+
+  /**
+   * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log.
+   * If the log does not exist yet it is created with the entry as its content. Otherwise the log is
+   * read, the entry is added at the end, and the entire log is rewritten.
+   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information).
+   * @throws IOException when file cannot be written.
+   */
+  @Override
+  public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
+    final boolean logExists = this.fileSystem.exists(this.changelogPath);
+
+    if (logExists) {
+      this.appendByDeleteAndCreate(formattedEntry);
+      return;
+    }
+
+    try (final BufferedWriter bw =
+             new BufferedWriter(new OutputStreamWriter(this.fileSystem.create(this.changelogPath)))) {
+      bw.write(formattedEntry);
+    }
+  }
+
+  /**
+   * For certain HDFS implementation, the append operation may not be supported (e.g., Azure blob - wasb)
+   * in this case, we will emulate the append operation by reading the content, appending entry at the end,
+   * then recreating the file with appended content.
+   * The existing content is carried over as raw bytes, so it is never decoded and re-encoded.
+   * NOTE(review): the log is deleted before it is recreated; a failure between the two steps
+   * leaves no log on the FileSystem.
+   *
+   * @param appendEntry The entry to add at the end of the log.
+   * @throws java.io.IOException when the file can't be read or written.
+   */
+  private void appendByDeleteAndCreate(final String appendEntry) throws IOException {
+    final ByteArrayOutputStream newContent = new ByteArrayOutputStream();
+
+    // try-with-resources owns the input stream, so copyBytes is told not to close anything.
+    try (final InputStream inputStream = this.fileSystem.open(this.changelogPath)) {
+      IOUtils.copyBytes(inputStream, newContent, COPY_BUFFER_SIZE, false);
+    }
+
+    // Platform default charset, the same encoding the OutputStreamWriter uses when the log is created.
+    newContent.write(appendEntry.getBytes());
+
+    this.fileSystem.delete(this.changelogPath, true);
+
+    try (final FSDataOutputStream newOutput = this.fileSystem.create(this.changelogPath)) {
+      newContent.writeTo(newOutput);
+    }
+  }
+
+  /**
+   * Closes the FileSystem. Once a close has succeeded, further calls do nothing.
+   * @throws Exception if closing the FileSystem fails.
+   */
+  @Override
+  public synchronized void close() throws Exception {
+    if (this.fileSystem != null && !this.fsClosed) {
+      this.fileSystem.close();
+      this.fsClosed = true;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
new file mode 100644
index 0000000..b5e3881
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.reef.annotations.audience.Private;
+
+import java.io.IOException;
+
+/**
+ * Writer of Evaluator log entries to a DFS. Regular append and append by overwrite are the
+ * currently supported strategies.
+ * Log entries are meant to be immutable and are never deleted; to remove an evaluator,
+ * a removal entry should be written instead.
+ */
+@Private
+public interface DFSEvaluatorLogWriter extends AutoCloseable {
+
+  /**
+   * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log.
+   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information)
+   * @throws IOException when the entry cannot be written.
+   */
+  void writeToEvaluatorLog(String formattedEntry) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
new file mode 100644
index 0000000..409e392
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory;
+import org.apache.reef.driver.parameters.FailDriverOnEvaluatorLogErrors;
+import org.apache.reef.exception.DriverFatalRuntimeException;
+import org.apache.reef.runtime.common.driver.EvaluatorPreserver;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.*;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An Evaluator Preserver that uses the DFS on YARN.
+ * Evaluator additions and removals are written as lines ("+id" / "-id") to a change log file under
+ * the job submission directory; {@link #recoverEvaluators()} replays that log to compute the set of
+ * evaluators expected to be alive.
+ */
+@DriverSide
+@RuntimeAuthor
+@Unstable
+public final class DFSEvaluatorPreserver implements EvaluatorPreserver, AutoCloseable {
+  private static final Logger LOG = Logger.getLogger(DFSEvaluatorPreserver.class.getName());
+
+  /** Prefix of a change log line that records an allocated evaluator. */
+  private static final String ADD_FLAG = "+";
+
+  /** Prefix of a change log line that records a removed evaluator. */
+  private static final String REMOVE_FLAG = "-";
+
+  private final boolean failDriverOnEvaluatorLogErrors;
+
+  // The three fields below are all null when the FileSystem could not be instantiated.
+  private DFSEvaluatorLogWriter writer;
+
+  private Path changeLogLocation;
+
+  private FileSystem fileSystem;
+
+  private boolean writerClosed = false;
+
+  /**
+   * Creates a preserver that logs under "/ReefApplications/" followed by the job identifier.
+   * @param failDriverOnEvaluatorLogErrors whether an evaluator log error is fatal to the driver.
+   */
+  @Inject
+  DFSEvaluatorPreserver(@Parameter(FailDriverOnEvaluatorLogErrors.class)
+                        final boolean failDriverOnEvaluatorLogErrors) {
+    this(failDriverOnEvaluatorLogErrors, "/ReefApplications/" + EvaluatorManager.getJobIdentifier());
+  }
+
+  /**
+   * Creates a preserver that logs to "evaluatorsChangesLog" under the given job submission directory.
+   * The writer is chosen from the "dfs.support.append" setting: regular append when it is true,
+   * append by overwrite otherwise.
+   * @param failDriverOnEvaluatorLogErrors whether an evaluator log error is fatal to the driver.
+   * @param jobSubmissionDirectory the directory in which the change log is kept.
+   */
+  @Inject
+  private DFSEvaluatorPreserver(@Parameter(FailDriverOnEvaluatorLogErrors.class)
+                                final boolean failDriverOnEvaluatorLogErrors,
+                                @Parameter(DriverJobSubmissionDirectory.class)
+                                final String jobSubmissionDirectory) {
+
+    this.failDriverOnEvaluatorLogErrors = failDriverOnEvaluatorLogErrors;
+
+    try {
+      final org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
+      this.fileSystem = FileSystem.get(config);
+      this.changeLogLocation =
+          new Path(StringUtils.stripEnd(jobSubmissionDirectory, "/") + "/evaluatorsChangesLog");
+
+      final boolean appendSupported = config.getBoolean("dfs.support.append", false);
+
+      if (appendSupported) {
+        this.writer = new DFSEvaluatorLogAppendWriter(this.fileSystem, this.changeLogLocation);
+      } else {
+        this.writer = new DFSEvaluatorLogOverwriteWriter(this.fileSystem, this.changeLogLocation);
+      }
+    } catch (final IOException e) {
+      final String errMsg = "Cannot instantiate FileSystem with Exception " + e +
+          ", evaluators will not be preserved or recovered.";
+      final String fatalMsg = "Driver was not able to instantiate FileSystem.";
+
+      // Throws when errors are fatal; otherwise the preserver is left disabled (all fields null).
+      this.handleException(e, errMsg, fatalMsg);
+      this.fileSystem = null;
+      this.changeLogLocation = null;
+      this.writer = null;
+    }
+  }
+
+  /**
+   * Recovers the set of evaluators that are alive by replaying the change log.
+   * Lines that start with neither the add nor the remove flag are ignored.
+   * @return the IDs that were added and not subsequently removed; an empty set when the FileSystem
+   *     is unavailable or the log does not exist. If reading fails and log errors are not fatal,
+   *     the set built up to the failure is returned.
+   */
+  @Override
+  public synchronized Set<String> recoverEvaluators() {
+    final Set<String> expectedContainers = new HashSet<>();
+
+    if (this.fileSystem == null || this.changeLogLocation == null) {
+      LOG.log(Level.WARNING, "Unable to recover evaluators due to failure to instantiate FileSystem. Returning an" +
+          " empty set.");
+      return expectedContainers;
+    }
+
+    try {
+      if (!this.fileSystem.exists(this.changeLogLocation)) {
+        // empty set
+        return expectedContainers;
+      }
+
+      // try-with-resources closes the reader even when readLine fails part-way through the log.
+      try (final BufferedReader br =
+               new BufferedReader(new InputStreamReader(this.fileSystem.open(this.changeLogLocation)))) {
+        for (String line = br.readLine(); line != null; line = br.readLine()) {
+          if (line.startsWith(ADD_FLAG)) {
+            final String containerId = line.substring(ADD_FLAG.length());
+            if (!expectedContainers.add(containerId)) {
+              LOG.log(Level.WARNING, "Duplicated add container record found in the change log for container " +
+                  containerId);
+            }
+          } else if (line.startsWith(REMOVE_FLAG)) {
+            final String containerId = line.substring(REMOVE_FLAG.length());
+            if (!expectedContainers.remove(containerId)) {
+              LOG.log(Level.WARNING, "Change log includes record that try to remove non-exist or duplicate " +
+                  "remove record for container " + containerId);
+            }
+          }
+        }
+      }
+    } catch (final IOException e) {
+      final String errMsg = "Cannot read from log file with Exception " + e +
+          ", evaluators will not be recovered.";
+
+      final String fatalMsg = "Cannot read from evaluator log.";
+
+      this.handleException(e, errMsg, fatalMsg);
+    }
+    return expectedContainers;
+  }
+
+  /**
+   * Adds the allocated evaluator entry to the evaluator log.
+   * Does nothing when the FileSystem could not be instantiated.
+   * @param id the ID of the allocated evaluator.
+   */
+  @Override
+  public synchronized void recordAllocatedEvaluator(final String id) {
+    if (this.fileSystem != null && this.changeLogLocation != null) {
+      final String entry = ADD_FLAG + id + System.lineSeparator();
+      this.logContainerChange(entry);
+    }
+  }
+
+  /**
+   * Adds the removed evaluator entry to the evaluator log.
+   * Does nothing when the FileSystem could not be instantiated.
+   * @param id the ID of the removed evaluator.
+   */
+  @Override
+  public synchronized void recordRemovedEvaluator(final String id) {
+    if (this.fileSystem != null && this.changeLogLocation != null) {
+      final String entry = REMOVE_FLAG + id + System.lineSeparator();
+      this.logContainerChange(entry);
+    }
+  }
+
+  /**
+   * Writes one formatted entry through the writer, routing any IOException to the error handler.
+   * @param entry the formatted log line, including its line separator.
+   */
+  private void logContainerChange(final String entry) {
+    try {
+      this.writer.writeToEvaluatorLog(entry);
+    } catch (final IOException e) {
+      final String errorMsg = "Unable to log the change of container [" + entry +
+          "] to the container log. Driver restart won't work properly.";
+
+      final String fatalMsg = "Unable to log container change.";
+
+      this.handleException(e, errorMsg, fatalMsg);
+    }
+  }
+
+  /**
+   * Reports an evaluator log error. When log errors are configured as fatal, the error is logged
+   * at SEVERE, this preserver is closed, and a DriverFatalRuntimeException is thrown; otherwise
+   * the error is logged at WARNING and the method returns normally.
+   * @param e the underlying failure.
+   * @param errorMsg the message to log.
+   * @param fatalMsg the message of the thrown exception; a generic message is used when null.
+   */
+  private void handleException(final Exception e, final String errorMsg, final String fatalMsg) {
+    if (!this.failDriverOnEvaluatorLogErrors) {
+      LOG.log(Level.WARNING, errorMsg, e);
+      return;
+    }
+
+    LOG.log(Level.SEVERE, errorMsg, e);
+
+    try {
+      this.close();
+    } catch (final Exception e1) {
+      // Pass the exception itself so the logger records its stack trace.
+      LOG.log(Level.SEVERE, "Failed on closing resource.", e1);
+    }
+
+    throw new DriverFatalRuntimeException(
+        fatalMsg != null ? fatalMsg : "Driver failed on Evaluator log error.", e);
+  }
+
+  /**
+   * Closes the writer, which in turn closes the FileSystem.
+   * Once a close has succeeded, further calls do nothing.
+   * @throws Exception if the writer fails to close.
+   */
+  @Override
+  public synchronized void close() throws Exception {
+    if (this.writer != null && !this.writerClosed) {
+      this.writer.close();
+      this.writerClosed = true;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0181e2cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/package-info.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/package-info.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/package-info.java
new file mode 100644
index 0000000..8dde2fb
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The package contains classes that are used specifically on restart.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
\ No newline at end of file