HADOOP-12950. ShutdownHookManager should have a timeout for each of the 
Registered shutdown hook. Contributed by Xiaoyu Yao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aac4d65b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aac4d65b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aac4d65b

Branch: refs/heads/HDFS-7240
Commit: aac4d65bf9c6d68f53610e5fe9997a391e3fa053
Parents: 1963978
Author: Xiaoyu Yao <x...@apache.org>
Authored: Thu Mar 31 15:20:09 2016 -0700
Committer: Xiaoyu Yao <x...@apache.org>
Committed: Thu Mar 31 15:22:24 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/util/ShutdownHookManager.java | 116 +++++++++++++++----
 .../hadoop/util/TestShutdownHookManager.java    |  57 ++++++++-
 2 files changed, 150 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aac4d65b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java
index 843454b..33f942f 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.util;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -26,6 +28,10 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -42,7 +48,12 @@ public class ShutdownHookManager {
   private static final ShutdownHookManager MGR = new ShutdownHookManager();
 
   private static final Log LOG = LogFactory.getLog(ShutdownHookManager.class);
+  private static final long TIMEOUT_DEFAULT = 10;
+  private static final TimeUnit TIME_UNIT_DEFAULT = TimeUnit.SECONDS;
 
+  private static final ExecutorService EXECUTOR =
+      HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+          .setDaemon(true).build());
   static {
     try {
       Runtime.getRuntime().addShutdownHook(
@@ -50,14 +61,33 @@ public class ShutdownHookManager {
           @Override
           public void run() {
             MGR.shutdownInProgress.set(true);
-            for (Runnable hook: MGR.getShutdownHooksInOrder()) {
+            for (HookEntry entry: MGR.getShutdownHooksInOrder()) {
+              Future<?> future = EXECUTOR.submit(entry.getHook());
               try {
-                hook.run();
+                future.get(entry.getTimeout(), entry.getTimeUnit());
+              } catch (TimeoutException ex) {
+                future.cancel(true);
+                LOG.warn("ShutdownHook '" + entry.getHook().getClass().
+                    getSimpleName() + "' timeout, " + ex.toString(), ex);
               } catch (Throwable ex) {
-                LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() +
-                         "' failed, " + ex.toString(), ex);
+                LOG.warn("ShutdownHook '" + entry.getHook().getClass().
+                    getSimpleName() + "' failed, " + ex.toString(), ex);
               }
             }
+            try {
+              EXECUTOR.shutdown();
+              if (!EXECUTOR.awaitTermination(TIMEOUT_DEFAULT,
+                  TIME_UNIT_DEFAULT)) {
+                LOG.error("ShutdownHookManger shutdown forcefully.");
+                EXECUTOR.shutdownNow();
+              }
+              LOG.info("ShutdownHookManger complete shutdown.");
+            } catch (InterruptedException ex) {
+              LOG.error("ShutdownHookManger interrupted while waiting for " +
+                  "termination.", ex);
+              EXECUTOR.shutdownNow();
+              Thread.currentThread().interrupt();
+            }
           }
         }
       );
@@ -77,15 +107,24 @@ public class ShutdownHookManager {
   }
 
   /**
-   * Private structure to store ShutdownHook and its priority.
+   * Private structure to store ShutdownHook, its priority and timeout
+   * settings.
    */
-  private static class HookEntry {
-    Runnable hook;
-    int priority;
+  static class HookEntry {
+    private final Runnable hook;
+    private final int priority;
+    private final long timeout;
+    private final TimeUnit unit;
+
+    HookEntry(Runnable hook, int priority) {
+      this(hook, priority, TIMEOUT_DEFAULT, TIME_UNIT_DEFAULT);
+    }
 
-    public HookEntry(Runnable hook, int priority) {
+    HookEntry(Runnable hook, int priority, long timeout, TimeUnit unit) {
       this.hook = hook;
       this.priority = priority;
+      this.timeout = timeout;
+      this.unit = unit;
     }
 
     @Override
@@ -104,10 +143,25 @@ public class ShutdownHookManager {
       return eq;
     }
 
+    Runnable getHook() {
+      return hook;
+    }
+
+    int getPriority() {
+      return priority;
+    }
+
+    long getTimeout() {
+      return timeout;
+    }
+
+    TimeUnit getTimeUnit() {
+      return unit;
+    }
   }
 
-  private Set<HookEntry> hooks =
-    Collections.synchronizedSet(new HashSet<HookEntry>());
+  private final Set<HookEntry> hooks =
+      Collections.synchronizedSet(new HashSet<HookEntry>());
 
   private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
 
@@ -121,7 +175,7 @@ public class ShutdownHookManager {
    *
    * @return the list of shutdownHooks in order of execution.
    */
-  List<Runnable> getShutdownHooksInOrder() {
+  List<HookEntry> getShutdownHooksInOrder() {
     List<HookEntry> list;
     synchronized (MGR.hooks) {
       list = new ArrayList<HookEntry>(MGR.hooks);
@@ -134,11 +188,7 @@ public class ShutdownHookManager {
         return o2.priority - o1.priority;
       }
     });
-    List<Runnable> ordered = new ArrayList<Runnable>();
-    for (HookEntry entry: list) {
-      ordered.add(entry.hook);
-    }
-    return ordered;
+    return list;
   }
 
   /**
@@ -154,12 +204,37 @@ public class ShutdownHookManager {
       throw new IllegalArgumentException("shutdownHook cannot be NULL");
     }
     if (shutdownInProgress.get()) {
-      throw new IllegalStateException("Shutdown in progress, cannot add a 
shutdownHook");
+      throw new IllegalStateException("Shutdown in progress, cannot add a " +
+          "shutdownHook");
     }
     hooks.add(new HookEntry(shutdownHook, priority));
   }
 
   /**
+   *
+   * Adds a shutdownHook with a priority and timeout the higher the priority
+   * the earlier will run. ShutdownHooks with same priority run
+   * in a non-deterministic order. The shutdown hook will be terminated if it
+   * has not been finished in the specified period of time.
+   *
+   * @param shutdownHook shutdownHook <code>Runnable</code>
+   * @param priority priority of the shutdownHook
+   * @param timeout timeout of the shutdownHook
+   * @param unit unit of the timeout <code>TimeUnit</code>
+   */
+  public void addShutdownHook(Runnable shutdownHook, int priority, long 
timeout,
+      TimeUnit unit) {
+    if (shutdownHook == null) {
+      throw new IllegalArgumentException("shutdownHook cannot be NULL");
+    }
+    if (shutdownInProgress.get()) {
+      throw new IllegalStateException("Shutdown in progress, cannot add a " +
+          "shutdownHook");
+    }
+    hooks.add(new HookEntry(shutdownHook, priority, timeout, unit));
+  }
+
+  /**
    * Removes a shutdownHook.
    *
    * @param shutdownHook shutdownHook to remove.
@@ -168,7 +243,8 @@ public class ShutdownHookManager {
    */
   public boolean removeShutdownHook(Runnable shutdownHook) {
     if (shutdownInProgress.get()) {
-      throw new IllegalStateException("Shutdown in progress, cannot remove a 
shutdownHook");
+      throw new IllegalStateException("Shutdown in progress, cannot remove a " 
+
+          "shutdownHook");
     }
     return hooks.remove(new HookEntry(shutdownHook, 0));
   }
@@ -198,4 +274,4 @@ public class ShutdownHookManager {
   public void clearShutdownHooks() {
     hooks.clear();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aac4d65b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java
index 586b899..2aa5e95 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java
@@ -17,10 +17,19 @@
  */
 package org.apache.hadoop.util;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.LoggerFactory;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Thread.sleep;
 
 public class TestShutdownHookManager {
+  static final Logger LOG =
+      LoggerFactory.getLogger(TestShutdownHookManager.class.getName());
 
   @Test
   public void shutdownHookManager() {
@@ -30,18 +39,48 @@ public class TestShutdownHookManager {
     Runnable hook1 = new Runnable() {
       @Override
       public void run() {
+        LOG.info("Shutdown hook1 complete.");
       }
     };
     Runnable hook2 = new Runnable() {
       @Override
       public void run() {
+        LOG.info("Shutdown hook2 complete.");
+      }
+    };
+
+    Runnable hook3 = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          sleep(3000);
+          LOG.info("Shutdown hook3 complete.");
+        } catch (InterruptedException ex) {
+          LOG.info("Shutdown hook3 interrupted exception:",
+              ExceptionUtils.getStackTrace(ex));
+          Assert.fail("Hook 3 should not timeout.");
+        }
+      }
+    };
+
+    Runnable hook4 = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          sleep(3500);
+          LOG.info("Shutdown hook4 complete.");
+          Assert.fail("Hook 4 should timeout");
+        } catch (InterruptedException ex) {
+          LOG.info("Shutdown hook4 interrupted exception:",
+              ExceptionUtils.getStackTrace(ex));
+        }
       }
     };
 
     mgr.addShutdownHook(hook1, 0);
     Assert.assertTrue(mgr.hasShutdownHook(hook1));
     Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
-    Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0));
+    Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0).getHook());
     mgr.removeShutdownHook(hook1);
     Assert.assertFalse(mgr.hasShutdownHook(hook1));
 
@@ -55,8 +94,20 @@ public class TestShutdownHookManager {
     Assert.assertTrue(mgr.hasShutdownHook(hook1));
     Assert.assertTrue(mgr.hasShutdownHook(hook2));
     Assert.assertEquals(2, mgr.getShutdownHooksInOrder().size());
-    Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0));
-    Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1));
+    Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0).getHook());
+    Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1).getHook());
+
+    // Test hook finish without timeout
+    mgr.addShutdownHook(hook3, 2, 4, TimeUnit.SECONDS);
+    Assert.assertTrue(mgr.hasShutdownHook(hook3));
+    Assert.assertEquals(hook3, mgr.getShutdownHooksInOrder().get(0).getHook());
+    Assert.assertEquals(4, mgr.getShutdownHooksInOrder().get(0).getTimeout());
 
+    // Test hook finish with timeout
+    mgr.addShutdownHook(hook4, 3, 2, TimeUnit.SECONDS);
+    Assert.assertTrue(mgr.hasShutdownHook(hook4));
+    Assert.assertEquals(hook4, mgr.getShutdownHooksInOrder().get(0).getHook());
+    Assert.assertEquals(2, mgr.getShutdownHooksInOrder().get(0).getTimeout());
+    LOG.info("Shutdown starts here");
   }
 }

Reply via email to