Repository: storm
Updated Branches:
  refs/heads/0.9.3-branch 69b5478fd -> a725b2397


STORM-513 check heartbeat from multilang subprocess

* Spout
** ShellSpout sends "next" to the subprocess continuously
** the subprocess sends "sync" to ShellSpout whenever it receives "next"
** so "sync", or any other message from the subprocess, can be treated as a heartbeat
* Bolt
** ShellBolt sends tuples to the subprocess only when input tuples are available
** so it must also send a periodic "heartbeat" tuple
** the subprocess sends "sync" to ShellBolt when it receives a "heartbeat" tuple


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ca5874cd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ca5874cd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ca5874cd

Branch: refs/heads/0.9.3-branch
Commit: ca5874cdf11af8d835335d228b643f28aeb3f9c3
Parents: 5aaea84
Author: Jungtaek Lim <[email protected]>
Authored: Wed Oct 8 22:48:01 2014 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Wed Oct 8 22:48:01 2014 +0900

----------------------------------------------------------------------
 storm-core/src/dev/resources/storm.js           |   6 +
 .../jvm/backtype/storm/spout/ShellSpout.java    |  67 +++++-
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 207 +++++++++++++------
 storm-core/src/multilang/py/storm.py            |  17 +-
 storm-core/src/multilang/rb/storm.rb            |  11 +-
 5 files changed, 233 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ca5874cd/storm-core/src/dev/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.js b/storm-core/src/dev/resources/storm.js
index 5c78072..50cc920 100755
--- a/storm-core/src/dev/resources/storm.js
+++ b/storm-core/src/dev/resources/storm.js
@@ -243,6 +243,12 @@ BasicBolt.prototype.__emit = function(commandDetails) {
 BasicBolt.prototype.handleNewCommand = function(command) {
     var self = this;
     var tup = new Tuple(command["id"], command["comp"], command["stream"], 
command["task"], command["tuple"]);
+
+    if (tup.task == -1 && tup.stream == "__heartbeat") {
+        self.sync();
+        return;
+    }
+
     var callback = function(err) {
           if (err) {
               self.fail(tup, err);

http://git-wip-us.apache.org/repos/asf/storm/blob/ca5874cd/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 9818f30..494ac6c 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -17,6 +17,7 @@
  */
 package backtype.storm.spout;
 
+import backtype.storm.Config;
 import backtype.storm.generated.ShellComponent;
 import backtype.storm.metric.api.IMetric;
 import backtype.storm.metric.api.rpc.IShellMetric;
@@ -26,8 +27,11 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.ShellProcess;
 import java.util.Map;
 import java.util.List;
-import java.io.IOException;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
 
+import clojure.lang.RT;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +47,11 @@ public class ShellSpout implements ISpout {
     
     private SpoutMsg _spoutMsg;
 
-    public ShellSpout(ShellComponent component) {
+       private int workerTimeoutMills;
+       private Timer heartBeatTimer;
+       private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
+
+       public ShellSpout(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
     }
 
@@ -56,13 +64,18 @@ public class ShellSpout implements ISpout {
         _collector = collector;
         _context = context;
 
+               workerTimeoutMills = 1000 * 
RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+
         _process = new ShellProcess(_command);
 
         Number subpid = _process.launch(stormConf, context);
         LOG.info("Launched subprocess with pid " + subpid);
+
+               heartBeatTimer = new Timer(context.getThisTaskId() + 
"-heartbeatTimer", true);
     }
 
     public void close() {
+               heartBeatTimer.cancel();
         _process.destroy();
     }
 
@@ -131,7 +144,10 @@ public class ShellSpout implements ISpout {
                 if (command == null) {
                     throw new IllegalArgumentException("Command not found in 
spout message: " + shellMsg);
                 }
-                if (command.equals("sync")) {
+
+                               setHeartbeat();
+
+                               if (command.equals("sync")) {
                     return;
                 } else if (command.equals("log")) {
                     handleLog(shellMsg);
@@ -160,7 +176,7 @@ public class ShellSpout implements ISpout {
         }
     }
 
-    private void handleLog(ShellMsg shellMsg) {
+       private void handleLog(ShellMsg shellMsg) {
         String msg = shellMsg.getMsg();
         msg = "ShellLog " + _process.getProcessInfoString() + " " + msg;
         ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel();
@@ -189,9 +205,52 @@ public class ShellSpout implements ISpout {
 
     @Override
     public void activate() {
+               LOG.info("Start checking heartbeat...");
+               // prevent timer to check heartbeat based on last thing before 
activate
+               setHeartbeat();
+               heartBeatTimer.scheduleAtFixedRate(new 
SpoutHeartbeatTimerTask(this), 1000, 1 * 1000);
     }
 
     @Override
     public void deactivate() {
+               heartBeatTimer.cancel();
     }
+
+       private void setHeartbeat() {
+               lastHeartbeatTimestamp.set(System.currentTimeMillis());
+       }
+
+       private long getLastHeartbeat() {
+               return lastHeartbeatTimestamp.get();
+       }
+
+       private void die(Throwable exception) {
+               heartBeatTimer.cancel();
+
+               LOG.error("Halting process: ShellSpout died.", exception);
+               _collector.reportError(exception);
+               System.exit(11);
+       }
+
+       private class SpoutHeartbeatTimerTask extends TimerTask {
+               private ShellSpout spout;
+
+               public SpoutHeartbeatTimerTask(ShellSpout spout) {
+                       this.spout = spout;
+               }
+
+               @Override
+               public void run() {
+                       long currentTimeMillis = System.currentTimeMillis();
+                       long lastHeartbeat = getLastHeartbeat();
+
+                       LOG.debug("current time : " + currentTimeMillis + ", 
last heartbeat : " + lastHeartbeat
+                                       + ", worker timeout (ms) : " + 
workerTimeoutMills);
+
+                       if (currentTimeMillis - lastHeartbeat > 
workerTimeoutMills) {
+                               spout.die(new RuntimeException("subprocess 
heartbeat timeout"));
+                       }
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ca5874cd/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 9e90121..cbb8e2b 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -18,23 +18,26 @@
 package backtype.storm.task;
 
 import backtype.storm.Config;
+import backtype.storm.Constants;
 import backtype.storm.generated.ShellComponent;
 import backtype.storm.metric.api.IMetric;
 import backtype.storm.metric.api.rpc.IShellMetric;
 import backtype.storm.topology.ReportedFailedException;
 import backtype.storm.tuple.MessageId;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
 import backtype.storm.utils.ShellProcess;
 import backtype.storm.multilang.BoltMsg;
 import backtype.storm.multilang.ShellMsg;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
 import static java.util.concurrent.TimeUnit.SECONDS;
-import java.util.Map;
-import java.util.Random;
+
+import clojure.lang.RT;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +69,8 @@ import org.slf4j.LoggerFactory;
  * </pre>
  */
 public class ShellBolt implements IBolt {
-    public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
+       public static final String HEARTBEAT_STREAM_ID = "__heartbeat";
+       public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
     Process _subprocess;
     OutputCollector _collector;
     Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();
@@ -83,7 +87,11 @@ public class ShellBolt implements IBolt {
     
     private TopologyContext _context;
 
-    public ShellBolt(ShellComponent component) {
+       private int workerTimeoutMills;
+       private Timer heartBeatTimer;
+       private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
+
+       public ShellBolt(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
     }
 
@@ -102,6 +110,8 @@ public class ShellBolt implements IBolt {
 
         _context = context;
 
+               workerTimeoutMills = 1000 * 
RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+
         _process = new ShellProcess(_command);
 
         //subprocesses must send their pid first thing
@@ -109,61 +119,18 @@ public class ShellBolt implements IBolt {
         LOG.info("Launched subprocess with pid " + subpid);
 
         // reader
-        _readerThread = new Thread(new Runnable() {
-            public void run() {
-                while (_running) {
-                    try {
-                        ShellMsg shellMsg = _process.readShellMsg();
-
-                        String command = shellMsg.getCommand();
-                        if (command == null) {
-                            throw new IllegalArgumentException("Command not 
found in bolt message: " + shellMsg);
-                        }
-                        if(command.equals("ack")) {
-                            handleAck(shellMsg.getId());
-                        } else if (command.equals("fail")) {
-                            handleFail(shellMsg.getId());
-                        } else if (command.equals("error")) {
-                            handleError(shellMsg.getMsg());
-                        } else if (command.equals("log")) {
-                            handleLog(shellMsg);
-                        } else if (command.equals("emit")) {
-                            handleEmit(shellMsg);
-                        } else if (command.equals("metrics")) {
-                            handleMetrics(shellMsg);
-                        }
-                    } catch (InterruptedException e) {
-                    } catch (Throwable t) {
-                        die(t);
-                    }
-                }
-            }
-        });
-
+        _readerThread = new Thread(new BoltReaderRunnable());
         _readerThread.start();
 
-        _writerThread = new Thread(new Runnable() {
-            public void run() {
-                while (_running) {
-                    try {
-                        Object write = _pendingWrites.poll(1, SECONDS);
-                        if (write instanceof BoltMsg) {
-                            _process.writeBoltMsg((BoltMsg)write);
-                        } else if (write instanceof List<?>) {
-                            _process.writeTaskIds((List<Integer>)write);
-                        } else if (write != null) {
-                            throw new RuntimeException("Unknown class type to 
write: " + write.getClass().getName());
-                        }
-                    } catch (InterruptedException e) {
-                    } catch (Throwable t) {
-                        die(t);
-                    }
-                }
-            }
-        });
-
+        _writerThread = new Thread(new BoltWriterRunnable());
         _writerThread.start();
-    }
+
+               heartBeatTimer = new Timer(context.getThisTaskId() + 
"-heartbeatTimer", true);
+               heartBeatTimer.scheduleAtFixedRate(new 
BoltHeartbeatTimerTask(this), 1000, 1 * 1000);
+
+               LOG.info("Start checking heartbeat...");
+               setHeartbeat();
+       }
 
     public void execute(Tuple input) {
         if (_exception != null) {
@@ -174,12 +141,7 @@ public class ShellBolt implements IBolt {
         String genId = Long.toString(_rand.nextLong());
         _inputs.put(genId, input);
         try {
-            BoltMsg boltMsg = new BoltMsg();
-            boltMsg.setId(genId);
-            boltMsg.setComp(input.getSourceComponent());
-            boltMsg.setStream(input.getSourceStreamId());
-            boltMsg.setTask(input.getSourceTask());
-            boltMsg.setTuple(input.getValues());
+                       BoltMsg boltMsg = createBoltMessage(input, genId);
 
             _pendingWrites.put(boltMsg);
         } catch(InterruptedException e) {
@@ -188,8 +150,19 @@ public class ShellBolt implements IBolt {
         }
     }
 
-    public void cleanup() {
+       private BoltMsg createBoltMessage(Tuple input, String genId) {
+               BoltMsg boltMsg = new BoltMsg();
+               boltMsg.setId(genId);
+               boltMsg.setComp(input.getSourceComponent());
+               boltMsg.setStream(input.getSourceStreamId());
+               boltMsg.setTask(input.getSourceTask());
+               boltMsg.setTuple(input.getValues());
+               return boltMsg;
+       }
+
+       public void cleanup() {
         _running = false;
+               heartBeatTimer.cancel();
         _writerThread.interrupt();
         _readerThread.interrupt();
         _process.destroy();
@@ -197,6 +170,9 @@ public class ShellBolt implements IBolt {
     }
 
     private void handleAck(Object id) {
+               // FIXME : debug, should be removed
+               LOG.debug("Ack... id : " + id);
+
         Tuple acked = _inputs.remove(id);
         if(acked==null) {
             throw new RuntimeException("Acked a non-existent or already 
acked/failed id: " + id);
@@ -296,7 +272,15 @@ public class ShellBolt implements IBolt {
         }       
     }
 
-    private void die(Throwable exception) {
+       private void setHeartbeat() {
+               lastHeartbeatTimestamp.set(System.currentTimeMillis());
+       }
+
+       private long getLastHeartbeat() {
+               return lastHeartbeatTimestamp.get();
+       }
+
+       private void die(Throwable exception) {
         String processInfo = _process.getProcessInfoString() + 
_process.getProcessTerminationInfoString();
         _exception = new RuntimeException(processInfo, exception);
         LOG.error("Halting process: ShellBolt died.", exception);
@@ -305,4 +289,95 @@ public class ShellBolt implements IBolt {
             System.exit(11);
         }
     }
+
+       private class BoltHeartbeatTimerTask extends TimerTask {
+               private ShellBolt bolt;
+
+               public BoltHeartbeatTimerTask(ShellBolt bolt) {
+                       this.bolt = bolt;
+               }
+
+               @Override
+               public void run() {
+                       long currentTimeMillis = System.currentTimeMillis();
+                       long lastHeartbeat = getLastHeartbeat();
+
+                       LOG.debug("BOLT - current time : " + currentTimeMillis 
+ ", last heartbeat : " + lastHeartbeat
+                                       + ", worker timeout (ms) : " + 
workerTimeoutMills);
+
+                       if (currentTimeMillis - lastHeartbeat > 
workerTimeoutMills) {
+                               bolt.die(new RuntimeException("subprocess 
heartbeat timeout"));
+                       }
+
+                       String genId = Long.toString(_rand.nextLong());
+                       try {
+                               
_pendingWrites.put(createHeartbeatBoltMessage(genId));
+                       } catch(InterruptedException e) {
+                               String processInfo = 
_process.getProcessInfoString() + _process.getProcessTerminationInfoString();
+                               bolt.die(new RuntimeException("Error during 
multilang processing " + processInfo, e));
+                       }
+               }
+
+               private BoltMsg createHeartbeatBoltMessage(String genId) {
+                       BoltMsg msg = new BoltMsg();
+                       msg.setId(genId);
+                       msg.setTask(Constants.SYSTEM_TASK_ID);
+                       msg.setStream(HEARTBEAT_STREAM_ID);
+                       msg.setTuple(new ArrayList<Object>());
+                       return msg;
+               }
+       }
+
+       private class BoltReaderRunnable implements Runnable {
+               public void run() {
+                       while (_running) {
+                               try {
+                                       ShellMsg shellMsg = 
_process.readShellMsg();
+
+                                       String command = shellMsg.getCommand();
+                                       if (command == null) {
+                                               throw new 
IllegalArgumentException("Command not found in bolt message: " + shellMsg);
+                                       }
+                                       if (command.equals("sync")) {
+                                               setHeartbeat();
+                                       } else if(command.equals("ack")) {
+                                               handleAck(shellMsg.getId());
+                                       } else if (command.equals("fail")) {
+                                               handleFail(shellMsg.getId());
+                                       } else if (command.equals("error")) {
+                                               handleError(shellMsg.getMsg());
+                                       } else if (command.equals("log")) {
+                                               handleLog(shellMsg);
+                                       } else if (command.equals("emit")) {
+                                               handleEmit(shellMsg);
+                                       } else if (command.equals("metrics")) {
+                                               handleMetrics(shellMsg);
+                                       }
+                               } catch (InterruptedException e) {
+                               } catch (Throwable t) {
+                                       die(t);
+                               }
+                       }
+               }
+       }
+
+       private class BoltWriterRunnable implements Runnable {
+               public void run() {
+                       while (_running) {
+                               try {
+                                       Object write = _pendingWrites.poll(1, 
SECONDS);
+                                       if (write instanceof BoltMsg) {
+                                               _process.writeBoltMsg((BoltMsg) 
write);
+                                       } else if (write instanceof List<?>) {
+                                               
_process.writeTaskIds((List<Integer>)write);
+                                       } else if (write != null) {
+                                               throw new 
RuntimeException("Unknown class type to write: " + write.getClass().getName());
+                                       }
+                               } catch (InterruptedException e) {
+                               } catch (Throwable t) {
+                                       die(t);
+                               }
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ca5874cd/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index d2a3082..c866858 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -174,6 +174,9 @@ class Tuple(object):
                 self.__class__.__name__,
                 ''.join(' %s=%r' % (k, self.__dict__[k]) for k in 
sorted(self.__dict__.keys())))
 
+    def is_heartbeat_tuple(self):
+        return self.task == -1 and self.stream == "__heartbeat"
+
 class Bolt(object):
     def initialize(self, stormconf, context):
         pass
@@ -189,7 +192,10 @@ class Bolt(object):
             self.initialize(conf, context)
             while True:
                 tup = readTuple()
-                self.process(tup)
+                if tup.is_heartbeat_tuple():
+                    sync()
+                else:
+                    self.process(tup)
         except Exception, e:
             reportError(traceback.format_exc(e))
 
@@ -209,9 +215,12 @@ class BasicBolt(object):
             self.initialize(conf, context)
             while True:
                 tup = readTuple()
-                ANCHOR_TUPLE = tup
-                self.process(tup)
-                ack(tup)
+                if tup.is_heartbeat_tuple():
+                    sync()
+                else:
+                    ANCHOR_TUPLE = tup
+                    self.process(tup)
+                    ack(tup)
         except Exception, e:
             reportError(traceback.format_exc(e))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ca5874cd/storm-core/src/multilang/rb/storm.rb
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/rb/storm.rb b/storm-core/src/multilang/rb/storm.rb
index 17232d1..d177937 100644
--- a/storm-core/src/multilang/rb/storm.rb
+++ b/storm-core/src/multilang/rb/storm.rb
@@ -169,6 +169,10 @@ module Storm
     def self.from_hash(hash)
       Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
     end
+
+    def is_heartbeat
+      task == -1 and stream == '__heartbeat'
+    end
   end
 
   class Bolt
@@ -183,7 +187,12 @@ module Storm
       prepare(*handshake)
       begin
         while true
-          process Tuple.from_hash(read_command)
+          tuple = Tuple.from_hash(read_command)
+          if tuple.is_heartbeat
+            sync
+          else
+            process tuple
+          end
         end
       rescue Exception => e
         reportError 'Exception in bolt: ' + e.message + ' - ' + 
e.backtrace.join('\n')

Reply via email to