fires heartbeat request without using _pendingWrites

* size of _pendingWrites is Config.TOPOLOGY_SHELLBOLT_MAX_PENDING
* If users set this to 1 or strict, heartbeat request can affect performance


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/12d31b4d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/12d31b4d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/12d31b4d

Branch: refs/heads/0.9.3-branch
Commit: 12d31b4dd7b067fec5ccba794f640a3e77f1b05e
Parents: b6fda45
Author: Jungtaek Lim <[email protected]>
Authored: Fri Oct 24 19:28:37 2014 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Fri Oct 24 19:28:37 2014 +0900

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 36 ++++++++++++--------
 1 file changed, 21 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/12d31b4d/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java 
b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index d52b8d9..308ec67 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -87,6 +88,7 @@ public class ShellBolt implements IBolt {
     private int workerTimeoutMills;
     private ScheduledExecutorService heartBeatExecutorService;
     private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
+    private AtomicBoolean sendHeartbeatFlag = new AtomicBoolean(false);
 
     public ShellBolt(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
@@ -303,23 +305,10 @@ public class ShellBolt implements IBolt {
                 bolt.die(new RuntimeException("subprocess heartbeat timeout"));
             }
 
-            String genId = Long.toString(_rand.nextLong());
-            try {
-                _pendingWrites.put(createHeartbeatBoltMessage(genId));
-            } catch(InterruptedException e) {
-                String processInfo = _process.getProcessInfoString() + 
_process.getProcessTerminationInfoString();
-                bolt.die(new RuntimeException("Error during multilang 
processing " + processInfo, e));
-            }
+            sendHeartbeatFlag.compareAndSet(false, true);
         }
 
-        private BoltMsg createHeartbeatBoltMessage(String genId) {
-            BoltMsg msg = new BoltMsg();
-            msg.setId(genId);
-            msg.setTask(Constants.SYSTEM_TASK_ID);
-            msg.setStream(HEARTBEAT_STREAM_ID);
-            msg.setTuple(new ArrayList<Object>());
-            return msg;
-        }
+
     }
 
     private class BoltReaderRunnable implements Runnable {
@@ -359,6 +348,14 @@ public class ShellBolt implements IBolt {
         public void run() {
             while (_running) {
                 try {
+                    if (sendHeartbeatFlag.get()) {
+                        LOG.debug("BOLT - sending heartbeat request to 
subprocess");
+
+                        String genId = Long.toString(_rand.nextLong());
+                        
_process.writeBoltMsg(createHeartbeatBoltMessage(genId));
+                        sendHeartbeatFlag.compareAndSet(true, false);
+                    }
+
                     Object write = _pendingWrites.poll(1, SECONDS);
                     if (write instanceof BoltMsg) {
                         _process.writeBoltMsg((BoltMsg) write);
@@ -373,5 +370,14 @@ public class ShellBolt implements IBolt {
                 }
             }
         }
+
+        private BoltMsg createHeartbeatBoltMessage(String genId) {
+            BoltMsg msg = new BoltMsg();
+            msg.setId(genId);
+            msg.setTask(Constants.SYSTEM_TASK_ID);
+            msg.setStream(HEARTBEAT_STREAM_ID);
+            msg.setTuple(new ArrayList<Object>());
+            return msg;
+        }
     }
 }

Reply via email to