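Fire the heartbeat request without going through _pendingWrites * The size of _pendingWrites is Config.TOPOLOGY_SHELLBOLT_MAX_PENDING * If users set this to 1 or a similarly strict (small) value, a heartbeat request placed in that queue competes with tuples for the limited slots and can affect performance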
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/12d31b4d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/12d31b4d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/12d31b4d Branch: refs/heads/0.9.3-branch Commit: 12d31b4dd7b067fec5ccba794f640a3e77f1b05e Parents: b6fda45 Author: Jungtaek Lim <[email protected]> Authored: Fri Oct 24 19:28:37 2014 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Fri Oct 24 19:28:37 2014 +0900 ---------------------------------------------------------------------- .../src/jvm/backtype/storm/task/ShellBolt.java | 36 ++++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/12d31b4d/storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index d52b8d9..308ec67 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import static java.util.concurrent.TimeUnit.SECONDS; @@ -87,6 +88,7 @@ public class ShellBolt implements IBolt { private int workerTimeoutMills; private ScheduledExecutorService heartBeatExecutorService; private AtomicLong lastHeartbeatTimestamp = new AtomicLong(); + private AtomicBoolean sendHeartbeatFlag = new AtomicBoolean(false); public ShellBolt(ShellComponent component) { this(component.get_execution_command(), component.get_script()); @@ -303,23 +305,10 @@ public class ShellBolt implements IBolt { bolt.die(new RuntimeException("subprocess 
heartbeat timeout")); } - String genId = Long.toString(_rand.nextLong()); - try { - _pendingWrites.put(createHeartbeatBoltMessage(genId)); - } catch(InterruptedException e) { - String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString(); - bolt.die(new RuntimeException("Error during multilang processing " + processInfo, e)); - } + sendHeartbeatFlag.compareAndSet(false, true); } - private BoltMsg createHeartbeatBoltMessage(String genId) { - BoltMsg msg = new BoltMsg(); - msg.setId(genId); - msg.setTask(Constants.SYSTEM_TASK_ID); - msg.setStream(HEARTBEAT_STREAM_ID); - msg.setTuple(new ArrayList<Object>()); - return msg; - } + } private class BoltReaderRunnable implements Runnable { @@ -359,6 +348,14 @@ public class ShellBolt implements IBolt { public void run() { while (_running) { try { + if (sendHeartbeatFlag.get()) { + LOG.debug("BOLT - sending heartbeat request to subprocess"); + + String genId = Long.toString(_rand.nextLong()); + _process.writeBoltMsg(createHeartbeatBoltMessage(genId)); + sendHeartbeatFlag.compareAndSet(true, false); + } + Object write = _pendingWrites.poll(1, SECONDS); if (write instanceof BoltMsg) { _process.writeBoltMsg((BoltMsg) write); @@ -373,5 +370,14 @@ public class ShellBolt implements IBolt { } } } + + private BoltMsg createHeartbeatBoltMessage(String genId) { + BoltMsg msg = new BoltMsg(); + msg.setId(genId); + msg.setTask(Constants.SYSTEM_TASK_ID); + msg.setStream(HEARTBEAT_STREAM_ID); + msg.setTuple(new ArrayList<Object>()); + return msg; + } } }
