Repository: incubator-reef
Updated Branches:
  refs/heads/master a5cc97299 -> 037866c68


[REEF-426] YARN reports success even though REEF job failed

This addresses the issue by:
  * Not unregistering the AM if the job fails on a RuntimeException.
  * Creating a DriverFatalRuntimeException to inform YARN not to restart
    a job if the driver fails it on purpose.

JIRA:
  [REEF-426](https://issues.apache.org/jira/browse/REEF-426)

Pull Request:
  This closes #316


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/037866c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/037866c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/037866c6

Branch: refs/heads/master
Commit: 037866c683c771786e7b4e3fdb85b903f4a349be
Parents: a5cc972
Author: Andrew Chung <[email protected]>
Authored: Fri Jul 24 22:52:46 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Jul 27 15:31:56 2015 -0700

----------------------------------------------------------------------
 .../exception/DriverFatalRuntimeException.java  | 35 ++++++++++++++++++++
 .../common/driver/DriverStartHandler.java       |  4 +--
 .../yarn/driver/YARNRuntimeStopHandler.java     |  2 +-
 .../yarn/driver/YarnContainerManager.java       | 27 +++++++++------
 4 files changed, 55 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/037866c6/lang/java/reef-common/src/main/java/org/apache/reef/exception/DriverFatalRuntimeException.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/exception/DriverFatalRuntimeException.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/exception/DriverFatalRuntimeException.java
new file mode 100644
index 0000000..ce4fbdb
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/exception/DriverFatalRuntimeException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.exception;
+
+/**
+ * A special RuntimeException we throw to indicate to the resource manager to never attempt to restart us
+ * if this exception is thrown.
+ */
+public final class DriverFatalRuntimeException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public DriverFatalRuntimeException(final String msg, final Throwable cause) {
+    super(msg, cause);
+  }
+
+  public DriverFatalRuntimeException(final String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/037866c6/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index 8193a44..92d78ca 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -19,6 +19,7 @@
 package org.apache.reef.runtime.common.driver;
 
 import org.apache.reef.driver.parameters.DriverRestartHandler;
+import org.apache.reef.exception.DriverFatalRuntimeException;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.util.Optional;
 import org.apache.reef.wake.EventHandler;
@@ -75,8 +76,7 @@ public final class DriverStartHandler implements 
EventHandler<StartTime> {
     if (restartHandler.isPresent()) {
       this.restartHandler.get().onNext(startTime);
     } else {
-      // TODO: We might have to indicate this to YARN somehow such that it 
doesn't try another time.
-      throw new RuntimeException("Driver restart happened, but no 
ON_DRIVER_RESTART handler is bound.");
+      throw new DriverFatalRuntimeException("Driver restart happened, but no 
ON_DRIVER_RESTART handler is bound.");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/037866c6/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
index 98bdd6e..82e6f2f 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
@@ -37,6 +37,6 @@ public final class YARNRuntimeStopHandler implements 
EventHandler<RuntimeStop> {
 
   @Override
   public void onNext(final RuntimeStop runtimeStop) {
-    this.yarnContainerManager.onStop();
+    this.yarnContainerManager.onStop(runtimeStop.getException());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/037866c6/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index 7c15b78..6034086 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.exception.DriverFatalRuntimeException;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.DriverStatusManager;
 import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
@@ -263,7 +264,7 @@ final class YarnContainerManager
     }
   }
 
-  void onStop() {
+  void onStop(final Exception exception) {
 
     LOG.log(Level.FINE, "Stop Runtime: RM status {0}", 
this.resourceManager.getServiceState());
 
@@ -271,8 +272,14 @@ final class YarnContainerManager
       // invariant: if RM is still running then we declare success.
       try {
         this.reefEventHandlers.close();
-        this.resourceManager.unregisterApplicationMaster(
-            FinalApplicationStatus.SUCCEEDED, null, null);
+        if (exception == null) {
+          this.resourceManager.unregisterApplicationMaster(
+              FinalApplicationStatus.SUCCEEDED, null, null);
+        } else if (exception instanceof DriverFatalRuntimeException) {
+          this.resourceManager.unregisterApplicationMaster(
+              FinalApplicationStatus.FAILED, null, null);
+        }
+
         this.resourceManager.close();
       } catch (final Exception e) {
         LOG.log(Level.WARNING, "Error shutting down YARN application", e);
@@ -348,7 +355,7 @@ final class YarnContainerManager
       for (final Container container : previousContainers) {
         LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
         if (!expectedContainers.contains(container.getId().toString())) {
-          throw new RuntimeException("Not expecting container " + 
container.getId().toString());
+          throw new DriverFatalRuntimeException("Not expecting container " + 
container.getId().toString());
         }
         handleNewContainer(container, true);
       }
@@ -576,15 +583,15 @@ final class YarnContainerManager
           if (line.startsWith(ADD_FLAG)) {
             final String containerId = line.substring(ADD_FLAG.length());
             if (expectedContainers.contains(containerId)) {
-              throw new RuntimeException("Duplicated add container record 
found in the change log for container " +
-                  containerId);
+              throw new DriverFatalRuntimeException("Duplicated add container 
record found in the " +
+                  "change log for container " + containerId);
             }
             expectedContainers.add(containerId);
           } else if (line.startsWith(REMOVE_FLAG)) {
             final String containerId = line.substring(REMOVE_FLAG.length());
             if (!expectedContainers.contains(containerId)) {
-              throw new RuntimeException("Change log includes record that try 
to remove non-exist or duplicate " +
-                  "remove record for container + " + containerId);
+              throw new DriverFatalRuntimeException("Change log includes 
record that try to " +
+                  "remove non-exist or duplicate remove record for container + 
" + containerId);
             }
             expectedContainers.remove(containerId);
           }
@@ -593,7 +600,7 @@ final class YarnContainerManager
         br.close();
       }
     } catch (final IOException e) {
-      throw new RuntimeException("Cannot read from log file", e);
+      throw new DriverFatalRuntimeException("Cannot read from log file", e);
     }
     return expectedContainers;
   }
@@ -688,7 +695,7 @@ final class YarnContainerManager
       final String errorMsg = "Unable to log the change of container [" + 
entry +
           "] to the container log. Driver restart won't work properly.";
       LOG.log(Level.WARNING, errorMsg, e);
-      throw new RuntimeException(errorMsg);
+      throw new DriverFatalRuntimeException(errorMsg);
     }
   }
 }

Reply via email to