Repository: storm
Updated Branches:
  refs/heads/master a0f3b1ace -> 22440747a


STORM-3249: Make sure times shut down and so does shut down thread


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bbfd7cd4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bbfd7cd4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bbfd7cd4

Branch: refs/heads/master
Commit: bbfd7cd4050ae80d406daaed9cec882b00be1f98
Parents: 20d5581
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Oct 8 13:25:17 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Oct 8 13:25:17 2018 -0500

----------------------------------------------------------------------
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  |  2 +-
 .../org/apache/storm/hive/bolt/HiveBolt.java    |  5 ++-
 .../apache/storm/hive/trident/HiveState.java    |  3 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 43 ++++++++++++--------
 .../storm/blobstore/LocalFsBlobStore.java       |  5 ++-
 .../java/org/apache/storm/daemon/drpc/DRPC.java |  6 +--
 6 files changed, 40 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bbfd7cd4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 12debe5..bdf9da3 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -390,7 +390,7 @@ public class HdfsSpout extends BaseRichSpout {
     public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
         LOG.info("Opening HDFS Spout");
         this.conf = conf;
-        this.commitTimer = new Timer();
+        this.commitTimer = new Timer(context.getThisTaskId() + 
"-commit-timer", true);
         this.tracker = new ProgressTracker();
         this.hdfsConfig = new Configuration();
         this.collector = collector;

http://git-wip-us.apache.org/repos/asf/storm/blob/bbfd7cd4/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java 
b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index cfabbd6..180f41b 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -79,7 +79,7 @@ public class HiveBolt extends BaseRichBolt {
                                                                 new 
ThreadFactoryBuilder().setNameFormat(timeoutName).build());
 
             sendHeartBeat.set(true);
-            heartBeatTimer = new Timer();
+            heartBeatTimer = new Timer(topologyContext.getThisTaskId() + 
"-hb-timer", true);
             setupHeartBeatTimer();
 
         } catch (Exception e) {
@@ -151,6 +151,9 @@ public class HiveBolt extends BaseRichBolt {
         }
 
         callTimeoutPool = null;
+        if (heartBeatTimer != null) {
+            heartBeatTimer.cancel();
+        }
         super.cleanup();
         LOG.info("Hive Bolt stopped");
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bbfd7cd4/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
 
b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
index a698e24..6717329 100644
--- 
a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
+++ 
b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
@@ -80,7 +80,7 @@ public class HiveState implements State {
             String timeoutName = "hive-bolt-%d";
             this.callTimeoutPool = Executors.newFixedThreadPool(1,
                                                                 new 
ThreadFactoryBuilder().setNameFormat(timeoutName).build());
-            heartBeatTimer = new Timer();
+            heartBeatTimer = new Timer("hive-hb-timer", true);
             setupHeartBeatTimer();
         } catch (Exception e) {
             LOG.warn("unable to make connection to hive ", e);
@@ -289,6 +289,7 @@ public class HiveState implements State {
                 LOG.warn("shutdown interrupted on " + execService, ex);
             }
         }
+        heartBeatTimer.cancel();
         callTimeoutPool = null;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bbfd7cd4/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java 
b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index a0406e0..9e9196f 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -33,6 +33,7 @@ import java.io.ObjectOutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.URL;
@@ -300,21 +301,25 @@ public class Utils {
      * runtime to avoid any zombie process in case cleanup function hangs.
      */
     public static void addShutdownHookWithDelayedForceKill(Runnable func, int 
numSecs) {
-        Runnable sleepKill = new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    LOG.info("Halting after {} seconds", numSecs);
-                    Time.sleepSecs(numSecs);
-                    LOG.warn("Forcing Halt...");
-                    Runtime.getRuntime().halt(20);
-                } catch (Exception e) {
-                    LOG.warn("Exception in the ShutDownHook", e);
-                }
+        final Thread sleepKill = new Thread(() -> {
+            try {
+                LOG.info("Halting after {} seconds", numSecs);
+                Time.sleepSecs(numSecs);
+                LOG.warn("Forcing Halt... {}", Utils.threadDump());
+                Runtime.getRuntime().halt(20);
+            } catch (InterruptedException ie) {
+                //Ignored/expected...
+            } catch (Exception e) {
+                LOG.warn("Exception in the ShutDownHook", e);
             }
-        };
-        Runtime.getRuntime().addShutdownHook(new Thread(func));
-        Runtime.getRuntime().addShutdownHook(new Thread(sleepKill));
+        });
+        sleepKill.setDaemon(true);
+        Thread wrappedFunc = new Thread(() -> {
+            func.run();
+            sleepKill.interrupt();
+        });
+        Runtime.getRuntime().addShutdownHook(wrappedFunc);
+        Runtime.getRuntime().addShutdownHook(sleepKill);
     }
 
     public static boolean isSystemId(String id) {
@@ -1190,7 +1195,9 @@ public class Utils {
         final StringBuilder dump = new StringBuilder();
         final java.lang.management.ThreadMXBean threadMXBean = 
ManagementFactory.getThreadMXBean();
         final java.lang.management.ThreadInfo[] threadInfos = 
threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
-        for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
+        for (Entry<Thread, StackTraceElement[]> entry: 
Thread.getAllStackTraces().entrySet()) {
+            Thread t = entry.getKey();
+            ThreadInfo threadInfo = threadMXBean.getThreadInfo(t.getId());
             if (threadInfo == null) {
                 //Thread died before we could get the info, skip
                 continue;
@@ -1198,6 +1205,9 @@ public class Utils {
             dump.append('"');
             dump.append(threadInfo.getThreadName());
             dump.append("\" ");
+            if (t.isDaemon()) {
+                dump.append("(DAEMON)");
+            }
             dump.append("\n   lock: ");
             dump.append(threadInfo.getLockName());
             dump.append(" owner: ");
@@ -1205,8 +1215,7 @@ public class Utils {
             final Thread.State state = threadInfo.getThreadState();
             dump.append("\n   java.lang.Thread.State: ");
             dump.append(state);
-            final StackTraceElement[] stackTraceElements = 
threadInfo.getStackTrace();
-            for (final StackTraceElement stackTraceElement : 
stackTraceElements) {
+            for (final StackTraceElement stackTraceElement : entry.getValue()) 
{
                 dump.append("\n        at ");
                 dump.append(stackTraceElement);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/bbfd7cd4/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java 
b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
index a72944b..7724b31 100644
--- 
a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ 
b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -108,7 +108,7 @@ public class LocalFsBlobStore extends BlobStore {
         } catch (Exception e) {
             e.printStackTrace();
         }
-        timer = new Timer();
+        timer = new Timer("BLOB-STORE-TIMER", true);
         this.leaderElector = leaderElector;
     }
 
@@ -411,6 +411,9 @@ public class LocalFsBlobStore extends BlobStore {
         if (zkClient != null) {
             zkClient.close();
         }
+        if (timer != null) {
+            timer.cancel();;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/bbfd7cd4/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java 
b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java
index e8edd7d..2d853f3 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java
@@ -71,7 +71,7 @@ public class DRPC implements AutoCloseable {
     //Waiting to be returned
     private final ConcurrentHashMap<String, OutstandingRequest> _requests =
         new ConcurrentHashMap<>();
-    private final Timer _timer = new Timer();
+    private final Timer timer = new Timer("DRPC-CLEANUP-TIMER", true);
     private final AtomicLong _ctr = new AtomicLong(0);
     private final IAuthorizer _auth;
 
@@ -87,7 +87,7 @@ public class DRPC implements AutoCloseable {
         this.meterResultCalls = 
metricsRegistry.registerMeter("drpc:num-result-calls");
         this.meterFailRequestCalls = 
metricsRegistry.registerMeter("drpc:num-failRequest-calls");
         this.meterFetchRequestCalls = 
metricsRegistry.registerMeter("drpc:num-fetchRequest-calls");
-        _timer.scheduleAtFixedRate(new TimerTask() {
+        timer.scheduleAtFixedRate(new TimerTask() {
             @Override
             public void run() {
                 cleanupAll(timeoutMs, TIMED_OUT);
@@ -241,7 +241,7 @@ public class DRPC implements AutoCloseable {
 
     @Override
     public void close() {
-        _timer.cancel();
+        timer.cancel();
         cleanupAll(0, SHUT_DOWN);
     }
 }

Reply via email to