Repository: giraph Updated Branches: refs/heads/trunk faf339206 -> 819f293f4
GIRAPH-1083: Make sure we fail after exception in ooc-io thread happens Summary: Currently, if an exception happens in an ooc-io thread, the job is left running for a long time after the exception. We should make sure we fail early. Test Plan: Ran a job with out-of-core (ooc) enabled in which I simulated the failure. Without the change, the job hangs for a long time; with the change, it fails right after the exception happens and logs the error to the command line. Differential Revision: https://reviews.facebook.net/D60291 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/819f293f Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/819f293f Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/819f293f Branch: refs/heads/trunk Commit: 819f293f4c780fc6833785da27e10f965570f44e Parents: faf3392 Author: Maja Kabiljo <[email protected]> Authored: Fri Jul 1 13:26:50 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Fri Jul 1 13:28:02 2016 -0700 ---------------------------------------------------------------------- .../apache/giraph/graph/GraphTaskManager.java | 12 +++++-- .../org/apache/giraph/ooc/OutOfCoreEngine.java | 19 ++-------- .../apache/giraph/ooc/OutOfCoreIOCallable.java | 5 ++- .../giraph/ooc/OutOfCoreIOCallableFactory.java | 38 +++++--------------- .../giraph/utils/LogStacktraceCallable.java | 21 ++++++++++- .../org/apache/giraph/utils/ThreadUtils.java | 24 +++++++++++++ 6 files changed, 67 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index a1d8522..4d97e5f 100644 --- 
a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit; import com.sun.management.GarbageCollectionNotificationInfo; import com.yammer.metrics.core.Counter; + +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.bsp.CentralizedServiceWorker; @@ -1043,7 +1045,7 @@ end[PURE_YARN]*/ public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() { return new OverrideExceptionHandler( CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS.newInstance( - getConf())); + getConf()), getJobProgressTracker()); } public ImmutableClassesGiraphConfiguration<I, V, E> getConf() { @@ -1079,16 +1081,21 @@ end[PURE_YARN]*/ class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler { /** Checker if worker should fail after a thread gets an exception */ private final CheckerIfWorkerShouldFailAfterException checker; + /** JobProgressTracker to log problems to */ + private final JobProgressTracker jobProgressTracker; /** * Constructor * * @param checker Checker if worker should fail after a thread gets an * exception + * @param jobProgressTracker JobProgressTracker to log problems to */ public OverrideExceptionHandler( - CheckerIfWorkerShouldFailAfterException checker) { + CheckerIfWorkerShouldFailAfterException checker, + JobProgressTracker jobProgressTracker) { this.checker = checker; + this.jobProgressTracker = jobProgressTracker; } @Override @@ -1100,6 +1107,7 @@ end[PURE_YARN]*/ LOG.fatal( "uncaughtException: OverrideExceptionHandler on thread " + t.getName() + ", msg = " + e.getMessage() + ", exiting...", e); + jobProgressTracker.logError(ExceptionUtils.getStackTrace(e)); zooKeeperCleanup(); workerFailureCleanup(); 
http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java index 3187468..d5bfd4f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java @@ -77,11 +77,6 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver { private final MetaPartitionManager metaPartitionManager; /** Out-of-core oracle (brain of out-of-core mechanism) */ private final OutOfCoreOracle oracle; - /** - * Whether the job should fail due to IO threads terminating because of - * exceptions - */ - private volatile boolean jobFailed = false; /** IO statistics collector */ private final OutOfCoreIOStatistics statistics; /** @@ -167,7 +162,8 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver { } int numIOThreads = dataAccessor.getNumAccessorThreads(); this.oocIOCallableFactory = - new OutOfCoreIOCallableFactory(this, numIOThreads); + new OutOfCoreIOCallableFactory(this, numIOThreads, + service.getGraphTaskManager().createUncaughtExceptionHandler()); this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads); this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this); this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads); @@ -307,10 +303,6 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver { "InterruptedException while waiting to retrieve a partition to " + "process"); } - if (jobFailed) { - throw new RuntimeException("Job Failed due to a failure in an " + - "out-of-core IO thread!"); - } } if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) { partitionAvailable.notifyAll(); @@ -410,13 +402,6 @@ public class 
OutOfCoreEngine implements ResetSuperstepMetricsObserver { } /** - * Set a flag to fail the job. - */ - public void failTheJob() { - jobFailed = true; - } - - /** * Update the fraction of processing threads that should remain active. It is * the responsibility of out-of-core oracle to update the number of active * threads. http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java index bea3994..829ad80 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java @@ -112,9 +112,8 @@ public class OutOfCoreIOCallable implements Callable<Void>, 1000 / 1024 / 1024)))); } } catch (Exception e) { - oocEngine.failTheJob(); - LOG.error("call: execution of IO command " + command + " failed!"); - throw new RuntimeException(e); + throw new RuntimeException( + "call: execution of IO command " + command + " failed!", e); } // CHECKSTYLE: resume IllegalCatch if (!(command instanceof WaitIOCommand)) { http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java index 6aeb196..b8b730e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java @@ -19,7 +19,6 @@ package org.apache.giraph.ooc; import org.apache.giraph.utils.CallableFactory; 
-import org.apache.giraph.utils.LogStacktraceCallable; import org.apache.giraph.utils.ThreadUtils; import org.apache.log4j.Logger; @@ -46,20 +45,24 @@ public class OutOfCoreIOCallableFactory { private final List<Future> results; /** Number of threads used for IO operations */ private final int numIOThreads; + /** Thread UncaughtExceptionHandler to use */ + private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler; /** Executor service for IO threads */ private ExecutorService outOfCoreIOExecutor; /** * Constructor - * * @param oocEngine Out-of-core engine * @param numIOThreads Number of IO threads used + * @param uncaughtExceptionHandler Thread UncaughtExceptionHandler to use */ public OutOfCoreIOCallableFactory(OutOfCoreEngine oocEngine, - int numIOThreads) { + int numIOThreads, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { this.oocEngine = oocEngine; this.numIOThreads = numIOThreads; this.results = new ArrayList<>(numIOThreads); + this.uncaughtExceptionHandler = uncaughtExceptionHandler; } /** @@ -75,34 +78,11 @@ public class OutOfCoreIOCallableFactory { }; outOfCoreIOExecutor = new ThreadPoolExecutor(numIOThreads, numIOThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), - ThreadUtils.createThreadFactory("ooc-io-%d")) { - @Override - protected void afterExecute(Runnable r, Throwable t) { - super.afterExecute(r, t); - if (t == null && r instanceof Future<?>) { - try { - Future<?> future = (Future<?>) r; - if (future.isDone()) { - future.get(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (ExecutionException e) { - t = e; - } - if (t != null) { - LOG.info("afterExecute: an out-of-core thread terminated " + - "unexpectedly with " + t); - oocEngine.failTheJob(); - } - } - } - }; + ThreadUtils.createThreadFactory("ooc-io-%d")); for (int i = 0; i < numIOThreads; ++i) { - Future<Void> future = outOfCoreIOExecutor.submit( - new LogStacktraceCallable<>( - 
outOfCoreIOCallableFactory.newCallable(i))); + Future<Void> future = ThreadUtils.submitToExecutor(outOfCoreIOExecutor, + outOfCoreIOCallableFactory.newCallable(i), uncaughtExceptionHandler); results.add(future); } // Notify executor to not accept any more tasks http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java b/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java index 730825d..3b659aa 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java @@ -34,7 +34,9 @@ public class LogStacktraceCallable<V> implements Callable<V> { Logger.getLogger(LogStacktraceCallable.class); /** Pass call() to this callable. */ - private Callable<V> callable; + private final Callable<V> callable; + /** Uncaught exception handler, if any */ + private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler; /** * Construct an instance that will pass call() to the given callable. @@ -42,7 +44,21 @@ public class LogStacktraceCallable<V> implements Callable<V> { * @param callable Callable */ public LogStacktraceCallable(Callable<V> callable) { + this(callable, null); + } + + /** + * Construct an instance that will pass call() to the given callable. 
+ * + * @param callable Callable + * @param uncaughtExceptionHandler Uncaught exception handler, if any + * + * + */ + public LogStacktraceCallable(Callable<V> callable, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { this.callable = callable; + this.uncaughtExceptionHandler = uncaughtExceptionHandler; } @Override @@ -55,6 +71,9 @@ public class LogStacktraceCallable<V> implements Callable<V> { } catch (Exception e) { // CHECKSTYLE: resume IllegalCatch LOG.error("Execution of callable failed", e); + if (uncaughtExceptionHandler != null) { + uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e); + } throw e; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java index 9518bdc..83eca14 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java @@ -19,6 +19,9 @@ package org.apache.giraph.utils; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; /** @@ -62,4 +65,25 @@ public class ThreadUtils { public static ThreadFactory createThreadFactory(String nameFormat) { return createThreadFactory(nameFormat, null); } + + /** + * Submit a callable to executor service, ensuring any exceptions are + * caught with provided exception handler. + * + * When using submit(), UncaughtExceptionHandler which is set on ThreadFactory + * isn't used, so we need this utility. 
+ * + * @param executorService Executor service to submit callable to + * @param callable Callable to submit + * @param uncaughtExceptionHandler Handler for uncaught exceptions in callable + * @param <T> Type of callable result + * @return Future for callable + */ + public static <T> Future<T> submitToExecutor( + ExecutorService executorService, + Callable<T> callable, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { + return executorService.submit( + new LogStacktraceCallable<>(callable, uncaughtExceptionHandler)); + } }
