Repository: incubator-reef
Updated Branches:
refs/heads/master a5cc97299 -> 037866c68
[REEF-426] YARN reports success even though REEF job failed
This addresses the issue by:
* Not unregistering the AM if the job fails with a RuntimeException.
* Adding a DriverFatalRuntimeException that tells YARN not to restart
  the job when the driver fails it on purpose (the AM unregisters with
  FAILED status).
JIRA:
[REEF-426](https://issues.apache.org/jira/browse/REEF-426)
Pull Request:
This closes #316
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/037866c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/037866c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/037866c6
Branch: refs/heads/master
Commit: 037866c683c771786e7b4e3fdb85b903f4a349be
Parents: a5cc972
Author: Andrew Chung <[email protected]>
Authored: Fri Jul 24 22:52:46 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Jul 27 15:31:56 2015 -0700
----------------------------------------------------------------------
.../exception/DriverFatalRuntimeException.java | 35 ++++++++++++++++++++
.../common/driver/DriverStartHandler.java | 4 +--
.../yarn/driver/YARNRuntimeStopHandler.java | 2 +-
.../yarn/driver/YarnContainerManager.java | 27 +++++++++------
4 files changed, 55 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/037866c6/lang/java/reef-common/src/main/java/org/apache/reef/exception/DriverFatalRuntimeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/exception/DriverFatalRuntimeException.java b/lang/java/reef-common/src/main/java/org/apache/reef/exception/DriverFatalRuntimeException.java
new file mode 100644
index 0000000..ce4fbdb
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/exception/DriverFatalRuntimeException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.exception;
+
+/**
+ * Thrown by the driver to tell the resource manager never to attempt a restart.
+ */
+public final class DriverFatalRuntimeException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
+  /** Creates the exception with only a detail message and no cause. */
+  public DriverFatalRuntimeException(final String msg) {
+    super(msg);
+  }
+
+  /** Creates the exception with a detail message and the underlying cause. */
+  public DriverFatalRuntimeException(final String msg, final Throwable cause) {
+    super(msg, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/037866c6/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index 8193a44..92d78ca 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -19,6 +19,7 @@
package org.apache.reef.runtime.common.driver;
import org.apache.reef.driver.parameters.DriverRestartHandler;
+import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;
@@ -75,8 +76,7 @@ public final class DriverStartHandler implements EventHandler<StartTime> {
if (restartHandler.isPresent()) {
this.restartHandler.get().onNext(startTime);
} else {
- // TODO: We might have to indicate this to YARN somehow such that it
doesn't try another time.
- throw new RuntimeException("Driver restart happened, but no
ON_DRIVER_RESTART handler is bound.");
+ throw new DriverFatalRuntimeException("Driver restart happened, but no
ON_DRIVER_RESTART handler is bound.");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/037866c6/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
index 98bdd6e..82e6f2f 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
@@ -37,6 +37,6 @@ public final class YARNRuntimeStopHandler implements EventHandler<RuntimeStop> {
@Override
public void onNext(final RuntimeStop runtimeStop) {
- this.yarnContainerManager.onStop();
+ this.yarnContainerManager.onStop(runtimeStop.getException());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/037866c6/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index 7c15b78..6034086 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.exception.DriverFatalRuntimeException;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.DriverStatusManager;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
@@ -263,7 +264,7 @@ final class YarnContainerManager
}
}
- void onStop() {
+ void onStop(final Exception exception) {
LOG.log(Level.FINE, "Stop Runtime: RM status {0}",
this.resourceManager.getServiceState());
@@ -271,8 +272,14 @@ final class YarnContainerManager
// invariant: if RM is still running then we declare success.
try {
this.reefEventHandlers.close();
- this.resourceManager.unregisterApplicationMaster(
- FinalApplicationStatus.SUCCEEDED, null, null);
+ if (exception == null) {
+ this.resourceManager.unregisterApplicationMaster(
+ FinalApplicationStatus.SUCCEEDED, null, null);
+ } else if (exception instanceof DriverFatalRuntimeException) {
+ this.resourceManager.unregisterApplicationMaster(
+ FinalApplicationStatus.FAILED, null, null);
+ }
+
this.resourceManager.close();
} catch (final Exception e) {
LOG.log(Level.WARNING, "Error shutting down YARN application", e);
@@ -348,7 +355,7 @@ final class YarnContainerManager
for (final Container container : previousContainers) {
LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
if (!expectedContainers.contains(container.getId().toString())) {
- throw new RuntimeException("Not expecting container " +
container.getId().toString());
+ throw new DriverFatalRuntimeException("Not expecting container " +
container.getId().toString());
}
handleNewContainer(container, true);
}
@@ -576,15 +583,15 @@ final class YarnContainerManager
if (line.startsWith(ADD_FLAG)) {
final String containerId = line.substring(ADD_FLAG.length());
if (expectedContainers.contains(containerId)) {
- throw new RuntimeException("Duplicated add container record
found in the change log for container " +
- containerId);
+ throw new DriverFatalRuntimeException("Duplicated add container
record found in the " +
+ "change log for container " + containerId);
}
expectedContainers.add(containerId);
} else if (line.startsWith(REMOVE_FLAG)) {
final String containerId = line.substring(REMOVE_FLAG.length());
if (!expectedContainers.contains(containerId)) {
- throw new RuntimeException("Change log includes record that try
to remove non-exist or duplicate " +
- "remove record for container + " + containerId);
+ throw new DriverFatalRuntimeException("Change log includes
record that try to " +
+ "remove non-exist or duplicate remove record for container +
" + containerId);
}
expectedContainers.remove(containerId);
}
@@ -593,7 +600,7 @@ final class YarnContainerManager
br.close();
}
} catch (final IOException e) {
- throw new RuntimeException("Cannot read from log file", e);
+ throw new DriverFatalRuntimeException("Cannot read from log file", e);
}
return expectedContainers;
}
@@ -688,7 +695,7 @@ final class YarnContainerManager
final String errorMsg = "Unable to log the change of container [" +
entry +
"] to the container log. Driver restart won't work properly.";
LOG.log(Level.WARNING, errorMsg, e);
- throw new RuntimeException(errorMsg);
+ throw new DriverFatalRuntimeException(errorMsg);
}
}
}