Repository: storm Updated Branches: refs/heads/master a0f3b1ace -> 22440747a
STORM-3249: Make sure times shut down and so does shut down thread Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bbfd7cd4 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bbfd7cd4 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bbfd7cd4 Branch: refs/heads/master Commit: bbfd7cd4050ae80d406daaed9cec882b00be1f98 Parents: 20d5581 Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Mon Oct 8 13:25:17 2018 -0500 Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com> Committed: Mon Oct 8 13:25:17 2018 -0500 ---------------------------------------------------------------------- .../org/apache/storm/hdfs/spout/HdfsSpout.java | 2 +- .../org/apache/storm/hive/bolt/HiveBolt.java | 5 ++- .../apache/storm/hive/trident/HiveState.java | 3 +- .../src/jvm/org/apache/storm/utils/Utils.java | 43 ++++++++++++-------- .../storm/blobstore/LocalFsBlobStore.java | 5 ++- .../java/org/apache/storm/daemon/drpc/DRPC.java | 6 +-- 6 files changed, 40 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/bbfd7cd4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index 12debe5..bdf9da3 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -390,7 +390,7 @@ public class HdfsSpout extends BaseRichSpout { public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { LOG.info("Opening HDFS Spout"); this.conf = conf; - this.commitTimer = new Timer(); + this.commitTimer = new Timer(context.getThisTaskId() + "-commit-timer", true); this.tracker = new ProgressTracker(); this.hdfsConfig = new Configuration(); this.collector = collector; http://git-wip-us.apache.org/repos/asf/storm/blob/bbfd7cd4/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java index cfabbd6..180f41b 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java @@ -79,7 +79,7 @@ public class HiveBolt extends BaseRichBolt { new ThreadFactoryBuilder().setNameFormat(timeoutName).build()); sendHeartBeat.set(true); - heartBeatTimer = new Timer(); + heartBeatTimer = new Timer(topologyContext.getThisTaskId() + "-hb-timer", true); setupHeartBeatTimer(); } catch (Exception e) { @@ -151,6 +151,9 @@ public class HiveBolt extends BaseRichBolt { } callTimeoutPool = null; + if (heartBeatTimer != null) { + heartBeatTimer.cancel(); + } super.cleanup(); LOG.info("Hive Bolt stopped"); } http://git-wip-us.apache.org/repos/asf/storm/blob/bbfd7cd4/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java index a698e24..6717329 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java @@ -80,7 +80,7 @@ public class HiveState implements State { String timeoutName = "hive-bolt-%d"; this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build()); - heartBeatTimer = new Timer(); + heartBeatTimer = new Timer("hive-hb-timer", true); setupHeartBeatTimer(); } catch (Exception e) { LOG.warn("unable to make connection to hive ", e); @@ -289,6 +289,7 @@ public class HiveState implements State { LOG.warn("shutdown interrupted on " + execService, ex); } } + heartBeatTimer.cancel(); callTimeoutPool = null; } http://git-wip-us.apache.org/repos/asf/storm/blob/bbfd7cd4/storm-client/src/jvm/org/apache/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java index a0406e0..9e9196f 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java @@ -33,6 +33,7 @@ import java.io.ObjectOutputStream; import java.io.OutputStreamWriter; import java.io.Serializable; import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; import java.net.InetAddress; import java.net.ServerSocket; import java.net.URL; @@ -300,21 +301,25 @@ public class Utils { * runtime to avoid any zombie process in case cleanup function hangs. */ public static void addShutdownHookWithDelayedForceKill(Runnable func, int numSecs) { - Runnable sleepKill = new Runnable() { - @Override - public void run() { - try { - LOG.info("Halting after {} seconds", numSecs); - Time.sleepSecs(numSecs); - LOG.warn("Forcing Halt..."); - Runtime.getRuntime().halt(20); - } catch (Exception e) { - LOG.warn("Exception in the ShutDownHook", e); - } + final Thread sleepKill = new Thread(() -> { + try { + LOG.info("Halting after {} seconds", numSecs); + Time.sleepSecs(numSecs); + LOG.warn("Forcing Halt... {}", Utils.threadDump()); + Runtime.getRuntime().halt(20); + } catch (InterruptedException ie) { + //Ignored/expected... + } catch (Exception e) { + LOG.warn("Exception in the ShutDownHook", e); } - }; - Runtime.getRuntime().addShutdownHook(new Thread(func)); - Runtime.getRuntime().addShutdownHook(new Thread(sleepKill)); + }); + sleepKill.setDaemon(true); + Thread wrappedFunc = new Thread(() -> { + func.run(); + sleepKill.interrupt(); + }); + Runtime.getRuntime().addShutdownHook(wrappedFunc); + Runtime.getRuntime().addShutdownHook(sleepKill); } public static boolean isSystemId(String id) { @@ -1190,7 +1195,9 @@ public class Utils { final StringBuilder dump = new StringBuilder(); final java.lang.management.ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100); - for (java.lang.management.ThreadInfo threadInfo : threadInfos) { + for (Entry<Thread, StackTraceElement[]> entry: Thread.getAllStackTraces().entrySet()) { + Thread t = entry.getKey(); + ThreadInfo threadInfo = threadMXBean.getThreadInfo(t.getId()); if (threadInfo == null) { //Thread died before we could get the info, skip continue; @@ -1198,6 +1205,9 @@ public class Utils { dump.append('"'); dump.append(threadInfo.getThreadName()); dump.append("\" "); + if (t.isDaemon()) { + dump.append("(DAEMON)"); + } dump.append("\n lock: "); dump.append(threadInfo.getLockName()); dump.append(" owner: "); @@ -1205,8 +1215,7 @@ public class Utils { final Thread.State state = threadInfo.getThreadState(); dump.append("\n java.lang.Thread.State: "); dump.append(state); - final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace(); - for (final StackTraceElement stackTraceElement : stackTraceElements) { + for (final StackTraceElement stackTraceElement : entry.getValue()) { dump.append("\n at "); dump.append(stackTraceElement); } http://git-wip-us.apache.org/repos/asf/storm/blob/bbfd7cd4/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java index a72944b..7724b31 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java @@ -108,7 +108,7 @@ public class LocalFsBlobStore extends BlobStore { } catch (Exception e) { e.printStackTrace(); } - timer = new Timer(); + timer = new Timer("BLOB-STORE-TIMER", true); this.leaderElector = leaderElector; } @@ -411,6 +411,9 @@ public class LocalFsBlobStore extends BlobStore { if (zkClient != null) { zkClient.close(); } + if (timer != null) { + timer.cancel();; + } } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/bbfd7cd4/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java index e8edd7d..2d853f3 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java @@ -71,7 +71,7 @@ public class DRPC implements AutoCloseable { //Waiting to be returned private final ConcurrentHashMap<String, OutstandingRequest> _requests = new ConcurrentHashMap<>(); - private final Timer _timer = new Timer(); + private final Timer timer = new Timer("DRPC-CLEANUP-TIMER", true); private final AtomicLong _ctr = new AtomicLong(0); private final IAuthorizer _auth; @@ -87,7 +87,7 @@ public class DRPC implements AutoCloseable { this.meterResultCalls = metricsRegistry.registerMeter("drpc:num-result-calls"); this.meterFailRequestCalls = metricsRegistry.registerMeter("drpc:num-failRequest-calls"); this.meterFetchRequestCalls = metricsRegistry.registerMeter("drpc:num-fetchRequest-calls"); - _timer.scheduleAtFixedRate(new TimerTask() { + timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { cleanupAll(timeoutMs, TIMED_OUT); @@ -241,7 +241,7 @@ public class DRPC implements AutoCloseable { @Override public void close() { - _timer.cancel(); + timer.cancel(); cleanupAll(0, SHUT_DOWN); } }