Change Timer to ScheduledExecutorService

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/31443dcf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/31443dcf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/31443dcf

Branch: refs/heads/master
Commit: 31443dcff2d3d1096621ade38c6b61292cea339d
Parents: 2af791a
Author: Jungtaek Lim <[email protected]>
Authored: Sat Oct 11 23:50:17 2014 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Sat Oct 11 23:50:17 2014 +0900

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/spout/ShellSpout.java    | 16 +++++++++-------
 .../src/jvm/backtype/storm/task/ShellBolt.java      | 12 +++++++-----
 2 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/31443dcf/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java 
b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index f257b42..a91a1ba 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import clojure.lang.RT;
@@ -48,7 +50,7 @@ public class ShellSpout implements ISpout {
     private SpoutMsg _spoutMsg;
 
     private int workerTimeoutMills;
-    private Timer heartBeatTimer;
+    private ScheduledThreadPoolExecutor heartBeatExecutor;
     private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
 
     public ShellSpout(ShellComponent component) {
@@ -71,11 +73,11 @@ public class ShellSpout implements ISpout {
         Number subpid = _process.launch(stormConf, context);
         LOG.info("Launched subprocess with pid " + subpid);
 
-        heartBeatTimer = new Timer(context.getThisTaskId() + 
"-heartbeatTimer", true);
+        heartBeatExecutor = new ScheduledThreadPoolExecutor(5);
     }
 
     public void close() {
-        heartBeatTimer.cancel();
+        heartBeatExecutor.shutdownNow();
         _process.destroy();
     }
 
@@ -208,12 +210,12 @@ public class ShellSpout implements ISpout {
         LOG.info("Start checking heartbeat...");
         // prevent timer to check heartbeat based on last thing before activate
         setHeartbeat();
-        heartBeatTimer.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 
1000, 1 * 1000);
+        heartBeatExecutor.scheduleAtFixedRate(new 
SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
     }
 
     @Override
     public void deactivate() {
-        heartBeatTimer.cancel();
+        heartBeatExecutor.shutdownNow();
     }
 
     private void setHeartbeat() {
@@ -225,7 +227,7 @@ public class ShellSpout implements ISpout {
     }
 
     private void die(Throwable exception) {
-        heartBeatTimer.cancel();
+        heartBeatExecutor.shutdownNow();
 
         LOG.error("Halting process: ShellSpout died.", exception);
         _collector.reportError(exception);
@@ -245,7 +247,7 @@ public class ShellSpout implements ISpout {
             long currentTimeMillis = System.currentTimeMillis();
             long lastHeartbeat = getLastHeartbeat();
 
-            LOG.debug("current time : {}, last heartbeat : {}, worker timeout 
(ms) : ",
+            LOG.debug("current time : {}, last heartbeat : {}, worker timeout 
(ms) : {}",
                     currentTimeMillis, lastHeartbeat, workerTimeoutMills);
 
             if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {

http://git-wip-us.apache.org/repos/asf/storm/blob/31443dcf/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java 
b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 0d9d706..63ed21f 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -33,6 +33,8 @@ import backtype.storm.multilang.ShellMsg;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -88,7 +90,7 @@ public class ShellBolt implements IBolt {
     private TopologyContext _context;
 
     private int workerTimeoutMills;
-    private Timer heartBeatTimer;
+    private ScheduledThreadPoolExecutor heartBeatExecutor;
     private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
 
     public ShellBolt(ShellComponent component) {
@@ -125,8 +127,8 @@ public class ShellBolt implements IBolt {
         _writerThread = new Thread(new BoltWriterRunnable());
         _writerThread.start();
 
-        heartBeatTimer = new Timer(context.getThisTaskId() + 
"-heartbeatTimer", true);
-        heartBeatTimer.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 
1000, 1 * 1000);
+        heartBeatExecutor = new ScheduledThreadPoolExecutor(5);
+        heartBeatExecutor.scheduleAtFixedRate(new 
BoltHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
 
         LOG.info("Start checking heartbeat...");
         setHeartbeat();
@@ -162,7 +164,7 @@ public class ShellBolt implements IBolt {
 
     public void cleanup() {
         _running = false;
-        heartBeatTimer.cancel();
+        heartBeatExecutor.shutdownNow();
         _writerThread.interrupt();
         _readerThread.interrupt();
         _process.destroy();
@@ -299,7 +301,7 @@ public class ShellBolt implements IBolt {
             long currentTimeMillis = System.currentTimeMillis();
             long lastHeartbeat = getLastHeartbeat();
 
-            LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker 
timeout (ms) : ",
+            LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker 
timeout (ms) : {}",
                     currentTimeMillis, lastHeartbeat, workerTimeoutMills);
 
             if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {

Reply via email to