STORM-1549: Update branch to use Java StormCommon
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e0b874f9 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e0b874f9 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e0b874f9 Branch: refs/heads/master Commit: e0b874f99314e45ec8f74f9b08fe1d742454d419 Parents: e2d118a Author: Stig Døssing <[email protected]> Authored: Tue Mar 15 21:15:19 2016 +0100 Committer: Stig Døssing <[email protected]> Committed: Tue Mar 15 21:15:19 2016 +0100 ---------------------------------------------------------------------- storm-core/src/clj/org/apache/storm/daemon/executor.clj | 4 ++-- storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/e0b874f9/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 075f72b..086955f 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -529,7 +529,7 @@ spout-obj (:object task-data)] (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj (.getValue tuple 0)))) - ACKER-RESET-TIMEOUT-STREAM-ID + Acker/ACKER_RESET_TIMEOUT_STREAM_ID (let [id (.getValue tuple 0) pending-for-id (.get pending id)] (when pending-for-id @@ -838,7 +838,7 @@ (^void resetTimeout [this ^Tuple tuple] (fast-list-iter [root (.. 
tuple getMessageId getAnchors)] (task/send-unanchored task-data - ACKER-RESET-TIMEOUT-STREAM-ID + Acker/ACKER_RESET_TIMEOUT_STREAM_ID [root]))) (reportError [this error] (report-error error)))))) http://git-wip-us.apache.org/repos/asf/storm/blob/e0b874f9/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java index 85568ec..7792052 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java @@ -257,6 +257,7 @@ public class StormCommon { for(String id : boltIds) { inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id"))); inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id"))); + inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id"))); } return inputs; } @@ -275,6 +276,7 @@ public class StormCommon { Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>(); outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id"))); outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id"))); + outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id"))); Map<String, Object> ackerConf = new HashMap<String, Object>(); ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum); @@ -286,6 +288,7 @@ public class StormCommon { ComponentCommon common = bolt.get_common(); common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val"))); common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id"))); + 
common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id"))); } for (SpoutSpec spout : topology.get_spouts().values()) { @@ -296,6 +299,7 @@ public class StormCommon { common.put_to_streams(Acker.ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task"))); common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping()); common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareDirectGrouping()); + common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID), Thrift.prepareDirectGrouping()); } topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
