Repository: giraph Updated Branches: refs/heads/trunk 2aa3af8d9 -> 0afeb6360
GIRAPH-966: Add a way to ignore some thread exceptions Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/0afeb636 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/0afeb636 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/0afeb636 Branch: refs/heads/trunk Commit: 0afeb63605cfcf3afacbf3fd43f63e69bf996b70 Parents: 2aa3af8 Author: Maja Kabiljo <[email protected]> Authored: Mon Dec 1 11:39:33 2014 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Mon Dec 1 11:39:33 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/graph/GraphTaskManager.java | 58 +++++++++++++++++++- 2 files changed, 59 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/0afeb636/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index cf914e8..69618fd 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.2.0 - unreleased + GIRAPH-966: Add a way to ignore some thread exceptions (majakabiljo) + GIRAPH-964: Remove quotes from output partition specification in hive-io (majakabiljo) GIRAPH-963: Aggregators may not be initialized properly (edunov via majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/0afeb636/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index eb9fad3..c76dd9e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -36,6 +36,7 @@ import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.bsp.CheckpointStatus; import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.conf.ClassConfOption; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.job.JobProgressTracker; @@ -92,6 +93,17 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, Configuration.addDefaultResource("giraph-site.xml"); } end[PURE_YARN]*/ + /** + * Class which checks if an exception on some thread should cause worker + * to fail + */ + public static final ClassConfOption<CheckerIfWorkerShouldFailAfterException> + CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS = ClassConfOption.create( + "giraph.checkerIfWorkerShouldFailAfterExceptionClass", + FailWithEveryException.class, + CheckerIfWorkerShouldFailAfterException.class, + "Class which checks if an exception on some thread should cause worker " + + "to fail, by default all exceptions cause failure"); /** Name of metric for superstep time in msec */ public static final String TIMER_SUPERSTEP_TIME = "superstep-time-ms"; /** Name of metric for compute on all vertices in msec */ @@ -976,7 +988,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, * @return new exception handler object. */ public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() { - return new OverrideExceptionHandler(); + return new OverrideExceptionHandler( + CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS.newInstance( + getConf())); } public ImmutableClassesGiraphConfiguration<I, V, E> getConf() { @@ -989,8 +1003,25 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, * It will do the best to clean up and then will terminate current giraph job. */ class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler { + /** Checker if worker should fail after a thread gets an exception */ + private final CheckerIfWorkerShouldFailAfterException checker; + + /** + * Constructor + * + * @param checker Checker if worker should fail after a thread gets an + * exception + */ + public OverrideExceptionHandler( + CheckerIfWorkerShouldFailAfterException checker) { + this.checker = checker; + } + @Override public void uncaughtException(final Thread t, final Throwable e) { + if (!checker.checkIfWorkerShouldFail(t, e)) { + return; + } try { LOG.fatal( "uncaughtException: OverrideExceptionHandler on thread " + @@ -1003,4 +1034,29 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, } } } + + /** + * Interface to check if worker should fail after a thread gets an exception + */ + public interface CheckerIfWorkerShouldFailAfterException { + /** + * Check if worker should fail after a thread gets an exception + * + * @param thread Thread which raised the exception + * @param exception Exception which occurred + * @return True iff worker should fail after this exception + */ + boolean checkIfWorkerShouldFail(Thread thread, Throwable exception); + } + + /** + * Class to use by default, where each exception causes job failure + */ + public static class FailWithEveryException + implements CheckerIfWorkerShouldFailAfterException { + @Override + public boolean checkIfWorkerShouldFail(Thread thread, Throwable exception) { + return true; + } + } }
