GIRAPH-903: Detect crashes of Netty threads (edunov via pavanka)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/61cb37ec Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/61cb37ec Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/61cb37ec Branch: refs/heads/trunk Commit: 61cb37ecd50b0d9400873624e46692c3282e4cfc Parents: 7f9218a Author: Pavan Kumar <[email protected]> Authored: Tue Jul 8 12:08:53 2014 -0700 Committer: Pavan Kumar <[email protected]> Committed: Tue Jul 8 12:11:12 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + findbugs-exclude.xml | 6 +-- .../apache/giraph/comm/netty/NettyClient.java | 38 +++++++++++--- .../giraph/comm/netty/NettyMasterClient.java | 8 ++- .../giraph/comm/netty/NettyMasterServer.java | 6 ++- .../apache/giraph/comm/netty/NettyServer.java | 36 ++++++++----- .../giraph/comm/netty/NettyWorkerClient.java | 8 ++- .../giraph/comm/netty/NettyWorkerServer.java | 6 ++- .../handler/MasterRequestServerHandler.java | 11 ++-- .../netty/handler/RequestServerHandler.java | 18 ++++--- .../handler/WorkerRequestServerHandler.java | 11 ++-- .../org/apache/giraph/graph/GraphMapper.java | 23 +++------ .../apache/giraph/graph/GraphTaskManager.java | 30 +++++++++++ .../apache/giraph/master/BspServiceMaster.java | 9 ++-- .../org/apache/giraph/utils/ThreadUtils.java | 54 ++++++++++++++++++++ .../apache/giraph/worker/BspServiceWorker.java | 6 ++- .../org/apache/giraph/yarn/GiraphYarnTask.java | 13 ----- .../org/apache/giraph/comm/ConnectionTest.java | 26 ++++++---- .../giraph/comm/MockExceptionHandler.java | 26 ++++++++++ .../apache/giraph/comm/RequestFailureTest.java | 5 +- .../org/apache/giraph/comm/RequestTest.java | 5 +- .../apache/giraph/comm/SaslConnectionTest.java | 6 ++- 22 files changed, 255 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/CHANGELOG 
---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 834b45f..13dfcd7 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-903: Detect crashes of Netty threads (edunov via pavanka) + GIRAPH-925: Unit tests should pass even if zookeeper port not available (edunov via pavanka) GIRAPH-713: Provide an option to do request compression (pavanka) http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml index e0466f7..9ac4412 100644 --- a/findbugs-exclude.xml +++ b/findbugs-exclude.xml @@ -39,11 +39,7 @@ <Bug pattern="DM_EXIT"/> </Match> <Match> - <Class name="org.apache.giraph.graph.GraphMapper$OverrideExceptionHandler"/> - <Bug pattern="DM_EXIT"/> - </Match> - <Match> - <Class name="org.apache.giraph.yarn.GiraphYarnTask$OverrideExceptionHandler"/> + <Class name="org.apache.giraph.graph.GraphTaskManager$OverrideExceptionHandler"/> <Bug pattern="DM_EXIT"/> </Match> <Match> http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java index 5bb5545..97394bf 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java @@ -35,6 +35,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.TaskInfo; import org.apache.giraph.utils.PipelineUtils; import org.apache.giraph.utils.ProgressableUtils; +import org.apache.giraph.utils.ThreadUtils; import 
org.apache.giraph.utils.TimedLogger; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; @@ -42,7 +43,6 @@ import org.apache.log4j.Logger; import com.google.common.collect.Lists; import com.google.common.collect.MapMaker; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.net.InetSocketAddress; @@ -175,6 +175,12 @@ public class NettyClient { /** When was the last time we checked if we should resend some requests */ private final AtomicLong lastTimeCheckedRequestsForProblems = new AtomicLong(0); + /** + * Logger used to dump stack traces for every exception that happens + * in netty client threads. + */ + private final LogOnErrorChannelFutureListener logErrorListener = + new LogOnErrorChannelFutureListener(); /** * Only constructor @@ -182,10 +188,13 @@ public class NettyClient { * @param context Context for progress * @param conf Configuration * @param myTaskInfo Current task info + * @param exceptionHandler handler for uncaught exception. Will + * terminate job. 
*/ public NettyClient(Mapper<?, ?, ?, ?>.Context context, final ImmutableClassesGiraphConfiguration conf, - TaskInfo myTaskInfo) { + TaskInfo myTaskInfo, + final Thread.UncaughtExceptionHandler exceptionHandler) { this.context = context; this.myTaskInfo = myTaskInfo; this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf); @@ -226,8 +235,8 @@ public class NettyClient { if (useExecutionGroup) { int executionThreads = NETTY_CLIENT_EXECUTION_THREADS.get(conf); executionGroup = new DefaultEventExecutorGroup(executionThreads, - new ThreadFactoryBuilder().setNameFormat("netty-client-exec-%d") - .build()); + ThreadUtils.createThreadFactory( + "netty-client-exec-%d", exceptionHandler)); if (LOG.isInfoEnabled()) { LOG.info("NettyClient: Using execution handler with " + executionThreads + " threads after " + @@ -238,8 +247,8 @@ public class NettyClient { } workerGroup = new NioEventLoopGroup(maxPoolSize, - new ThreadFactoryBuilder().setNameFormat( - "netty-client-worker-%d").build()); + ThreadUtils.createThreadFactory( + "netty-client-worker-%d", exceptionHandler)); bootstrap = new Bootstrap(); bootstrap.group(workerGroup) @@ -696,6 +705,7 @@ public class NettyClient { } ChannelFuture writeFuture = channel.write(request); newRequestInfo.setWriteFuture(writeFuture); + writeFuture.addListener(logErrorListener); if (limitNumberOfOpenRequests && clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) { @@ -868,6 +878,7 @@ public class NettyClient { } ChannelFuture writeFuture = channel.write(requestInfo.getRequest()); requestInfo.setWriteFuture(writeFuture); + writeFuture.addListener(logErrorListener); } addedRequestIds.clear(); addedRequestInfos.clear(); @@ -906,4 +917,19 @@ public class NettyClient { } return address; } + + /** + * This listener class just dumps exception stack traces if + * something happens. 
+ */ + private static class LogOnErrorChannelFutureListener + implements ChannelFutureListener { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isDone() && !future.isSuccess()) { + LOG.error("Request failed", future.cause()); + } + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java index c982209..1218d29 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java @@ -54,12 +54,16 @@ public class NettyMasterClient implements MasterClient { * @param context Context from mapper * @param configuration Configuration * @param service Centralized service + * @param exceptionHandler handler for uncaught exception. Will + * terminate job. 
*/ public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration configuration, - CentralizedServiceMaster<?, ?, ?> service) { + CentralizedServiceMaster<?, ?, ?> service, + Thread.UncaughtExceptionHandler exceptionHandler) { this.nettyClient = - new NettyClient(context, configuration, service.getMasterInfo()); + new NettyClient(context, configuration, service.getMasterInfo(), + exceptionHandler); this.service = service; this.progressable = context; maxBytesPerAggregatorRequest = configuration.getInt( http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java index cb36c3e..1c05910 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java @@ -39,13 +39,15 @@ public class NettyMasterServer implements MasterServer { * @param conf Hadoop configuration * @param service Centralized service * @param progressable Progressable for reporting progress + * @param exceptionHandler to handle uncaught exceptions */ public NettyMasterServer(ImmutableClassesGiraphConfiguration conf, CentralizedServiceMaster<?, ?, ?> service, - Progressable progressable) { + Progressable progressable, + Thread.UncaughtExceptionHandler exceptionHandler) { nettyServer = new NettyServer(conf, new MasterRequestServerHandler.Factory(service.getAggregatorHandler()), - service.getMasterInfo(), progressable); + service.getMasterInfo(), progressable, exceptionHandler); nettyServer.start(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java index 8162857..454232a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java @@ -33,6 +33,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.TaskInfo; import org.apache.giraph.utils.PipelineUtils; import org.apache.giraph.utils.ProgressableUtils; +import org.apache.giraph.utils.ThreadUtils; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; import io.netty.bootstrap.ServerBootstrap; @@ -54,8 +55,6 @@ import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.channel.AdaptiveRecvByteBufAllocator; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -122,6 +121,8 @@ public class NettyServer { private final EventExecutorGroup executionGroup; /** Name of the handler before the execution handler (if used) */ private final String handlerToUseExecutionGroup; + /** Handles all uncaught exceptions in netty threads */ + private final Thread.UncaughtExceptionHandler exceptionHandler; /** * Constructor for creating the server @@ -130,10 +131,12 @@ public class NettyServer { * @param requestServerHandlerFactory Factory for request handlers * @param myTaskInfo Current task info * @param progressable Progressable for reporting progress + * @param exceptionHandler handle uncaught exceptions */ public NettyServer(ImmutableClassesGiraphConfiguration conf, RequestServerHandler.Factory requestServerHandlerFactory, - TaskInfo myTaskInfo, Progressable progressable) { + TaskInfo myTaskInfo, Progressable progressable, + 
Thread.UncaughtExceptionHandler exceptionHandler) { this.conf = conf; this.progressable = progressable; this.requestServerHandlerFactory = requestServerHandlerFactory; @@ -141,6 +144,7 @@ public class NettyServer { this.saslServerHandlerFactory = new SaslServerHandler.Factory(); /*end[HADOOP_NON_SECURE]*/ this.myTaskInfo = myTaskInfo; + this.exceptionHandler = exceptionHandler; sendBufferSize = GiraphConstants.SERVER_SEND_BUFFER_SIZE.get(conf); receiveBufferSize = GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE.get(conf); @@ -149,12 +153,12 @@ public class NettyServer { maxPoolSize = GiraphConstants.NETTY_SERVER_THREADS.get(conf); bossGroup = new NioEventLoopGroup(4, - new ThreadFactoryBuilder().setNameFormat( - "netty-server-boss-%d").build()); + ThreadUtils.createThreadFactory( + "netty-server-boss-%d", exceptionHandler)); workerGroup = new NioEventLoopGroup(maxPoolSize, - new ThreadFactoryBuilder().setNameFormat( - "netty-server-worker-%d").build()); + ThreadUtils.createThreadFactory( + "netty-server-worker-%d", exceptionHandler)); try { this.localHostname = conf.getLocalHostname(); @@ -173,8 +177,8 @@ public class NettyServer { if (useExecutionGroup) { int executionThreads = conf.getNettyServerExecutionThreads(); executionGroup = new DefaultEventExecutorGroup(executionThreads, - new ThreadFactoryBuilder().setNameFormat("netty-server-exec-%d"). 
- build()); + ThreadUtils.createThreadFactory( + "netty-server-exec-%d", exceptionHandler)); if (LOG.isInfoEnabled()) { LOG.info("NettyServer: Using execution group with " + executionThreads + " threads for " + @@ -194,13 +198,16 @@ public class NettyServer { * @param myTaskInfo Current task info * @param progressable Progressable for reporting progress * @param saslServerHandlerFactory Factory for SASL handlers + * @param exceptionHandler handle uncaught exceptions */ public NettyServer(ImmutableClassesGiraphConfiguration conf, RequestServerHandler.Factory requestServerHandlerFactory, TaskInfo myTaskInfo, Progressable progressable, - SaslServerHandler.Factory saslServerHandlerFactory) { - this(conf, requestServerHandlerFactory, myTaskInfo, progressable); + SaslServerHandler.Factory saslServerHandlerFactory, + Thread.UncaughtExceptionHandler exceptionHandler) { + this(conf, requestServerHandlerFactory, myTaskInfo, + progressable, exceptionHandler); this.saslServerHandlerFactory = saslServerHandlerFactory; } /*end[HADOOP_NON_SECURE]*/ @@ -267,8 +274,8 @@ public class NettyServer { executionGroup, ch); PipelineUtils.addLastWithExecutorCheck("requestServerHandler", requestServerHandlerFactory.newHandler(workerRequestReservedMap, - conf, myTaskInfo), handlerToUseExecutionGroup, - executionGroup, ch); + conf, myTaskInfo, exceptionHandler), + handlerToUseExecutionGroup, executionGroup, ch); // Removed after authentication completes: PipelineUtils.addLastWithExecutorCheck("responseEncoder", new ResponseEncoder(), handlerToUseExecutionGroup, @@ -310,7 +317,7 @@ public class NettyServer { handlerToUseExecutionGroup, executionGroup, ch); PipelineUtils.addLastWithExecutorCheck("requestServerHandler", requestServerHandlerFactory.newHandler( - workerRequestReservedMap, conf, myTaskInfo), + workerRequestReservedMap, conf, myTaskInfo, exceptionHandler), handlerToUseExecutionGroup, executionGroup, ch); /*if_not[HADOOP_NON_SECURE]*/ } @@ -404,5 +411,6 @@ public class NettyServer { 
public InetSocketAddress getMyAddress() { return myAddress; } + } http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java index 7541418..c893a24 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java @@ -74,13 +74,17 @@ public class NettyWorkerClient<I extends WritableComparable, * @param context Context from mapper * @param configuration Configuration * @param service Used to get partition mapping + * @param exceptionHandler handler for uncaught exception. Will + * terminate job. */ public NettyWorkerClient( Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration<I, V, E> configuration, - CentralizedServiceWorker<I, V, E> service) { + CentralizedServiceWorker<I, V, E> service, + Thread.UncaughtExceptionHandler exceptionHandler) { this.nettyClient = - new NettyClient(context, configuration, service.getWorkerInfo()); + new NettyClient(context, configuration, service.getWorkerInfo(), + exceptionHandler); this.conf = configuration; this.service = service; this.superstepRequestCounters = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java index adb96cb..22ecc0e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java @@ -77,10 +77,12 @@ public class NettyWorkerServer<I extends WritableComparable, * @param conf Configuration * @param service Service to get partition mappings * @param context Mapper context + * @param exceptionHandler handle uncaught exceptions */ public NettyWorkerServer(ImmutableClassesGiraphConfiguration<I, V, E> conf, CentralizedServiceWorker<I, V, E> service, - Mapper<?, ?, ?, ?>.Context context) { + Mapper<?, ?, ?, ?>.Context context, + Thread.UncaughtExceptionHandler exceptionHandler) { this.conf = conf; this.service = service; this.context = context; @@ -91,7 +93,7 @@ public class NettyWorkerServer<I extends WritableComparable, nettyServer = new NettyServer(conf, new WorkerRequestServerHandler.Factory<I, V, E>(serverData), - service.getWorkerInfo(), context); + service.getWorkerInfo(), context, exceptionHandler); nettyServer.start(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java index 3e06026..e043314 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java @@ -36,13 +36,15 @@ public class MasterRequestServerHandler extends * @param conf Configuration * @param myTaskInfo Current task info * @param aggregatorHandler Master aggregator handler + * @param exceptionHandler Handles uncaught exceptions */ public MasterRequestServerHandler( WorkerRequestReservedMap workerRequestReservedMap, ImmutableClassesGiraphConfiguration conf, TaskInfo myTaskInfo, - 
MasterAggregatorHandler aggregatorHandler) { - super(workerRequestReservedMap, conf, myTaskInfo); + MasterAggregatorHandler aggregatorHandler, + Thread.UncaughtExceptionHandler exceptionHandler) { + super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler); this.aggregatorHandler = aggregatorHandler; } @@ -71,9 +73,10 @@ public class MasterRequestServerHandler extends public RequestServerHandler newHandler( WorkerRequestReservedMap workerRequestReservedMap, ImmutableClassesGiraphConfiguration conf, - TaskInfo myTaskInfo) { + TaskInfo myTaskInfo, + Thread.UncaughtExceptionHandler exceptionHandler) { return new MasterRequestServerHandler(workerRequestReservedMap, conf, - myTaskInfo, aggregatorHandler); + myTaskInfo, aggregatorHandler, exceptionHandler); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java index b6d0533..d75870a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java @@ -56,6 +56,8 @@ public abstract class RequestServerHandler<R> extends private final TaskInfo myTaskInfo; /** Start nanoseconds for the processing time */ private long startProcessingNanoseconds = -1; + /** Handler for uncaught exceptions */ + private final Thread.UncaughtExceptionHandler exceptionHandler; /** * Constructor @@ -63,14 +65,17 @@ public abstract class RequestServerHandler<R> extends * @param workerRequestReservedMap Worker request reservation map * @param conf Configuration * @param myTaskInfo Current task info + * @param exceptionHandler Handles uncaught exceptions 
*/ public RequestServerHandler( WorkerRequestReservedMap workerRequestReservedMap, ImmutableClassesGiraphConfiguration conf, - TaskInfo myTaskInfo) { + TaskInfo myTaskInfo, + Thread.UncaughtExceptionHandler exceptionHandler) { this.workerRequestReservedMap = workerRequestReservedMap; closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf); this.myTaskInfo = myTaskInfo; + this.exceptionHandler = exceptionHandler; } @Override @@ -159,10 +164,9 @@ public abstract class RequestServerHandler<R> extends } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - LOG.warn("exceptionCaught: Channel failed with " + - "remote address " + ctx.channel().remoteAddress(), cause); + public void exceptionCaught( + ChannelHandlerContext ctx, Throwable cause) throws Exception { + exceptionHandler.uncaughtException(Thread.currentThread(), cause); } /** @@ -175,11 +179,13 @@ public abstract class RequestServerHandler<R> extends * @param workerRequestReservedMap Worker request reservation map * @param conf Configuration to use * @param myTaskInfo Current task info + * @param exceptionHandler Handles uncaught exceptions * @return New {@link RequestServerHandler} */ RequestServerHandler newHandler( WorkerRequestReservedMap workerRequestReservedMap, ImmutableClassesGiraphConfiguration conf, - TaskInfo myTaskInfo); + TaskInfo myTaskInfo, + Thread.UncaughtExceptionHandler exceptionHandler); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java index f64c373..574e413 100644 --- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java @@ -46,12 +46,14 @@ public class WorkerRequestServerHandler<I extends WritableComparable, * @param workerRequestReservedMap Worker request reservation map * @param conf Configuration * @param myTaskInfo Current task info + * @param exceptionHandler Handles uncaught exceptions */ public WorkerRequestServerHandler(ServerData<I, V, E> serverData, WorkerRequestReservedMap workerRequestReservedMap, ImmutableClassesGiraphConfiguration conf, - TaskInfo myTaskInfo) { - super(workerRequestReservedMap, conf, myTaskInfo); + TaskInfo myTaskInfo, + Thread.UncaughtExceptionHandler exceptionHandler) { + super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler); this.serverData = serverData; } @@ -80,9 +82,10 @@ public class WorkerRequestServerHandler<I extends WritableComparable, public RequestServerHandler newHandler( WorkerRequestReservedMap workerRequestReservedMap, ImmutableClassesGiraphConfiguration conf, - TaskInfo myTaskInfo) { + TaskInfo myTaskInfo, + Thread.UncaughtExceptionHandler exceptionHandler) { return new WorkerRequestServerHandler<I, V, E, Writable>(serverData, - workerRequestReservedMap, conf, myTaskInfo); + workerRequestReservedMap, conf, myTaskInfo, exceptionHandler); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java index c86a024..6f748c5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java @@ -49,13 +49,15 @@ public class GraphMapper<I extends 
WritableComparable, V extends Writable, @Override public void setup(Context context) throws IOException, InterruptedException { - // Setting the default handler for uncaught exceptions. - Thread.setDefaultUncaughtExceptionHandler( - new OverrideExceptionHandler()); - // Execute all Giraph-related role(s) assigned to this compute node. // Roles can include "master," "worker," "zookeeper," or . . . ? graphTaskManager = new GraphTaskManager<I, V, E>(context); + + // Setting the default handler for uncaught exceptions. + Thread.setDefaultUncaughtExceptionHandler( + graphTaskManager.createUncaughtExceptionHandler()); + + graphTaskManager.setup( DistributedCache.getLocalCacheArchives(context.getConfiguration())); } @@ -96,6 +98,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, // CHECKSTYLE: stop IllegalCatch } catch (RuntimeException e) { // CHECKSTYLE: resume IllegalCatch + LOG.error("Caught an unrecoverable exception " + e.getMessage(), e); graphTaskManager.zooKeeperCleanup(); graphTaskManager.workerFailureCleanup(); throw new IllegalStateException( @@ -103,16 +106,4 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, } } - /** - * Default handler for uncaught exceptions. 
- */ - class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - LOG.fatal( - "uncaughtException: OverrideExceptionHandler on thread " + - t.getName() + ", msg = " + e.getMessage() + ", exiting...", e); - System.exit(1); - } - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index e13eedd..b2a5c84 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -911,7 +911,37 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, } } + /** + * Creates exception handler that will terminate process gracefully in case + * of any uncaught exception. + * @return new exception handler object. + */ + public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() { + return new OverrideExceptionHandler(); + } + public ImmutableClassesGiraphConfiguration<I, V, E> getConf() { return conf; } + + + /** + * Default handler for uncaught exceptions. + * It will do the best to clean up and then will terminate current giraph job. 
+ */ + class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler { + @Override + public void uncaughtException(final Thread t, final Throwable e) { + try { + LOG.fatal( + "uncaughtException: OverrideExceptionHandler on thread " + + t.getName() + ", msg = " + e.getMessage() + ", exiting...", e); + + zooKeeperCleanup(); + workerFailureCleanup(); + } finally { + System.exit(1); + } + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 02d4f2b..0275395 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -883,11 +883,13 @@ public class BspServiceMaster<I extends WritableComparable, masterInfo = new MasterInfo(); masterServer = - new NettyMasterServer(getConfiguration(), this, getContext()); + new NettyMasterServer(getConfiguration(), this, getContext(), + getGraphTaskManager().createUncaughtExceptionHandler()); masterInfo.setInetSocketAddress(masterServer.getMyAddress()); masterInfo.setTaskId(getTaskPartition()); masterClient = - new NettyMasterClient(getContext(), getConfiguration(), this); + new NettyMasterClient(getContext(), getConfiguration(), this, + getGraphTaskManager().createUncaughtExceptionHandler()); if (LOG.isInfoEnabled()) { LOG.info("becomeMaster: I am now the master!"); @@ -1397,8 +1399,7 @@ public class BspServiceMaster<I extends WritableComparable, // Did a worker die? 
try { - if ((getSuperstep() > 0) && - !superstepChosenWorkerAlive( + if (!superstepChosenWorkerAlive( workerInfoHealthyPath, workerInfoList)) { return false; http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java new file mode 100644 index 0000000..a235ff4 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.utils; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.concurrent.ThreadFactory; + +/** + * Utility class for thread related functions. + */ +public class ThreadUtils { + + /** + * Utility class. Do not inherit or create objects. + */ + private ThreadUtils() { } + + /** + * Creates new thread factory with specified thread name format. 
+ * + * @param nameFormat defines naming format for threads created by + * thread factory + * @param exceptionHandler handles uncaught exceptions in all threads + * produced by the created thread factory + * @return new thread factory with specified thread name format and + * exception handler. + */ + public static ThreadFactory createThreadFactory( + String nameFormat, + Thread.UncaughtExceptionHandler exceptionHandler) { + ThreadFactoryBuilder builder = new ThreadFactoryBuilder(). + setNameFormat(nameFormat); + if (exceptionHandler != null) { + builder.setUncaughtExceptionHandler(exceptionHandler); + } + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index dbe6a45..de7af28 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -197,10 +197,12 @@ public class BspServiceWorker<I extends WritableComparable, workerGraphPartitioner = getGraphPartitionerFactory().createWorkerGraphPartitioner(); workerInfo = new WorkerInfo(); - workerServer = new NettyWorkerServer<I, V, E>(conf, this, context); + workerServer = new NettyWorkerServer<I, V, E>(conf, this, context, + graphTaskManager.createUncaughtExceptionHandler()); workerInfo.setInetSocketAddress(workerServer.getMyAddress()); workerInfo.setTaskId(getTaskPartition()); - workerClient = new NettyWorkerClient<I, V, E>(context, conf, this); + workerClient = new NettyWorkerClient<I, V, E>(context, conf, this, + graphTaskManager.createUncaughtExceptionHandler()); workerAggregatorRequestProcessor = new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this); 
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java index f4719cc..ccfc972 100644 --- a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java +++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java @@ -166,19 +166,6 @@ public class GiraphYarnTask<I extends WritableComparable, V extends Writable, } /** - * Default handler for uncaught exceptions. - */ - class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - LOG.fatal( - "uncaughtException: OverrideExceptionHandler on thread " + - t.getName() + ", msg = " + e.getMessage() + ", exiting...", e); - System.exit(1); - } - } - - /** * Task entry point. * @param args CLI arguments injected by GiraphApplicationMaster to hand off * job, task, and attempt ID's to this (and every) Giraph task. 
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java index e771e36..5bc9ef0 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java @@ -70,11 +70,12 @@ public class ConnectionTest { NettyServer server = new NettyServer(conf, new WorkerRequestServerHandler.Factory(serverData), workerInfo, - context); + context, new MockExceptionHandler()); server.start(); workerInfo.setInetSocketAddress(server.getMyAddress()); - NettyClient client = new NettyClient(context, conf, new WorkerInfo()); + NettyClient client = new NettyClient(context, conf, new WorkerInfo(), + new MockExceptionHandler()); client.connectAllAddresses( Lists.<WorkerInfo>newArrayList(workerInfo)); @@ -101,7 +102,8 @@ public class ConnectionTest { WorkerInfo workerInfo1 = new WorkerInfo(); workerInfo1.setTaskId(1); NettyServer server1 = - new NettyServer(conf, requestServerHandlerFactory, workerInfo1, context); + new NettyServer(conf, requestServerHandlerFactory, workerInfo1, + context, new MockExceptionHandler()); server1.start(); workerInfo1.setInetSocketAddress(server1.getMyAddress()); @@ -109,7 +111,7 @@ public class ConnectionTest { workerInfo1.setTaskId(2); NettyServer server2 = new NettyServer(conf, requestServerHandlerFactory, workerInfo2, - context); + context, new MockExceptionHandler()); server2.start(); workerInfo2.setInetSocketAddress(server2.getMyAddress()); @@ -117,11 +119,12 @@ public class ConnectionTest { workerInfo1.setTaskId(3); NettyServer server3 = new NettyServer(conf, requestServerHandlerFactory, workerInfo3, - context); + context, new MockExceptionHandler()); server3.start(); 
workerInfo3.setInetSocketAddress(server3.getMyAddress()); - NettyClient client = new NettyClient(context, conf, new WorkerInfo()); + NettyClient client = new NettyClient(context, conf, new WorkerInfo(), + new MockExceptionHandler()); List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo1, workerInfo2, workerInfo3); client.connectAllAddresses(addresses); @@ -148,16 +151,19 @@ public class ConnectionTest { WorkerInfo workerInfo = new WorkerInfo(); NettyServer server = new NettyServer(conf, new WorkerRequestServerHandler.Factory(serverData), workerInfo, - context); + context, new MockExceptionHandler()); server.start(); workerInfo.setInetSocketAddress(server.getMyAddress()); List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo); - NettyClient client1 = new NettyClient(context, conf, new WorkerInfo()); + NettyClient client1 = new NettyClient(context, conf, new WorkerInfo(), + new MockExceptionHandler()); client1.connectAllAddresses(addresses); - NettyClient client2 = new NettyClient(context, conf, new WorkerInfo()); + NettyClient client2 = new NettyClient(context, conf, new WorkerInfo(), + new MockExceptionHandler()); client2.connectAllAddresses(addresses); - NettyClient client3 = new NettyClient(context, conf, new WorkerInfo()); + NettyClient client3 = new NettyClient(context, conf, new WorkerInfo(), + new MockExceptionHandler()); client3.connectAllAddresses(addresses); client1.stop(); http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/MockExceptionHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/MockExceptionHandler.java b/giraph-core/src/test/java/org/apache/giraph/comm/MockExceptionHandler.java new file mode 100644 index 0000000..edd3fc0 --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/comm/MockExceptionHandler.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm; + +public class MockExceptionHandler implements Thread.UncaughtExceptionHandler{ + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new RuntimeException(e); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java index 157a543..572e290 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java @@ -160,10 +160,11 @@ public class RequestFailureTest { WorkerInfo workerInfo = new WorkerInfo(); server = new NettyServer(conf, new WorkerRequestServerHandler.Factory(serverData), workerInfo, - context); + context, new MockExceptionHandler()); server.start(); workerInfo.setInetSocketAddress(server.getMyAddress()); - client = new NettyClient(context, conf, new WorkerInfo()); + client = new NettyClient(context, conf, new 
WorkerInfo(), + new MockExceptionHandler()); client.connectAllAddresses( Lists.<WorkerInfo>newArrayList(workerInfo)); http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java index 32454f4..8037db9 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -95,10 +95,11 @@ public class RequestTest { workerInfo = new WorkerInfo(); server = new NettyServer(conf, new WorkerRequestServerHandler.Factory(serverData), workerInfo, - context); + context, new MockExceptionHandler()); server.start(); workerInfo.setInetSocketAddress(server.getMyAddress()); - client = new NettyClient(context, conf, new WorkerInfo()); + client = new NettyClient(context, conf, new WorkerInfo(), + new MockExceptionHandler()); client.connectAllAddresses( Lists.<WorkerInfo>newArrayList(workerInfo)); } http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java index c026cf8..96ce062 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java @@ -85,11 +85,13 @@ public class SaslConnectionTest { new WorkerRequestServerHandler.Factory(serverData), workerInfo, context, - mockedSaslServerFactory); + mockedSaslServerFactory, + new MockExceptionHandler()); server.start(); workerInfo.setInetSocketAddress(server.getMyAddress()); - 
NettyClient client = new NettyClient(context, conf, new WorkerInfo()); + NettyClient client = new NettyClient(context, conf, new WorkerInfo(), + new MockExceptionHandler()); client.connectAllAddresses(Lists.<WorkerInfo>newArrayList(workerInfo)); client.stop();
