Repository: giraph
Updated Branches:
  refs/heads/trunk 2aa3af8d9 -> 0afeb6360


GIRAPH-966: Add a way to ignore some thread exceptions


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/0afeb636
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/0afeb636
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/0afeb636

Branch: refs/heads/trunk
Commit: 0afeb63605cfcf3afacbf3fd43f63e69bf996b70
Parents: 2aa3af8
Author: Maja Kabiljo <[email protected]>
Authored: Mon Dec 1 11:39:33 2014 -0800
Committer: Maja Kabiljo <[email protected]>
Committed: Mon Dec 1 11:39:33 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../apache/giraph/graph/GraphTaskManager.java   | 58 +++++++++++++++++++-
 2 files changed, 59 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/0afeb636/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index cf914e8..69618fd 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
+  GIRAPH-966: Add a way to ignore some thread exceptions (majakabiljo)
+
  GIRAPH-964: Remove quotes from output partition specification in hive-io (majakabiljo)
 
  GIRAPH-963: Aggregators may not be initialized properly (edunov via majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/0afeb636/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index eb9fad3..c76dd9e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -36,6 +36,7 @@ import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.bsp.CheckpointStatus;
 import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.conf.ClassConfOption;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.job.JobProgressTracker;
@@ -92,6 +93,17 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     Configuration.addDefaultResource("giraph-site.xml");
   }
   end[PURE_YARN]*/
+  /**
+   * Class which checks if an exception on some thread should cause worker
+   * to fail
+   */
+  public static final ClassConfOption<CheckerIfWorkerShouldFailAfterException>
+  CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS = ClassConfOption.create(
+      "giraph.checkerIfWorkerShouldFailAfterExceptionClass",
+      FailWithEveryException.class,
+      CheckerIfWorkerShouldFailAfterException.class,
+      "Class which checks if an exception on some thread should cause worker " +
+          "to fail, by default all exceptions cause failure");
   /** Name of metric for superstep time in msec */
   public static final String TIMER_SUPERSTEP_TIME = "superstep-time-ms";
   /** Name of metric for compute on all vertices in msec */
@@ -976,7 +988,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
    * @return new exception handler object.
    */
   public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() {
-    return new OverrideExceptionHandler();
+    return new OverrideExceptionHandler(
+        CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS.newInstance(
+            getConf()));
   }
 
   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
@@ -989,8 +1003,25 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
    * It will do the best to clean up and then will terminate current giraph job.
    */
   class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
+    /** Checker if worker should fail after a thread gets an exception */
+    private final CheckerIfWorkerShouldFailAfterException checker;
+
+    /**
+     * Constructor
+     *
+     * @param checker Checker if worker should fail after a thread gets an
+     *                exception
+     */
+    public OverrideExceptionHandler(
+        CheckerIfWorkerShouldFailAfterException checker) {
+      this.checker = checker;
+    }
+
     @Override
     public void uncaughtException(final Thread t, final Throwable e) {
+      if (!checker.checkIfWorkerShouldFail(t, e)) {
+        return;
+      }
       try {
         LOG.fatal(
             "uncaughtException: OverrideExceptionHandler on thread " +
@@ -1003,4 +1034,29 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       }
     }
   }
+
+  /**
+   * Interface to check if worker should fail after a thread gets an exception
+   */
+  public interface CheckerIfWorkerShouldFailAfterException {
+    /**
+     * Check if worker should fail after a thread gets an exception
+     *
+     * @param thread Thread which raised the exception
+     * @param exception Exception which occurred
+     * @return True iff worker should fail after this exception
+     */
+    boolean checkIfWorkerShouldFail(Thread thread, Throwable exception);
+  }
+
+  /**
+   * Class to use by default, where each exception causes job failure
+   */
+  public static class FailWithEveryException
+      implements CheckerIfWorkerShouldFailAfterException {
+    @Override
+    public boolean checkIfWorkerShouldFail(Thread thread, Throwable exception) {
+      return true;
+    }
+  }
 }

Reply via email to