sync copies of storm.rb

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a9333360
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a9333360
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a9333360

Branch: refs/heads/master
Commit: a9333360678107954209a2f04e0e4836230ad2ad
Parents: b9322a5
Author: P. Taylor Goetz <ptgo...@gmail.com>
Authored: Fri Nov 14 16:46:43 2014 -0500
Committer: P. Taylor Goetz <ptgo...@gmail.com>
Committed: Fri Nov 14 16:46:43 2014 -0500

----------------------------------------------------------------------
 storm-core/src/dev/resources/storm.rb | 55 +++++++++++++++++-------------
 1 file changed, 32 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a9333360/storm-core/src/dev/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.rb 
b/storm-core/src/dev/resources/storm.rb
index 17232d1..816694e 100644
--- a/storm-core/src/dev/resources/storm.rb
+++ b/storm-core/src/dev/resources/storm.rb
@@ -41,26 +41,26 @@ module Storm
 
     def read_task_ids
       Storm::Protocol.pending_taskids.shift ||
-        begin
-          msg = read_message
-          until msg.is_a? Array
-            Storm::Protocol.pending_commands.push(msg)
+          begin
             msg = read_message
+            until msg.is_a? Array
+              Storm::Protocol.pending_commands.push(msg)
+              msg = read_message
+            end
+            msg
           end
-          msg
-        end
     end
 
     def read_command
       Storm::Protocol.pending_commands.shift ||
-        begin
-          msg = read_message
-          while msg.is_a? Array
-            Storm::Protocol.pending_taskids.push(msg)
+          begin
             msg = read_message
+            while msg.is_a? Array
+              Storm::Protocol.pending_taskids.push(msg)
+              msg = read_message
+            end
+            msg
           end
-          msg
-        end
     end
 
     def send_msg_to_parent(msg)
@@ -105,10 +105,10 @@ module Storm
 
     def emit(*args)
       case Storm::Protocol.mode
-      when 'spout'
-        emit_spout(*args)
-      when 'bolt'
-        emit_bolt(*args)
+        when 'spout'
+          emit_spout(*args)
+        when 'bolt'
+          emit_bolt(*args)
       end
     end
 
@@ -169,6 +169,10 @@ module Storm
     def self.from_hash(hash)
       Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
     end
+
+    def is_heartbeat
+      task == -1 and stream == '__heartbeat'
+    end
   end
 
   class Bolt
@@ -183,7 +187,12 @@ module Storm
       prepare(*handshake)
       begin
         while true
-          process Tuple.from_hash(read_command)
+          tuple = Tuple.from_hash(read_command)
+          if tuple.is_heartbeat
+            sync
+          else
+            process tuple
+          end
         end
       rescue Exception => e
         reportError 'Exception in bolt: ' + e.message + ' - ' + 
e.backtrace.join('\n')
@@ -210,12 +219,12 @@ module Storm
         while true
           msg = read_command
           case msg['command']
-          when 'next'
-            nextTuple
-          when 'ack'
-            ack(msg['id'])
-          when 'fail'
-            fail(msg['id'])
+            when 'next'
+              nextTuple
+            when 'ack'
+              ack(msg['id'])
+            when 'fail'
+              fail(msg['id'])
           end
           sync
         end

Reply via email to