GitHub user itaifrenkel commented on a diff in the pull request:
https://github.com/apache/storm/pull/286#discussion_r19326582
--- Diff: storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---
@@ -305,4 +283,95 @@ private void die(Throwable exception) {
System.exit(11);
}
}
+
+ private class BoltHeartbeatTimerTask extends TimerTask {
+ private ShellBolt bolt;
+
+ public BoltHeartbeatTimerTask(ShellBolt bolt) {
+ this.bolt = bolt;
+ }
+
+ @Override
+ public void run() {
+ long currentTimeMillis = System.currentTimeMillis();
+ long lastHeartbeat = getLastHeartbeat();
+
+ LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
+ currentTimeMillis, lastHeartbeat, workerTimeoutMills);
+
+ if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
+ bolt.die(new RuntimeException("subprocess heartbeat timeout"));
+ }
+
+ String genId = Long.toString(_rand.nextLong());
+ try {
+ _pendingWrites.put(createHeartbeatBoltMessage(genId));
--- End diff --
I reread the code and think that all we need here is to flip an AtomicBoolean
(in effect, a priority queue of size 1 for heartbeats). The reason is that the
capacity of the _pendingWrites queue is Config.TOPOLOGY_SHELLBOLT_MAX_PENDING,
which, going by its name, is the number of real tuples to retrieve from the
disruptor queue. We set it to 1 to optimize for the shortest latency, which
would cause this timer thread to block in put() whenever the queue is full.
That in turn means you cannot share this thread between bolts even if you
wanted to, and we need to think about whether that is an issue or not. A
stronger argument in favor of a priority path for heartbeats is that the rate
of heartbeat messages would not be skewed by the length of the queue.
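
To make the suggestion concrete, here is a minimal sketch of the flag-based approach. It is not the ShellBolt code: the class `HeartbeatFlagSketch`, its method names, and the use of plain strings as messages are all hypothetical, and the capacity-1 `LinkedBlockingQueue` only stands in for `_pendingWrites` with `TOPOLOGY_SHELLBOLT_MAX_PENDING` set to 1.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration only; names and message types are assumptions. */
class HeartbeatFlagSketch {
    // Stand-in for _pendingWrites, bounded to 1 as in the low-latency setup.
    private final BlockingQueue<String> pendingWrites = new LinkedBlockingQueue<>(1);

    // The "priority queue of size 1": at most one heartbeat is ever pending.
    private final AtomicBoolean heartbeatRequested = new AtomicBoolean(false);

    /** Called from the timer thread. Never blocks, whatever the queue holds. */
    void requestHeartbeat() {
        heartbeatRequested.set(true);
    }

    /** Called from the tuple-producing thread. Blocks while the queue is full. */
    void enqueueTuple(String message) throws InterruptedException {
        pendingWrites.put(message);
    }

    /**
     * Called from the writer thread. A pending heartbeat is served before any
     * queued tuple; otherwise waits up to timeoutMs for a tuple and returns
     * null if none arrives, so the caller can loop and re-check the flag.
     */
    String nextMessage(long timeoutMs) throws InterruptedException {
        if (heartbeatRequested.getAndSet(false)) {
            return "heartbeat";
        }
        return pendingWrites.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

With this shape the timer thread only sets a flag, so it could be shared between bolts, and repeated requests coalesce into a single heartbeat. The trade-off is that a writer waiting in `poll` notices the flag only after the timeout elapses, so the timeout bounds the extra heartbeat delay.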