Apply heartbeat changes to diverged py/rb/js files

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2af791a7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2af791a7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2af791a7

Branch: refs/heads/0.9.3-branch
Commit: 2af791a7f116812a396141bf737f959ea1b51f4c
Parents: 6c09a9e
Author: Jungtaek Lim <[email protected]>
Authored: Sat Oct 11 23:02:37 2014 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Sat Oct 11 23:02:37 2014 +0900

----------------------------------------------------------------------
 .../storm-starter/multilang/resources/storm.js  | 20 +++--
 .../storm-starter/multilang/resources/storm.py  | 83 ++++++++++++------
 .../storm-starter/multilang/resources/storm.rb  | 90 ++++++++++++++------
 storm-core/src/multilang/py/storm.py            |  4 +-
 storm-core/src/multilang/rb/storm.rb            | 44 +++++-----
 5 files changed, 159 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2af791a7/examples/storm-starter/multilang/resources/storm.js
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js
index 5c78072..623abf4 100755
--- a/examples/storm-starter/multilang/resources/storm.js
+++ b/examples/storm-starter/multilang/resources/storm.js
@@ -80,7 +80,7 @@ Storm.prototype.handleNewChunk = function(chunk) {
         }
     }
     return messages;
- }
+}
 
 Storm.prototype.isTaskIds = function(msg) {
     return (msg instanceof Array);
@@ -243,13 +243,19 @@ BasicBolt.prototype.__emit = function(commandDetails) {
 BasicBolt.prototype.handleNewCommand = function(command) {
     var self = this;
     var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
+
+    if (tup.task === -1 && tup.stream === "__heartbeat") {
+        self.sync();
+        return;
+    }
+
     var callback = function(err) {
-          if (err) {
-              self.fail(tup, err);
-              return;
-          }
-          self.ack(tup);
-      }
+        if (err) {
+            self.fail(tup, err);
+            return;
+        }
+        self.ack(tup);
+    }
     this.process(tup, callback);
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2af791a7/examples/storm-starter/multilang/resources/storm.py
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.py b/examples/storm-starter/multilang/resources/storm.py
index fdf7751..acc02ad 100644
--- a/examples/storm-starter/multilang/resources/storm.py
+++ b/examples/storm-starter/multilang/resources/storm.py
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -13,6 +15,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import sys
 import os
 import traceback
@@ -30,10 +33,12 @@ json_decode = lambda x: json.loads(x)
 def readMsg():
     msg = ""
     while True:
-        line = sys.stdin.readline()[0:-1]
-        if line == "end":
+        line = sys.stdin.readline()
+        if not line:
+            raise Exception('Read EOF from stdin')
+        if line[0:-1] == "end":
             break
-        msg = msg + line + "\n"
+        msg = msg + line
     return json_decode(msg[0:-1])
 
 MODE = None
@@ -80,14 +85,14 @@ def sync():
 def sendpid(heartbeatdir):
     pid = os.getpid()
     sendMsgToParent({'pid':pid})
-    open(heartbeatdir + "/" + str(pid), "w").close()    
+    open(heartbeatdir + "/" + str(pid), "w").close()
 
 def emit(*args, **kwargs):
     __emit(*args, **kwargs)
     return readTaskIds()
 
 def emitDirect(task, *args, **kwargs):
-    kwargs['directTask'] = task
+    kwargs["directTask"] = task
     __emit(*args, **kwargs)
 
 def __emit(*args, **kwargs):
@@ -109,7 +114,7 @@ def emitBolt(tup, stream=None, anchors = [], directTask=None):
         m["task"] = directTask
     m["tuple"] = tup
     sendMsgToParent(m)
-    
+
 def emitSpout(tup, stream=None, id=None, directTask=None):
     m = {"command": "emit"}
     if id is not None:
@@ -127,15 +132,36 @@ def ack(tup):
 def fail(tup):
     sendMsgToParent({"command": "fail", "id": tup.id})
 
-def log(msg):
-    sendMsgToParent({"command": "log", "msg": msg})
+def reportError(msg):
+    sendMsgToParent({"command": "error", "msg": msg})
+
+def log(msg, level=2):
+    sendMsgToParent({"command": "log", "msg": msg, "level":level})
+
+def logTrace(msg):
+    log(msg, 0)
+
+def logDebug(msg):
+    log(msg, 1)
+
+def logInfo(msg):
+    log(msg, 2)
+
+def logWarn(msg):
+    log(msg, 3)
+
+def logError(msg):
+    log(msg, 4)
+
+def rpcMetrics(name, params):
+    sendMsgToParent({"command": "metrics", "name": name, "params": params})
 
 def initComponent():
     setupInfo = readMsg()
     sendpid(setupInfo['pidDir'])
     return [setupInfo['conf'], setupInfo['context']]
 
-class Tuple:
+class Tuple(object):
     def __init__(self, id, component, stream, task, values):
         self.id = id
         self.component = component
@@ -145,10 +171,13 @@ class Tuple:
 
     def __repr__(self):
         return '<%s%s>' % (
-                self.__class__.__name__,
-                ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
+            self.__class__.__name__,
+            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
+
+    def is_heartbeat_tuple(self):
+        return self.task == -1 and self.stream == "__heartbeat"
 
-class Bolt:
+class Bolt(object):
     def initialize(self, stormconf, context):
         pass
 
@@ -159,15 +188,18 @@ class Bolt:
         global MODE
         MODE = Bolt
         conf, context = initComponent()
-        self.initialize(conf, context)
         try:
+            self.initialize(conf, context)
             while True:
                 tup = readTuple()
-                self.process(tup)
+                if tup.is_heartbeat_tuple():
+                    sync()
+                else:
+                    self.process(tup)
         except Exception, e:
-            log(traceback.format_exc(e))
+            reportError(traceback.format_exc(e))
 
-class BasicBolt:
+class BasicBolt(object):
     def initialize(self, stormconf, context):
         pass
 
@@ -179,17 +211,20 @@ class BasicBolt:
         MODE = Bolt
         global ANCHOR_TUPLE
         conf, context = initComponent()
-        self.initialize(conf, context)
         try:
+            self.initialize(conf, context)
             while True:
                 tup = readTuple()
-                ANCHOR_TUPLE = tup
-                self.process(tup)
-                ack(tup)
+                if tup.is_heartbeat_tuple():
+                    sync()
+                else:
+                    ANCHOR_TUPLE = tup
+                    self.process(tup)
+                    ack(tup)
         except Exception, e:
-            log(traceback.format_exc(e))
+            reportError(traceback.format_exc(e))
 
-class Spout:
+class Spout(object):
     def initialize(self, conf, context):
         pass
 
@@ -206,8 +241,8 @@ class Spout:
         global MODE
         MODE = Spout
         conf, context = initComponent()
-        self.initialize(conf, context)
         try:
+            self.initialize(conf, context)
             while True:
                 msg = readCommand()
                 if msg["command"] == "next":
@@ -218,4 +253,4 @@ class Spout:
                     self.fail(msg["id"])
                 sync()
         except Exception, e:
-            log(traceback.format_exc(e))
+            reportError(traceback.format_exc(e))

http://git-wip-us.apache.org/repos/asf/storm/blob/2af791a7/examples/storm-starter/multilang/resources/storm.rb
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.rb b/examples/storm-starter/multilang/resources/storm.rb
index 985b412..816694e 100644
--- a/examples/storm-starter/multilang/resources/storm.rb
+++ b/examples/storm-starter/multilang/resources/storm.rb
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -13,6 +15,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 require "rubygems"
 require "json"
 
@@ -38,26 +41,26 @@ module Storm
 
     def read_task_ids
       Storm::Protocol.pending_taskids.shift ||
-        begin
-          msg = read_message
-          until msg.is_a? Array
-            Storm::Protocol.pending_commands.push(msg)
+          begin
             msg = read_message
+            until msg.is_a? Array
+              Storm::Protocol.pending_commands.push(msg)
+              msg = read_message
+            end
+            msg
           end
-          msg
-        end
     end
 
     def read_command
       Storm::Protocol.pending_commands.shift ||
-        begin
-          msg = read_message
-          while msg.is_a? Array
-            Storm::Protocol.pending_taskids.push(msg)
+          begin
             msg = read_message
+            while msg.is_a? Array
+              Storm::Protocol.pending_taskids.push(msg)
+              msg = read_message
+            end
+            msg
           end
-          msg
-        end
     end
 
     def send_msg_to_parent(msg)
@@ -102,10 +105,10 @@ module Storm
 
     def emit(*args)
       case Storm::Protocol.mode
-      when 'spout'
-        emit_spout(*args)
-      when 'bolt'
-        emit_bolt(*args)
+        when 'spout'
+          emit_spout(*args)
+        when 'bolt'
+          emit_bolt(*args)
       end
     end
 
@@ -117,8 +120,32 @@ module Storm
       send_msg_to_parent :command => :fail, :id => tup.id
     end
 
-    def log(msg)
-      send_msg_to_parent :command => :log, :msg => msg.to_s
+    def reportError(msg)
+      send_msg_to_parent :command => :error, :msg => msg.to_s
+    end
+
+    def log(msg, level=2)
+      send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
+    end
+
+    def logTrace(msg)
+      log(msg, 0)
+    end
+
+    def logDebug(msg)
+      log(msg, 1)
+    end
+
+    def logInfo(msg)
+      log(msg, 2)
+    end
+
+    def logWarn(msg)
+      log(msg, 3)
+    end
+
+    def logError(msg)
+      log(msg, 4)
     end
 
     def handshake
@@ -142,6 +169,10 @@ module Storm
     def self.from_hash(hash)
       Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
     end
+
+    def is_heartbeat
+      task == -1 and stream == '__heartbeat'
+    end
   end
 
   class Bolt
@@ -156,10 +187,15 @@ module Storm
       prepare(*handshake)
       begin
         while true
-          process Tuple.from_hash(read_command)
+          tuple = Tuple.from_hash(read_command)
+          if tuple.is_heartbeat
+            sync
+          else
+            process tuple
+          end
         end
       rescue Exception => e
-        log 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
+        reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
       end
     end
   end
@@ -183,17 +219,17 @@ module Storm
         while true
           msg = read_command
           case msg['command']
-          when 'next'
-            nextTuple
-          when 'ack'
-            ack(msg['id'])
-          when 'fail'
-            fail(msg['id'])
+            when 'next'
+              nextTuple
+            when 'ack'
+              ack(msg['id'])
+            when 'fail'
+              fail(msg['id'])
           end
           sync
         end
       rescue Exception => e
-        log 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
+        reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
       end
     end
   end

http://git-wip-us.apache.org/repos/asf/storm/blob/2af791a7/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index c866858..acc02ad 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -171,8 +171,8 @@ class Tuple(object):
 
     def __repr__(self):
         return '<%s%s>' % (
-                self.__class__.__name__,
-                ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
+            self.__class__.__name__,
+            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
 
     def is_heartbeat_tuple(self):
         return self.task == -1 and self.stream == "__heartbeat"

http://git-wip-us.apache.org/repos/asf/storm/blob/2af791a7/storm-core/src/multilang/rb/storm.rb
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/rb/storm.rb b/storm-core/src/multilang/rb/storm.rb
index d177937..816694e 100644
--- a/storm-core/src/multilang/rb/storm.rb
+++ b/storm-core/src/multilang/rb/storm.rb
@@ -41,26 +41,26 @@ module Storm
 
     def read_task_ids
       Storm::Protocol.pending_taskids.shift ||
-        begin
-          msg = read_message
-          until msg.is_a? Array
-            Storm::Protocol.pending_commands.push(msg)
+          begin
             msg = read_message
+            until msg.is_a? Array
+              Storm::Protocol.pending_commands.push(msg)
+              msg = read_message
+            end
+            msg
           end
-          msg
-        end
     end
 
     def read_command
       Storm::Protocol.pending_commands.shift ||
-        begin
-          msg = read_message
-          while msg.is_a? Array
-            Storm::Protocol.pending_taskids.push(msg)
+          begin
             msg = read_message
+            while msg.is_a? Array
+              Storm::Protocol.pending_taskids.push(msg)
+              msg = read_message
+            end
+            msg
           end
-          msg
-        end
     end
 
     def send_msg_to_parent(msg)
@@ -105,10 +105,10 @@ module Storm
 
     def emit(*args)
       case Storm::Protocol.mode
-      when 'spout'
-        emit_spout(*args)
-      when 'bolt'
-        emit_bolt(*args)
+        when 'spout'
+          emit_spout(*args)
+        when 'bolt'
+          emit_bolt(*args)
       end
     end
 
@@ -219,12 +219,12 @@ module Storm
         while true
           msg = read_command
           case msg['command']
-          when 'next'
-            nextTuple
-          when 'ack'
-            ack(msg['id'])
-          when 'fail'
-            fail(msg['id'])
+            when 'next'
+              nextTuple
+            when 'ack'
+              ack(msg['id'])
+            when 'fail'
+              fail(msg['id'])
           end
           sync
         end

Reply via email to