HADOOP-12749. Create a threadpoolexecutor that overrides afterExecute to log uncaught exceptions/errors. Contributed by Sidharta Seethana.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d37eb828 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d37eb828 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d37eb828 Branch: refs/heads/yarn-2877 Commit: d37eb828ffa09d55936964f555ea351b946d286e Parents: af21810 Author: Varun Vasudev <vvasu...@apache.org> Authored: Mon Feb 8 20:19:17 2016 +0530 Committer: Varun Vasudev <vvasu...@apache.org> Committed: Mon Feb 8 20:19:17 2016 +0530 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../hadoop/util/concurrent/ExecutorHelper.java | 67 ++++++++++++++ .../hadoop/util/concurrent/HadoopExecutors.java | 96 ++++++++++++++++++++ .../HadoopScheduledThreadPoolExecutor.java | 71 +++++++++++++++ .../concurrent/HadoopThreadPoolExecutor.java | 92 +++++++++++++++++++ .../hadoop/util/concurrent/package-info.java | 26 ++++++ 6 files changed, 355 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index dbfa482..d99936f 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -685,6 +685,9 @@ Release 2.9.0 - UNRELEASED HADOOP-12662. The build should fail if a -Dbundle option fails (Kai Zheng via cmccabe) + HADOOP-12749. Create a threadpoolexecutor that overrides afterExecute to + log uncaught exceptions/errors. (Sidharta Seethana via vvasudev) + BUG FIXES HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java new file mode 100644 index 0000000..3bc9ed9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java @@ -0,0 +1,67 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.util.concurrent; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** Helper functions for Executors. */ +public final class ExecutorHelper { + + private static final Log LOG = LogFactory + .getLog(ExecutorHelper.class); + + static void logThrowableFromAfterExecute(Runnable r, Throwable t) { + if (LOG.isDebugEnabled()) { + LOG.debug("afterExecute in thread: " + Thread.currentThread() + .getName() + ", runnable type: " + r.getClass().getName()); + } + + //For additional information, see: https://docs.oracle + // .com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor + // .html#afterExecute(java.lang.Runnable,%20java.lang.Throwable) . + + if (t == null && r instanceof Future<?>) { + try { + ((Future<?>) r).get(); + } catch (ExecutionException ee) { + LOG.warn("Execution exception when running task in " + + Thread.currentThread().getName()); + t = ee.getCause(); + } catch (InterruptedException ie) { + LOG.warn("Thread (" + Thread.currentThread() + ") interrupted: ", ie); + Thread.currentThread().interrupt(); + } catch (Throwable throwable) { + t = throwable; + } + } + + if (t != null) { + LOG.warn("Caught exception in thread " + Thread + .currentThread().getName() + ": ", t); + } + } + + private ExecutorHelper() {} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java new file mode 100644 index 0000000..1bc6976 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java @@ -0,0 +1,96 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.util.concurrent; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + + +/** Factory methods for ExecutorService, ScheduledExecutorService instances. + * These executor service instances provide additional functionality (e.g + * logging uncaught exceptions). */ +public final class HadoopExecutors { + public static ExecutorService newCachedThreadPool(ThreadFactory + threadFactory) { + return new HadoopThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), + threadFactory); + } + + public static ExecutorService newFixedThreadPool(int nThreads, + ThreadFactory threadFactory) { + return new HadoopThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + threadFactory); + } + + public static ExecutorService newFixedThreadPool(int nThreads) { + return new HadoopThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>()); + } + + //Executors.newSingleThreadExecutor has special semantics - for the + // moment we'll delegate to it rather than implement the semantics here. + public static ExecutorService newSingleThreadExecutor() { + return Executors.newSingleThreadExecutor(); + } + + //Executors.newSingleThreadExecutor has special semantics - for the + // moment we'll delegate to it rather than implement the semantics here. + public static ExecutorService newSingleThreadExecutor(ThreadFactory + threadFactory) { + return Executors.newSingleThreadExecutor(threadFactory); + } + + public static ScheduledExecutorService newScheduledThreadPool( + int corePoolSize) { + return new HadoopScheduledThreadPoolExecutor(corePoolSize); + } + + public static ScheduledExecutorService newScheduledThreadPool( + int corePoolSize, ThreadFactory threadFactory) { + return new HadoopScheduledThreadPoolExecutor(corePoolSize, threadFactory); + } + + //Executors.newSingleThreadScheduledExecutor has special semantics - for the + // moment we'll delegate to it rather than implement the semantics here + public static ScheduledExecutorService newSingleThreadScheduledExecutor() { + return Executors.newSingleThreadScheduledExecutor(); + } + + //Executors.newSingleThreadScheduledExecutor has special semantics - for the + // moment we'll delegate to it rather than implement the semantics here + public static ScheduledExecutorService newSingleThreadScheduledExecutor( + ThreadFactory threadFactory) { + return Executors.newSingleThreadScheduledExecutor(threadFactory); + } + + //disable instantiation + private HadoopExecutors() { } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java new file mode 100644 index 0000000..8d910b6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java @@ -0,0 +1,71 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.util.concurrent; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +/** An extension of ScheduledThreadPoolExecutor that provides additional + * functionality. */ +public class HadoopScheduledThreadPoolExecutor extends + ScheduledThreadPoolExecutor { + + private static final Log LOG = LogFactory + .getLog(HadoopScheduledThreadPoolExecutor.class); + + public HadoopScheduledThreadPoolExecutor(int corePoolSize) { + super(corePoolSize); + } + + public HadoopScheduledThreadPoolExecutor(int corePoolSize, + ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + } + + public HadoopScheduledThreadPoolExecutor(int corePoolSize, + RejectedExecutionHandler handler) { + super(corePoolSize, handler); + } + + public HadoopScheduledThreadPoolExecutor(int corePoolSize, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, threadFactory, handler); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + if (LOG.isDebugEnabled()) { + LOG.debug("beforeExecute in thread: " + Thread.currentThread() + .getName() + ", runnable type: " + r.getClass().getName()); + } + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + ExecutorHelper.logThrowableFromAfterExecute(r, t); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java new file mode 100644 index 0000000..bcf26cb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java @@ -0,0 +1,92 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.util.concurrent; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +/** An extension of ThreadPoolExecutor that provides additional functionality. + * */ +public final class HadoopThreadPoolExecutor extends ThreadPoolExecutor { + + private static final Log LOG = LogFactory + .getLog(HadoopThreadPoolExecutor.class); + + public HadoopThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue<Runnable> workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + public HadoopThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue<Runnable> workQueue, + ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + threadFactory); + } + + public HadoopThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue<Runnable> workQueue, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + handler); + } + + public HadoopThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue<Runnable> workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + threadFactory, handler); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + if (LOG.isDebugEnabled()) { + LOG.debug("beforeExecute in thread: " + Thread.currentThread() + .getName() + ", runnable type: " + r.getClass().getName()); + } + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + ExecutorHelper.logThrowableFromAfterExecute(r, t); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java new file mode 100644 index 0000000..2effb65 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java @@ -0,0 +1,26 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.util.concurrent; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability;