Repository: storm Updated Branches: refs/heads/0.9.3-branch 69b5478fd -> a725b2397
STORM-513 check heartbeat from multilang subprocess * Spout ** ShellSpout sends "next" to subprocess continuously ** subprocess sends "sync" to ShellSpout when "next" is received ** so we can treat "sync", or any messages to heartbeat * Bolt ** ShellBolt sends tuples to subprocess if it's available ** so we need to send "heartbeat" tuple ** subprocess sends "sync" to ShellBolt when "heartbeat" tuple is received Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ca5874cd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ca5874cd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ca5874cd Branch: refs/heads/0.9.3-branch Commit: ca5874cdf11af8d835335d228b643f28aeb3f9c3 Parents: 5aaea84 Author: Jungtaek Lim <[email protected]> Authored: Wed Oct 8 22:48:01 2014 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Wed Oct 8 22:48:01 2014 +0900 ---------------------------------------------------------------------- storm-core/src/dev/resources/storm.js | 6 + .../jvm/backtype/storm/spout/ShellSpout.java | 67 +++++- .../src/jvm/backtype/storm/task/ShellBolt.java | 207 +++++++++++++------ storm-core/src/multilang/py/storm.py | 17 +- storm-core/src/multilang/rb/storm.rb | 11 +- 5 files changed, 233 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/ca5874cd/storm-core/src/dev/resources/storm.js ---------------------------------------------------------------------- diff --git a/storm-core/src/dev/resources/storm.js b/storm-core/src/dev/resources/storm.js index 5c78072..50cc920 100755 --- a/storm-core/src/dev/resources/storm.js +++ b/storm-core/src/dev/resources/storm.js @@ -243,6 +243,12 @@ BasicBolt.prototype.__emit = function(commandDetails) { BasicBolt.prototype.handleNewCommand = function(command) { var self = this; var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); + + if (tup.task == -1 && tup.stream == "__heartbeat") { + self.sync(); + return; + } + var callback = function(err) { if (err) { self.fail(tup, err); http://git-wip-us.apache.org/repos/asf/storm/blob/ca5874cd/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java index 9818f30..494ac6c 100644 --- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java @@ -17,6 +17,7 @@ */ package backtype.storm.spout; +import backtype.storm.Config; import backtype.storm.generated.ShellComponent; import backtype.storm.metric.api.IMetric; import backtype.storm.metric.api.rpc.IShellMetric; @@ -26,8 +27,11 @@ import backtype.storm.task.TopologyContext; import backtype.storm.utils.ShellProcess; import java.util.Map; import java.util.List; -import java.io.IOException; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicLong; +import clojure.lang.RT; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +47,11 @@ public class ShellSpout implements ISpout { private SpoutMsg _spoutMsg; - public ShellSpout(ShellComponent component) { + private int workerTimeoutMills; + private Timer heartBeatTimer; + private AtomicLong lastHeartbeatTimestamp = new AtomicLong(); + + public ShellSpout(ShellComponent component) { this(component.get_execution_command(), component.get_script()); } @@ -56,13 +64,18 @@ public class ShellSpout implements ISpout { _collector = collector; _context = context; + workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); + _process = new ShellProcess(_command); Number subpid = _process.launch(stormConf, context); LOG.info("Launched subprocess with pid " + subpid); + + heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true); } public void close() { + heartBeatTimer.cancel(); _process.destroy(); } @@ -131,7 +144,10 @@ public class ShellSpout implements ISpout { if (command == null) { throw new IllegalArgumentException("Command not found in spout message: " + shellMsg); } - if (command.equals("sync")) { + + setHeartbeat(); + + if (command.equals("sync")) { return; } else if (command.equals("log")) { handleLog(shellMsg); @@ -160,7 +176,7 @@ public class ShellSpout implements ISpout { } } - private void handleLog(ShellMsg shellMsg) { + private void handleLog(ShellMsg shellMsg) { String msg = shellMsg.getMsg(); msg = "ShellLog " + _process.getProcessInfoString() + " " + msg; ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel(); @@ -189,9 +205,52 @@ public class ShellSpout implements ISpout { @Override public void activate() { + LOG.info("Start checking heartbeat..."); + // prevent timer to check heartbeat based on last thing before activate + setHeartbeat(); + heartBeatTimer.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1000, 1 * 1000); } @Override public void deactivate() { + heartBeatTimer.cancel(); } + + private void setHeartbeat() { + lastHeartbeatTimestamp.set(System.currentTimeMillis()); + } + + private long getLastHeartbeat() { + return lastHeartbeatTimestamp.get(); + } + + private void die(Throwable exception) { + heartBeatTimer.cancel(); + + LOG.error("Halting process: ShellSpout died.", exception); + _collector.reportError(exception); + System.exit(11); + } + + private class SpoutHeartbeatTimerTask extends TimerTask { + private ShellSpout spout; + + public SpoutHeartbeatTimerTask(ShellSpout spout) { + this.spout = spout; + } + + @Override + public void run() { + long currentTimeMillis = System.currentTimeMillis(); + long lastHeartbeat = getLastHeartbeat(); + + LOG.debug("current time : " + currentTimeMillis + ", last heartbeat : " + lastHeartbeat + + ", worker timeout (ms) : " + workerTimeoutMills); + + if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) { + spout.die(new RuntimeException("subprocess heartbeat timeout")); + } + } + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/ca5874cd/storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index 9e90121..cbb8e2b 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -18,23 +18,26 @@ package backtype.storm.task; import backtype.storm.Config; +import backtype.storm.Constants; import backtype.storm.generated.ShellComponent; import backtype.storm.metric.api.IMetric; import backtype.storm.metric.api.rpc.IShellMetric; import backtype.storm.topology.ReportedFailedException; import backtype.storm.tuple.MessageId; import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.TupleImpl; import backtype.storm.utils.ShellProcess; import backtype.storm.multilang.BoltMsg; import backtype.storm.multilang.ShellMsg; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + import static java.util.concurrent.TimeUnit.SECONDS; -import java.util.Map; -import java.util.Random; + +import clojure.lang.RT; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +69,8 @@ import org.slf4j.LoggerFactory; * </pre> */ public class ShellBolt implements IBolt { - public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class); + public static final String HEARTBEAT_STREAM_ID = "__heartbeat"; + public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class); Process _subprocess; OutputCollector _collector; Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>(); @@ -83,7 +87,11 @@ public class ShellBolt implements IBolt { private TopologyContext _context; - public ShellBolt(ShellComponent component) { + private int workerTimeoutMills; + private Timer heartBeatTimer; + private AtomicLong lastHeartbeatTimestamp = new AtomicLong(); + + public ShellBolt(ShellComponent component) { this(component.get_execution_command(), component.get_script()); } @@ -102,6 +110,8 @@ public class ShellBolt implements IBolt { _context = context; + workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); + _process = new ShellProcess(_command); //subprocesses must send their pid first thing @@ -109,61 +119,18 @@ public class ShellBolt implements IBolt { LOG.info("Launched subprocess with pid " + subpid); // reader - _readerThread = new Thread(new Runnable() { - public void run() { - while (_running) { - try { - ShellMsg shellMsg = _process.readShellMsg(); - - String command = shellMsg.getCommand(); - if (command == null) { - throw new IllegalArgumentException("Command not found in bolt message: " + shellMsg); - } - if(command.equals("ack")) { - handleAck(shellMsg.getId()); - } else if (command.equals("fail")) { - handleFail(shellMsg.getId()); - } else if (command.equals("error")) { - handleError(shellMsg.getMsg()); - } else if (command.equals("log")) { - handleLog(shellMsg); - } else if (command.equals("emit")) { - handleEmit(shellMsg); - } else if (command.equals("metrics")) { - handleMetrics(shellMsg); - } - } catch (InterruptedException e) { - } catch (Throwable t) { - die(t); - } - } - } - }); - + _readerThread = new Thread(new BoltReaderRunnable()); _readerThread.start(); - _writerThread = new Thread(new Runnable() { - public void run() { - while (_running) { - try { - Object write = _pendingWrites.poll(1, SECONDS); - if (write instanceof BoltMsg) { - _process.writeBoltMsg((BoltMsg)write); - } else if (write instanceof List<?>) { - _process.writeTaskIds((List<Integer>)write); - } else if (write != null) { - throw new RuntimeException("Unknown class type to write: " + write.getClass().getName()); - } - } catch (InterruptedException e) { - } catch (Throwable t) { - die(t); - } - } - } - }); - + _writerThread = new Thread(new BoltWriterRunnable()); _writerThread.start(); - } + + heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true); + heartBeatTimer.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1000, 1 * 1000); + + LOG.info("Start checking heartbeat..."); + setHeartbeat(); + } public void execute(Tuple input) { if (_exception != null) { @@ -174,12 +141,7 @@ public class ShellBolt implements IBolt { String genId = Long.toString(_rand.nextLong()); _inputs.put(genId, input); try { - BoltMsg boltMsg = new BoltMsg(); - boltMsg.setId(genId); - boltMsg.setComp(input.getSourceComponent()); - boltMsg.setStream(input.getSourceStreamId()); - boltMsg.setTask(input.getSourceTask()); - boltMsg.setTuple(input.getValues()); + BoltMsg boltMsg = createBoltMessage(input, genId); _pendingWrites.put(boltMsg); } catch(InterruptedException e) { @@ -188,8 +150,19 @@ public class ShellBolt implements IBolt { } } - public void cleanup() { + private BoltMsg createBoltMessage(Tuple input, String genId) { + BoltMsg boltMsg = new BoltMsg(); + boltMsg.setId(genId); + boltMsg.setComp(input.getSourceComponent()); + boltMsg.setStream(input.getSourceStreamId()); + boltMsg.setTask(input.getSourceTask()); + boltMsg.setTuple(input.getValues()); + return boltMsg; + } + + public void cleanup() { _running = false; + heartBeatTimer.cancel(); _writerThread.interrupt(); _readerThread.interrupt(); _process.destroy(); @@ -197,6 +170,9 @@ public class ShellBolt implements IBolt { } private void handleAck(Object id) { + // FIXME : debug, should be removed + LOG.debug("Ack... id : " + id); + Tuple acked = _inputs.remove(id); if(acked==null) { throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id); @@ -296,7 +272,15 @@ public class ShellBolt implements IBolt { } } - private void die(Throwable exception) { + private void setHeartbeat() { + lastHeartbeatTimestamp.set(System.currentTimeMillis()); + } + + private long getLastHeartbeat() { + return lastHeartbeatTimestamp.get(); + } + + private void die(Throwable exception) { String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString(); _exception = new RuntimeException(processInfo, exception); LOG.error("Halting process: ShellBolt died.", exception); @@ -305,4 +289,95 @@ public class ShellBolt implements IBolt { System.exit(11); } } + + private class BoltHeartbeatTimerTask extends TimerTask { + private ShellBolt bolt; + + public BoltHeartbeatTimerTask(ShellBolt bolt) { + this.bolt = bolt; + } + + @Override + public void run() { + long currentTimeMillis = System.currentTimeMillis(); + long lastHeartbeat = getLastHeartbeat(); + + LOG.debug("BOLT - current time : " + currentTimeMillis + ", last heartbeat : " + lastHeartbeat + + ", worker timeout (ms) : " + workerTimeoutMills); + + if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) { + bolt.die(new RuntimeException("subprocess heartbeat timeout")); + } + + String genId = Long.toString(_rand.nextLong()); + try { + _pendingWrites.put(createHeartbeatBoltMessage(genId)); + } catch(InterruptedException e) { + String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString(); + bolt.die(new RuntimeException("Error during multilang processing " + processInfo, e)); + } + } + + private BoltMsg createHeartbeatBoltMessage(String genId) { + BoltMsg msg = new BoltMsg(); + msg.setId(genId); + msg.setTask(Constants.SYSTEM_TASK_ID); + msg.setStream(HEARTBEAT_STREAM_ID); + msg.setTuple(new ArrayList<Object>()); + return msg; + } + } + + private class BoltReaderRunnable implements Runnable { + public void run() { + while (_running) { + try { + ShellMsg shellMsg = _process.readShellMsg(); + + String command = shellMsg.getCommand(); + if (command == null) { + throw new IllegalArgumentException("Command not found in bolt message: " + shellMsg); + } + if (command.equals("sync")) { + setHeartbeat(); + } else if(command.equals("ack")) { + handleAck(shellMsg.getId()); + } else if (command.equals("fail")) { + handleFail(shellMsg.getId()); + } else if (command.equals("error")) { + handleError(shellMsg.getMsg()); + } else if (command.equals("log")) { + handleLog(shellMsg); + } else if (command.equals("emit")) { + handleEmit(shellMsg); + } else if (command.equals("metrics")) { + handleMetrics(shellMsg); + } + } catch (InterruptedException e) { + } catch (Throwable t) { + die(t); + } + } + } + } + + private class BoltWriterRunnable implements Runnable { + public void run() { + while (_running) { + try { + Object write = _pendingWrites.poll(1, SECONDS); + if (write instanceof BoltMsg) { + _process.writeBoltMsg((BoltMsg) write); + } else if (write instanceof List<?>) { + _process.writeTaskIds((List<Integer>)write); + } else if (write != null) { + throw new RuntimeException("Unknown class type to write: " + write.getClass().getName()); + } + } catch (InterruptedException e) { + } catch (Throwable t) { + die(t); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/ca5874cd/storm-core/src/multilang/py/storm.py ---------------------------------------------------------------------- diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py index d2a3082..c866858 100755 --- a/storm-core/src/multilang/py/storm.py +++ b/storm-core/src/multilang/py/storm.py @@ -174,6 +174,9 @@ class Tuple(object): self.__class__.__name__, ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) + def is_heartbeat_tuple(self): + return self.task == -1 and self.stream == "__heartbeat" + class Bolt(object): def initialize(self, stormconf, context): pass @@ -189,7 +192,10 @@ class Bolt(object): self.initialize(conf, context) while True: tup = readTuple() - self.process(tup) + if tup.is_heartbeat_tuple(): + sync() + else: + self.process(tup) except Exception, e: reportError(traceback.format_exc(e)) @@ -209,9 +215,12 @@ class BasicBolt(object): self.initialize(conf, context) while True: tup = readTuple() - ANCHOR_TUPLE = tup - self.process(tup) - ack(tup) + if tup.is_heartbeat_tuple(): + sync() + else: + ANCHOR_TUPLE = tup + self.process(tup) + ack(tup) except Exception, e: reportError(traceback.format_exc(e)) http://git-wip-us.apache.org/repos/asf/storm/blob/ca5874cd/storm-core/src/multilang/rb/storm.rb ---------------------------------------------------------------------- diff --git a/storm-core/src/multilang/rb/storm.rb b/storm-core/src/multilang/rb/storm.rb index 17232d1..d177937 100644 --- a/storm-core/src/multilang/rb/storm.rb +++ b/storm-core/src/multilang/rb/storm.rb @@ -169,6 +169,10 @@ module Storm def self.from_hash(hash) Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple")) end + + def is_heartbeat + task == -1 and stream == '__heartbeat' + end end class Bolt @@ -183,7 +187,12 @@ module Storm prepare(*handshake) begin while true - process Tuple.from_hash(read_command) + tuple = Tuple.from_hash(read_command) + if tuple.is_heartbeat + sync + else + process tuple + end end rescue Exception => e reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
