STORM-1549: Update branch to use Java StormCommon

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e0b874f9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e0b874f9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e0b874f9

Branch: refs/heads/master
Commit: e0b874f99314e45ec8f74f9b08fe1d742454d419
Parents: e2d118a
Author: Stig Døssing <[email protected]>
Authored: Tue Mar 15 21:15:19 2016 +0100
Committer: Stig Døssing <[email protected]>
Committed: Tue Mar 15 21:15:19 2016 +0100

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/executor.clj     | 4 ++--
 storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java | 4 ++++
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e0b874f9/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 075f72b..086955f 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -529,7 +529,7 @@
                                       spout-obj (:object task-data)]
                                   (when (instance? ICredentialsListener 
spout-obj)
                                     (.setCredentials spout-obj (.getValue 
tuple 0))))
-                              ACKER-RESET-TIMEOUT-STREAM-ID 
+                              Acker/ACKER_RESET_TIMEOUT_STREAM_ID 
                                 (let [id (.getValue tuple 0)
                                       pending-for-id (.get pending id)]
                                    (when pending-for-id
@@ -838,7 +838,7 @@
                                           (^void resetTimeout [this ^Tuple 
tuple]
                                             (fast-list-iter [root (.. tuple 
getMessageId getAnchors)]
                                                             
(task/send-unanchored task-data
-                                                                               
   ACKER-RESET-TIMEOUT-STREAM-ID
+                                                                               
   Acker/ACKER_RESET_TIMEOUT_STREAM_ID
                                                                                
   [root])))
                                           (reportError [this error]
                                             (report-error error))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/e0b874f9/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 85568ec..7792052 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -257,6 +257,7 @@ public class StormCommon {
         for(String id : boltIds) {
             inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID), 
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
             inputs.put(Utils.getGlobalStreamId(id, 
Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+            inputs.put(Utils.getGlobalStreamId(id, 
Acker.ACKER_RESET_TIMEOUT_STREAM_ID), 
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
         }
         return inputs;
     }
@@ -275,6 +276,7 @@ public class StormCommon {
         Map<String, StreamInfo> outputStreams = new HashMap<String, 
StreamInfo>();
         outputStreams.put(Acker.ACKER_ACK_STREAM_ID, 
Thrift.directOutputFields(Arrays.asList("id")));
         outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, 
Thrift.directOutputFields(Arrays.asList("id")));
+        outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, 
Thrift.directOutputFields(Arrays.asList("id")));
 
         Map<String, Object> ackerConf = new HashMap<String, Object>();
         ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
@@ -286,6 +288,7 @@ public class StormCommon {
             ComponentCommon common = bolt.get_common();
             common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, 
Thrift.outputFields(Arrays.asList("id", "ack-val")));
             common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, 
Thrift.outputFields(Arrays.asList("id")));
+            common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, 
Thrift.outputFields(Arrays.asList("id")));
         }
 
         for (SpoutSpec spout : topology.get_spouts().values()) {
@@ -296,6 +299,7 @@ public class StormCommon {
             common.put_to_streams(Acker.ACKER_INIT_STREAM_ID, 
Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
             
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, 
Acker.ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
             
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, 
Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareDirectGrouping());
+            
common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, 
Acker.ACKER_RESET_TIMEOUT_STREAM_ID), Thrift.prepareDirectGrouping());
         }
 
         topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);

Reply via email to