Change Timer to ScheduledExecutorService
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/31443dcf Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/31443dcf Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/31443dcf Branch: refs/heads/master Commit: 31443dcff2d3d1096621ade38c6b61292cea339d Parents: 2af791a Author: Jungtaek Lim <[email protected]> Authored: Sat Oct 11 23:50:17 2014 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Sat Oct 11 23:50:17 2014 +0900 ---------------------------------------------------------------------- .../src/jvm/backtype/storm/spout/ShellSpout.java | 16 +++++++++------- .../src/jvm/backtype/storm/task/ShellBolt.java | 12 +++++++----- 2 files changed, 16 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/31443dcf/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java index f257b42..a91a1ba 100644 --- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.List; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import clojure.lang.RT; @@ -48,7 +50,7 @@ public class ShellSpout implements ISpout { private SpoutMsg _spoutMsg; private int workerTimeoutMills; - private Timer heartBeatTimer; + private ScheduledThreadPoolExecutor heartBeatExecutor; private AtomicLong lastHeartbeatTimestamp = new AtomicLong(); public ShellSpout(ShellComponent component) { @@ -71,11 +73,11 @@ public class ShellSpout implements ISpout { Number subpid = _process.launch(stormConf, context); LOG.info("Launched subprocess with pid " + subpid); - heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true); + heartBeatExecutor = new ScheduledThreadPoolExecutor(5); } public void close() { - heartBeatTimer.cancel(); + heartBeatExecutor.shutdownNow(); _process.destroy(); } @@ -208,12 +210,12 @@ public class ShellSpout implements ISpout { LOG.info("Start checking heartbeat..."); // prevent timer to check heartbeat based on last thing before activate setHeartbeat(); - heartBeatTimer.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1000, 1 * 1000); + heartBeatExecutor.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS); } @Override public void deactivate() { - heartBeatTimer.cancel(); + heartBeatExecutor.shutdownNow(); } private void setHeartbeat() { @@ -225,7 +227,7 @@ public class ShellSpout implements ISpout { } private void die(Throwable exception) { - heartBeatTimer.cancel(); + heartBeatExecutor.shutdownNow(); LOG.error("Halting process: ShellSpout died.", exception); _collector.reportError(exception); @@ -245,7 +247,7 @@ public class ShellSpout implements ISpout { long currentTimeMillis = System.currentTimeMillis(); long lastHeartbeat = getLastHeartbeat(); - LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : ", + LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}", currentTimeMillis, lastHeartbeat, workerTimeoutMills); if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) { http://git-wip-us.apache.org/repos/asf/storm/blob/31443dcf/storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index 0d9d706..63ed21f 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -33,6 +33,8 @@ import backtype.storm.multilang.ShellMsg; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static java.util.concurrent.TimeUnit.SECONDS; @@ -88,7 +90,7 @@ public class ShellBolt implements IBolt { private TopologyContext _context; private int workerTimeoutMills; - private Timer heartBeatTimer; + private ScheduledThreadPoolExecutor heartBeatExecutor; private AtomicLong lastHeartbeatTimestamp = new AtomicLong(); public ShellBolt(ShellComponent component) { @@ -125,8 +127,8 @@ public class ShellBolt implements IBolt { _writerThread = new Thread(new BoltWriterRunnable()); _writerThread.start(); - heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true); - heartBeatTimer.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1000, 1 * 1000); + heartBeatExecutor = new ScheduledThreadPoolExecutor(5); + heartBeatExecutor.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS); LOG.info("Start checking heartbeat..."); setHeartbeat(); @@ -162,7 +164,7 @@ public class ShellBolt implements IBolt { public void cleanup() { _running = false; - heartBeatTimer.cancel(); + heartBeatExecutor.shutdownNow(); _writerThread.interrupt(); _readerThread.interrupt(); _process.destroy(); @@ -299,7 +301,7 @@ public class ShellBolt implements IBolt { long currentTimeMillis = System.currentTimeMillis(); long lastHeartbeat = getLastHeartbeat(); - LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : ", + LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}", currentTimeMillis, lastHeartbeat, workerTimeoutMills); if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
