HADOOP-12749. Create a ThreadPoolExecutor that overrides afterExecute to log
uncaught exceptions/errors. Contributed by Sidharta Seethana.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d37eb828
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d37eb828
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d37eb828

Branch: refs/heads/yarn-2877
Commit: d37eb828ffa09d55936964f555ea351b946d286e
Parents: af21810
Author: Varun Vasudev <vvasu...@apache.org>
Authored: Mon Feb 8 20:19:17 2016 +0530
Committer: Varun Vasudev <vvasu...@apache.org>
Committed: Mon Feb 8 20:19:17 2016 +0530

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../hadoop/util/concurrent/ExecutorHelper.java  | 67 ++++++++++++++
 .../hadoop/util/concurrent/HadoopExecutors.java | 96 ++++++++++++++++++++
 .../HadoopScheduledThreadPoolExecutor.java      | 71 +++++++++++++++
 .../concurrent/HadoopThreadPoolExecutor.java    | 92 +++++++++++++++++++
 .../hadoop/util/concurrent/package-info.java    | 26 ++++++
 6 files changed, 355 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt 
b/hadoop-common-project/hadoop-common/CHANGES.txt
index dbfa482..d99936f 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -685,6 +685,9 @@ Release 2.9.0 - UNRELEASED
     HADOOP-12662. The build should fail if a -Dbundle option fails (Kai Zheng
     via cmccabe)
 
+    HADOOP-12749. Create a threadpoolexecutor that overrides afterExecute to
+    log uncaught exceptions/errors. (Sidharta Seethana via vvasudev)
+
   BUG FIXES
 
     HADOOP-12605. Fix intermittent failure of TestIPC.testIpcWithReaderQueuing

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java
new file mode 100644
index 0000000..3bc9ed9
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/ExecutorHelper.java
@@ -0,0 +1,67 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.util.concurrent;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/** Helper functions for Executors. */
+public final class ExecutorHelper {
+
+  private static final Log LOG = LogFactory
+      .getLog(ExecutorHelper.class);
+
+  /**
+   * Logs the failure, if any, of a task that an executor has just run.
+   * Meant to be called from an override of
+   * {@code ThreadPoolExecutor.afterExecute(Runnable, Throwable)}.
+   *
+   * <p>If {@code t} is non-null it is logged at WARN level. Otherwise, if
+   * {@code r} is a {@link Future} that has already completed, its outcome is
+   * retrieved and any resulting exception is logged instead. This covers
+   * tasks whose exceptions are captured inside the future rather than
+   * being passed to {@code afterExecute}.
+   *
+   * @param r the runnable that was executed
+   * @param t the throwable that terminated the run, or {@code null}
+   */
+  static void logThrowableFromAfterExecute(Runnable r, Throwable t) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("afterExecute in thread: " + Thread.currentThread()
+          .getName() + ", runnable type: " + r.getClass().getName());
+    }
+
+    // Pattern taken from the Javadoc of
+    // java.util.concurrent.ThreadPoolExecutor#afterExecute.
+    //
+    // Only completed futures are queried: a Future handed to afterExecute is
+    // not necessarily done (a periodic task of a ScheduledThreadPoolExecutor
+    // is reset after every successful run), and calling get() on a future
+    // that is not done would block the worker thread indefinitely.
+    if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
+      try {
+        ((Future<?>) r).get();
+      } catch (ExecutionException ee) {
+        LOG.warn("Execution exception when running task in " +
+            Thread.currentThread().getName());
+        // The task's own failure is the cause; it is logged below.
+        t = ee.getCause();
+      } catch (InterruptedException ie) {
+        LOG.warn("Thread (" + Thread.currentThread() + ") interrupted: ", ie);
+        // Restore the interrupt status for whoever owns this thread.
+        Thread.currentThread().interrupt();
+      } catch (Throwable throwable) {
+        // Anything else thrown by get(), e.g. a CancellationException.
+        t = throwable;
+      }
+    }
+
+    if (t != null) {
+      LOG.warn("Caught exception in thread " + Thread
+          .currentThread().getName() + ": ", t);
+    }
+  }
+
+  private ExecutorHelper() {}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
new file mode 100644
index 0000000..1bc6976
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
@@ -0,0 +1,96 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Static factories for {@link ExecutorService} and
+ * {@link ScheduledExecutorService} instances. The pool factories return the
+ * Hadoop executor subclasses of this package, which provide additional
+ * functionality (e.g. logging uncaught exceptions); the single-thread
+ * factories currently return the executors created by {@link Executors}.
+ */
+public final class HadoopExecutors {
+
+  //disable instantiation
+  private HadoopExecutors() { }
+
+  /**
+   * Creates a pool with no core threads, an upper bound of
+   * {@code Integer.MAX_VALUE} threads, a 60 second keep-alive and a
+   * {@link SynchronousQueue} as its work queue.
+   *
+   * @param threadFactory factory used to create the worker threads
+   * @return the new executor
+   */
+  public static ExecutorService newCachedThreadPool(
+      ThreadFactory threadFactory) {
+    final SynchronousQueue<Runnable> handoffQueue =
+        new SynchronousQueue<Runnable>();
+    final ExecutorService pool = new HadoopThreadPoolExecutor(
+        0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+        handoffQueue, threadFactory);
+    return pool;
+  }
+
+  /**
+   * Creates a pool whose core and maximum size are both {@code nThreads},
+   * backed by a {@link LinkedBlockingQueue}.
+   *
+   * @param nThreads number of threads in the pool
+   * @param threadFactory factory used to create the worker threads
+   * @return the new executor
+   */
+  public static ExecutorService newFixedThreadPool(int nThreads,
+      ThreadFactory threadFactory) {
+    final LinkedBlockingQueue<Runnable> pendingTasks =
+        new LinkedBlockingQueue<Runnable>();
+    final ExecutorService pool = new HadoopThreadPoolExecutor(
+        nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
+        pendingTasks, threadFactory);
+    return pool;
+  }
+
+  /**
+   * Creates a pool whose core and maximum size are both {@code nThreads},
+   * backed by a {@link LinkedBlockingQueue}.
+   *
+   * @param nThreads number of threads in the pool
+   * @return the new executor
+   */
+  public static ExecutorService newFixedThreadPool(int nThreads) {
+    final LinkedBlockingQueue<Runnable> pendingTasks =
+        new LinkedBlockingQueue<Runnable>();
+    final ExecutorService pool = new HadoopThreadPoolExecutor(
+        nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
+        pendingTasks);
+    return pool;
+  }
+
+  /**
+   * Returns the result of {@link Executors#newSingleThreadExecutor()}.
+   * That method has special semantics, so for the moment this factory
+   * delegates to it rather than re-implementing them here.
+   *
+   * @return the new executor
+   */
+  public static ExecutorService newSingleThreadExecutor() {
+    final ExecutorService single = Executors.newSingleThreadExecutor();
+    return single;
+  }
+
+  /**
+   * Returns the result of
+   * {@link Executors#newSingleThreadExecutor(ThreadFactory)}. That method
+   * has special semantics, so for the moment this factory delegates to it
+   * rather than re-implementing them here.
+   *
+   * @param threadFactory factory used to create the worker thread
+   * @return the new executor
+   */
+  public static ExecutorService newSingleThreadExecutor(
+      ThreadFactory threadFactory) {
+    final ExecutorService single =
+        Executors.newSingleThreadExecutor(threadFactory);
+    return single;
+  }
+
+  /**
+   * Creates a {@link HadoopScheduledThreadPoolExecutor} with the given core
+   * pool size.
+   *
+   * @param corePoolSize number of core threads
+   * @return the new scheduled executor
+   */
+  public static ScheduledExecutorService newScheduledThreadPool(
+      int corePoolSize) {
+    final ScheduledExecutorService scheduler =
+        new HadoopScheduledThreadPoolExecutor(corePoolSize);
+    return scheduler;
+  }
+
+  /**
+   * Creates a {@link HadoopScheduledThreadPoolExecutor} with the given core
+   * pool size and thread factory.
+   *
+   * @param corePoolSize number of core threads
+   * @param threadFactory factory used to create the worker threads
+   * @return the new scheduled executor
+   */
+  public static ScheduledExecutorService newScheduledThreadPool(
+      int corePoolSize, ThreadFactory threadFactory) {
+    final ScheduledExecutorService scheduler =
+        new HadoopScheduledThreadPoolExecutor(corePoolSize, threadFactory);
+    return scheduler;
+  }
+
+  /**
+   * Returns the result of
+   * {@link Executors#newSingleThreadScheduledExecutor()}. That method has
+   * special semantics, so for the moment this factory delegates to it
+   * rather than re-implementing them here.
+   *
+   * @return the new scheduled executor
+   */
+  public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
+    final ScheduledExecutorService scheduler =
+        Executors.newSingleThreadScheduledExecutor();
+    return scheduler;
+  }
+
+  /**
+   * Returns the result of
+   * {@link Executors#newSingleThreadScheduledExecutor(ThreadFactory)}. That
+   * method has special semantics, so for the moment this factory delegates
+   * to it rather than re-implementing them here.
+   *
+   * @param threadFactory factory used to create the worker thread
+   * @return the new scheduled executor
+   */
+  public static ScheduledExecutorService newSingleThreadScheduledExecutor(
+      ThreadFactory threadFactory) {
+    final ScheduledExecutorService scheduler =
+        Executors.newSingleThreadScheduledExecutor(threadFactory);
+    return scheduler;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java
new file mode 100644
index 0000000..8d910b6
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java
@@ -0,0 +1,71 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.util.concurrent;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * An extension of ScheduledThreadPoolExecutor that provides additional
+ * functionality: it logs, at debug level, each task about to be run and
+ * passes every finished task to
+ * {@link ExecutorHelper#logThrowableFromAfterExecute(Runnable, Throwable)}.
+ */
+public class HadoopScheduledThreadPoolExecutor extends
+    ScheduledThreadPoolExecutor {
+
+  private static final Log LOG = LogFactory
+      .getLog(HadoopScheduledThreadPoolExecutor.class);
+
+  /**
+   * @param corePoolSize number of core threads, passed to the superclass
+   */
+  public HadoopScheduledThreadPoolExecutor(int corePoolSize) {
+    super(corePoolSize);
+  }
+
+  /**
+   * @param corePoolSize number of core threads, passed to the superclass
+   * @param threadFactory factory used to create the worker threads
+   */
+  public HadoopScheduledThreadPoolExecutor(int corePoolSize,
+      ThreadFactory threadFactory) {
+    super(corePoolSize, threadFactory);
+  }
+
+  /**
+   * @param corePoolSize number of core threads, passed to the superclass
+   * @param handler handler invoked when a task cannot be accepted
+   */
+  public HadoopScheduledThreadPoolExecutor(int corePoolSize,
+      RejectedExecutionHandler handler) {
+    super(corePoolSize, handler);
+  }
+
+  /**
+   * @param corePoolSize number of core threads, passed to the superclass
+   * @param threadFactory factory used to create the worker threads
+   * @param handler handler invoked when a task cannot be accepted
+   */
+  public HadoopScheduledThreadPoolExecutor(int corePoolSize,
+      ThreadFactory threadFactory,
+      RejectedExecutionHandler handler) {
+    super(corePoolSize, threadFactory, handler);
+  }
+
+  /**
+   * Logs the current thread name and the runnable's class at debug level,
+   * then chains to the superclass hook.
+   *
+   * @param t the thread that will run the task
+   * @param r the task about to be run
+   */
+  @Override
+  protected void beforeExecute(Thread t, Runnable r) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("beforeExecute in thread: " + Thread.currentThread()
+          .getName() + ", runnable type: " + r.getClass().getName());
+    }
+    // The JDK recommends invoking super.beforeExecute at the end of an
+    // override so that nested overridings compose properly; this also
+    // mirrors the super call made in afterExecute.
+    super.beforeExecute(t, r);
+  }
+
+  /**
+   * Chains to the superclass hook, then hands the task and its throwable to
+   * {@link ExecutorHelper#logThrowableFromAfterExecute(Runnable, Throwable)}.
+   *
+   * @param r the task that has been run
+   * @param t the throwable that terminated the run, or {@code null}
+   */
+  @Override
+  protected void afterExecute(Runnable r, Throwable t) {
+    super.afterExecute(r, t);
+    ExecutorHelper.logThrowableFromAfterExecute(r, t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java
new file mode 100644
index 0000000..bcf26cb
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java
@@ -0,0 +1,92 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.util.concurrent;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * An extension of ThreadPoolExecutor that provides additional functionality:
+ * it logs, at debug level, each task about to be run and passes every
+ * finished task to
+ * {@link ExecutorHelper#logThrowableFromAfterExecute(Runnable, Throwable)}.
+ * All constructors forward their arguments unchanged to the superclass.
+ */
+public final class HadoopThreadPoolExecutor extends ThreadPoolExecutor {
+
+  private static final Log LOG = LogFactory
+      .getLog(HadoopThreadPoolExecutor.class);
+
+  /**
+   * @param corePoolSize number of core threads
+   * @param maximumPoolSize maximum number of threads
+   * @param keepAliveTime keep-alive time, in {@code unit}
+   * @param unit time unit of {@code keepAliveTime}
+   * @param workQueue queue holding tasks before they are executed
+   */
+  public HadoopThreadPoolExecutor(int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+  }
+
+  /**
+   * @param corePoolSize number of core threads
+   * @param maximumPoolSize maximum number of threads
+   * @param keepAliveTime keep-alive time, in {@code unit}
+   * @param unit time unit of {@code keepAliveTime}
+   * @param workQueue queue holding tasks before they are executed
+   * @param threadFactory factory used to create the worker threads
+   */
+  public HadoopThreadPoolExecutor(int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      ThreadFactory threadFactory) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+        threadFactory);
+  }
+
+  /**
+   * @param corePoolSize number of core threads
+   * @param maximumPoolSize maximum number of threads
+   * @param keepAliveTime keep-alive time, in {@code unit}
+   * @param unit time unit of {@code keepAliveTime}
+   * @param workQueue queue holding tasks before they are executed
+   * @param handler handler invoked when a task cannot be accepted
+   */
+  public HadoopThreadPoolExecutor(int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      RejectedExecutionHandler handler) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+        handler);
+  }
+
+  /**
+   * @param corePoolSize number of core threads
+   * @param maximumPoolSize maximum number of threads
+   * @param keepAliveTime keep-alive time, in {@code unit}
+   * @param unit time unit of {@code keepAliveTime}
+   * @param workQueue queue holding tasks before they are executed
+   * @param threadFactory factory used to create the worker threads
+   * @param handler handler invoked when a task cannot be accepted
+   */
+  public HadoopThreadPoolExecutor(int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      ThreadFactory threadFactory,
+      RejectedExecutionHandler handler) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+        threadFactory, handler);
+  }
+
+  /**
+   * Logs the current thread name and the runnable's class at debug level,
+   * then chains to the superclass hook.
+   *
+   * @param t the thread that will run the task
+   * @param r the task about to be run
+   */
+  @Override
+  protected void beforeExecute(Thread t, Runnable r) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("beforeExecute in thread: " + Thread.currentThread()
+          .getName() + ", runnable type: " + r.getClass().getName());
+    }
+    // The JDK recommends invoking super.beforeExecute at the end of an
+    // override; this also mirrors the super call made in afterExecute.
+    super.beforeExecute(t, r);
+  }
+
+  /**
+   * Chains to the superclass hook, then hands the task and its throwable to
+   * {@link ExecutorHelper#logThrowableFromAfterExecute(Runnable, Throwable)}.
+   *
+   * @param r the task that has been run
+   * @param t the throwable that terminated the run, or {@code null}
+   */
+  @Override
+  protected void afterExecute(Runnable r, Throwable t) {
+    super.afterExecute(r, t);
+    ExecutorHelper.logThrowableFromAfterExecute(r, t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37eb828/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java
new file mode 100644
index 0000000..2effb65
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+/**
+ * Executor utilities: factory methods for executor services and
+ * ThreadPoolExecutor / ScheduledThreadPoolExecutor subclasses that log
+ * exceptions thrown by the tasks they run.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.util.concurrent;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

Reply via email to