Apply heartbeat changes to diverged py/rb/js files
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2af791a7 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2af791a7 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2af791a7 Branch: refs/heads/0.9.3-branch Commit: 2af791a7f116812a396141bf737f959ea1b51f4c Parents: 6c09a9e Author: Jungtaek Lim <[email protected]> Authored: Sat Oct 11 23:02:37 2014 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Sat Oct 11 23:02:37 2014 +0900 ---------------------------------------------------------------------- .../storm-starter/multilang/resources/storm.js | 20 +++-- .../storm-starter/multilang/resources/storm.py | 83 ++++++++++++------ .../storm-starter/multilang/resources/storm.rb | 90 ++++++++++++++------ storm-core/src/multilang/py/storm.py | 4 +- storm-core/src/multilang/rb/storm.rb | 44 +++++----- 5 files changed, 159 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/2af791a7/examples/storm-starter/multilang/resources/storm.js ---------------------------------------------------------------------- diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 5c78072..623abf4 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -80,7 +80,7 @@ Storm.prototype.handleNewChunk = function(chunk) { } } return messages; - } +} Storm.prototype.isTaskIds = function(msg) { return (msg instanceof Array); @@ -243,13 +243,19 @@ BasicBolt.prototype.__emit = function(commandDetails) { BasicBolt.prototype.handleNewCommand = function(command) { var self = this; var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); + + if (tup.task === -1 && tup.stream === "__heartbeat") { + self.sync(); + return; + } + var callback = function(err) { - if (err) { - self.fail(tup, err); - return; - } - self.ack(tup); - } + if (err) { + self.fail(tup, err); + return; + } + self.ack(tup); + } this.process(tup, callback); } http://git-wip-us.apache.org/repos/asf/storm/blob/2af791a7/examples/storm-starter/multilang/resources/storm.py ---------------------------------------------------------------------- diff --git a/examples/storm-starter/multilang/resources/storm.py b/examples/storm-starter/multilang/resources/storm.py index fdf7751..acc02ad 100644 --- a/examples/storm-starter/multilang/resources/storm.py +++ b/examples/storm-starter/multilang/resources/storm.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- + # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -13,6 +15,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import sys import os import traceback @@ -30,10 +33,12 @@ json_decode = lambda x: json.loads(x) def readMsg(): msg = "" while True: - line = sys.stdin.readline()[0:-1] - if line == "end": + line = sys.stdin.readline() + if not line: + raise Exception('Read EOF from stdin') + if line[0:-1] == "end": break - msg = msg + line + "\n" + msg = msg + line return json_decode(msg[0:-1]) MODE = None @@ -80,14 +85,14 @@ def sync(): def sendpid(heartbeatdir): pid = os.getpid() sendMsgToParent({'pid':pid}) - open(heartbeatdir + "/" + str(pid), "w").close() + open(heartbeatdir + "/" + str(pid), "w").close() def emit(*args, **kwargs): __emit(*args, **kwargs) return readTaskIds() def emitDirect(task, *args, **kwargs): - kwargs['directTask'] = task + kwargs["directTask"] = task __emit(*args, **kwargs) def __emit(*args, **kwargs): @@ -109,7 +114,7 @@ def emitBolt(tup, stream=None, anchors = [], directTask=None): m["task"] = directTask m["tuple"] = tup sendMsgToParent(m) - + def emitSpout(tup, stream=None, id=None, directTask=None): m = {"command": "emit"} if id is not None: @@ -127,15 +132,36 @@ def ack(tup): def fail(tup): sendMsgToParent({"command": "fail", "id": tup.id}) -def log(msg): - sendMsgToParent({"command": "log", "msg": msg}) +def reportError(msg): + sendMsgToParent({"command": "error", "msg": msg}) + +def log(msg, level=2): + sendMsgToParent({"command": "log", "msg": msg, "level":level}) + +def logTrace(msg): + log(msg, 0) + +def logDebug(msg): + log(msg, 1) + +def logInfo(msg): + log(msg, 2) + +def logWarn(msg): + log(msg, 3) + +def logError(msg): + log(msg, 4) + +def rpcMetrics(name, params): + sendMsgToParent({"command": "metrics", "name": name, "params": params}) def initComponent(): setupInfo = readMsg() sendpid(setupInfo['pidDir']) return [setupInfo['conf'], setupInfo['context']] -class Tuple: +class Tuple(object): def __init__(self, id, component, stream, task, values): self.id = id self.component = component @@ -145,10 +171,13 @@ class Tuple: def __repr__(self): return '<%s%s>' % ( - self.__class__.__name__, - ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) + self.__class__.__name__, + ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) + + def is_heartbeat_tuple(self): + return self.task == -1 and self.stream == "__heartbeat" -class Bolt: +class Bolt(object): def initialize(self, stormconf, context): pass @@ -159,15 +188,18 @@ class Bolt: global MODE MODE = Bolt conf, context = initComponent() - self.initialize(conf, context) try: + self.initialize(conf, context) while True: tup = readTuple() - self.process(tup) + if tup.is_heartbeat_tuple(): + sync() + else: + self.process(tup) except Exception, e: - log(traceback.format_exc(e)) + reportError(traceback.format_exc(e)) -class BasicBolt: +class BasicBolt(object): def initialize(self, stormconf, context): pass @@ -179,17 +211,20 @@ class BasicBolt: MODE = Bolt global ANCHOR_TUPLE conf, context = initComponent() - self.initialize(conf, context) try: + self.initialize(conf, context) while True: tup = readTuple() - ANCHOR_TUPLE = tup - self.process(tup) - ack(tup) + if tup.is_heartbeat_tuple(): + sync() + else: + ANCHOR_TUPLE = tup + self.process(tup) + ack(tup) except Exception, e: - log(traceback.format_exc(e)) + reportError(traceback.format_exc(e)) -class Spout: +class Spout(object): def initialize(self, conf, context): pass @@ -206,8 +241,8 @@ class Spout: global MODE MODE = Spout conf, context = initComponent() - self.initialize(conf, context) try: + self.initialize(conf, context) while True: msg = readCommand() if msg["command"] == "next": @@ -218,4 +253,4 @@ class Spout: self.fail(msg["id"]) sync() except Exception, e: - log(traceback.format_exc(e)) + reportError(traceback.format_exc(e)) http://git-wip-us.apache.org/repos/asf/storm/blob/2af791a7/examples/storm-starter/multilang/resources/storm.rb ---------------------------------------------------------------------- diff --git a/examples/storm-starter/multilang/resources/storm.rb b/examples/storm-starter/multilang/resources/storm.rb index 985b412..816694e 100644 --- a/examples/storm-starter/multilang/resources/storm.rb +++ b/examples/storm-starter/multilang/resources/storm.rb @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- + # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -13,6 +15,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + require "rubygems" require "json" @@ -38,26 +41,26 @@ module Storm def read_task_ids Storm::Protocol.pending_taskids.shift || - begin - msg = read_message - until msg.is_a? Array - Storm::Protocol.pending_commands.push(msg) + begin msg = read_message + until msg.is_a? Array + Storm::Protocol.pending_commands.push(msg) + msg = read_message + end + msg end - msg - end end def read_command Storm::Protocol.pending_commands.shift || - begin - msg = read_message - while msg.is_a? Array - Storm::Protocol.pending_taskids.push(msg) + begin msg = read_message + while msg.is_a? Array + Storm::Protocol.pending_taskids.push(msg) + msg = read_message + end + msg end - msg - end end def send_msg_to_parent(msg) @@ -102,10 +105,10 @@ module Storm def emit(*args) case Storm::Protocol.mode - when 'spout' - emit_spout(*args) - when 'bolt' - emit_bolt(*args) + when 'spout' + emit_spout(*args) + when 'bolt' + emit_bolt(*args) end end @@ -117,8 +120,32 @@ module Storm send_msg_to_parent :command => :fail, :id => tup.id end - def log(msg) - send_msg_to_parent :command => :log, :msg => msg.to_s + def reportError(msg) + send_msg_to_parent :command => :error, :msg => msg.to_s + end + + def log(msg, level=2) + send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level + end + + def logTrace(msg) + log(msg, 0) + end + + def logDebug(msg) + log(msg, 1) + end + + def logInfo(msg) + log(msg, 2) + end + + def logWarn(msg) + log(msg, 3) + end + + def logError(msg) + log(msg, 4) end def handshake @@ -142,6 +169,10 @@ module Storm def self.from_hash(hash) Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple")) end + + def is_heartbeat + task == -1 and stream == '__heartbeat' + end end class Bolt @@ -156,10 +187,15 @@ module Storm prepare(*handshake) begin while true - process Tuple.from_hash(read_command) + tuple = Tuple.from_hash(read_command) + if tuple.is_heartbeat + sync + else + process tuple + end end rescue Exception => e - log 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n') + reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n') end end end @@ -183,17 +219,17 @@ module Storm while true msg = read_command case msg['command'] - when 'next' - nextTuple - when 'ack' - ack(msg['id']) - when 'fail' - fail(msg['id']) + when 'next' + nextTuple + when 'ack' + ack(msg['id']) + when 'fail' + fail(msg['id']) end sync end rescue Exception => e - log 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n') + reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n') end end end http://git-wip-us.apache.org/repos/asf/storm/blob/2af791a7/storm-core/src/multilang/py/storm.py ---------------------------------------------------------------------- diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py index c866858..acc02ad 100755 --- a/storm-core/src/multilang/py/storm.py +++ b/storm-core/src/multilang/py/storm.py @@ -171,8 +171,8 @@ class Tuple(object): def __repr__(self): return '<%s%s>' % ( - self.__class__.__name__, - ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) + self.__class__.__name__, + ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) def is_heartbeat_tuple(self): return self.task == -1 and self.stream == "__heartbeat" http://git-wip-us.apache.org/repos/asf/storm/blob/2af791a7/storm-core/src/multilang/rb/storm.rb ---------------------------------------------------------------------- diff --git a/storm-core/src/multilang/rb/storm.rb b/storm-core/src/multilang/rb/storm.rb index d177937..816694e 100644 --- a/storm-core/src/multilang/rb/storm.rb +++ b/storm-core/src/multilang/rb/storm.rb @@ -41,26 +41,26 @@ module Storm def read_task_ids Storm::Protocol.pending_taskids.shift || - begin - msg = read_message - until msg.is_a? Array - Storm::Protocol.pending_commands.push(msg) + begin msg = read_message + until msg.is_a? Array + Storm::Protocol.pending_commands.push(msg) + msg = read_message + end + msg end - msg - end end def read_command Storm::Protocol.pending_commands.shift || - begin - msg = read_message - while msg.is_a? Array - Storm::Protocol.pending_taskids.push(msg) + begin msg = read_message + while msg.is_a? Array + Storm::Protocol.pending_taskids.push(msg) + msg = read_message + end + msg end - msg - end end def send_msg_to_parent(msg) @@ -105,10 +105,10 @@ module Storm def emit(*args) case Storm::Protocol.mode - when 'spout' - emit_spout(*args) - when 'bolt' - emit_bolt(*args) + when 'spout' + emit_spout(*args) + when 'bolt' + emit_bolt(*args) end end @@ -219,12 +219,12 @@ module Storm while true msg = read_command case msg['command'] - when 'next' - nextTuple - when 'ack' - ack(msg['id']) - when 'fail' - fail(msg['id']) + when 'next' + nextTuple + when 'ack' + ack(msg['id']) + when 'fail' + fail(msg['id']) end sync end
