Repository: storm
Updated Branches:
  refs/heads/master 7f52aecb1 -> 4d15d4c38


STORM-1549: Add support for resetting tuple timeout from bolts via the 
OutputCollector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d36be51a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d36be51a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d36be51a

Branch: refs/heads/master
Commit: d36be51a39abb03ac47e01eb2e1fda31f9f9110b
Parents: d42c437
Author: Stig Rohde Døssing <[email protected]>
Authored: Sun Feb 14 02:39:42 2016 +0100
Committer: Stig Døssing <[email protected]>
Committed: Tue Mar 1 23:52:36 2016 +0100

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/clojure.clj        |  3 ++
 .../src/clj/org/apache/storm/daemon/common.clj  |  8 +++
 .../clj/org/apache/storm/daemon/executor.clj    | 11 +++-
 .../clj/org/apache/storm/internal/clojure.clj   |  3 ++
 .../storm/coordination/CoordinatedBolt.java     |  4 ++
 .../src/jvm/org/apache/storm/daemon/Acker.java  | 15 +++---
 .../org/apache/storm/task/IOutputCollector.java |  1 +
 .../org/apache/storm/task/OutputCollector.java  | 10 ++++
 .../storm/topology/BasicOutputCollector.java    |  4 ++
 .../storm/topology/IBasicOutputCollector.java   |  2 +
 .../trident/topology/TridentBoltExecutor.java   |  4 ++
 .../org/apache/storm/integration_test.clj       | 53 ++++++++++++++++++--
 12 files changed, 108 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-clojure/src/clj/org/apache/storm/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/src/clj/org/apache/storm/clojure.clj 
b/storm-clojure/src/clj/org/apache/storm/clojure.clj
index 9e1836f..607fc24 100644
--- a/storm-clojure/src/clj/org/apache/storm/clojure.clj
+++ b/storm-clojure/src/clj/org/apache/storm/clojure.clj
@@ -179,6 +179,9 @@
 (defn fail! [collector ^Tuple tuple]
   (.fail ^OutputCollector (:output-collector collector) tuple))
 
+(defn reset-timeout! [collector ^Tuple tuple]
+  (.resetTimeout ^OutputCollector (:output-collector collector) tuple))
+
 (defn report-error! [collector ^Tuple tuple]
   (.reportError ^OutputCollector (:output-collector collector) tuple))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj 
b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 65cf233..49b0bb9 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -51,6 +51,7 @@
 (def ACKER-INIT-STREAM-ID Acker/ACKER_INIT_STREAM_ID)
 (def ACKER-ACK-STREAM-ID Acker/ACKER_ACK_STREAM_ID)
 (def ACKER-FAIL-STREAM-ID Acker/ACKER_FAIL_STREAM_ID)
+(def ACKER-RESET-TIMEOUT-STREAM-ID Acker/ACKER_RESET_TIMEOUT_STREAM_ID)
 
 (def SYSTEM-STREAM-ID "__system")
 
@@ -202,6 +203,8 @@
                              {(Utils/getGlobalStreamId id ACKER-ACK-STREAM-ID)
                               (Thrift/prepareFieldsGrouping ["id"])
                               (Utils/getGlobalStreamId id ACKER-FAIL-STREAM-ID)
+                              (Thrift/prepareFieldsGrouping ["id"])
+                              (Utils/getGlobalStreamId id 
ACKER-RESET-TIMEOUT-STREAM-ID)
                               (Thrift/prepareFieldsGrouping ["id"])}
                              ))]
     (merge spout-inputs bolt-inputs)))
@@ -233,6 +236,7 @@
                                                         (mk-acker-bolt)
                                                         {ACKER-ACK-STREAM-ID 
(Thrift/directOutputFields ["id"])
                                                          ACKER-FAIL-STREAM-ID 
(Thrift/directOutputFields ["id"])
+                                                         
ACKER-RESET-TIMEOUT-STREAM-ID (Thrift/directOutputFields ["id"])
                                                         }
                                                         (Integer. 
num-executors)
                                                         {TOPOLOGY-TASKS 
num-executors
@@ -242,6 +246,7 @@
            (do
              (.put_to_streams common ACKER-ACK-STREAM-ID (Thrift/outputFields 
["id" "ack-val"]))
              (.put_to_streams common ACKER-FAIL-STREAM-ID (Thrift/outputFields 
["id"]))
+             (.put_to_streams common ACKER-RESET-TIMEOUT-STREAM-ID 
(Thrift/outputFields ["id"]))
              ))
     (dofor [[_ spout] (.get_spouts ret)
             :let [common (.get_common spout)
@@ -258,6 +263,9 @@
         (.put_to_inputs common
                         (GlobalStreamId. ACKER-COMPONENT-ID 
ACKER-FAIL-STREAM-ID)
                         (Thrift/prepareDirectGrouping))
+        (.put_to_inputs common
+                        (GlobalStreamId. ACKER-COMPONENT-ID 
ACKER-RESET-TIMEOUT-STREAM-ID)
+                        (Thrift/prepareDirectGrouping))
         ))
     (.put_to_bolts ret "__acker" acker-bolt)
     ))

http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj 
b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 9ff93f8..de32544 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -529,6 +529,11 @@
                                       spout-obj (:object task-data)]
                                   (when (instance? ICredentialsListener 
spout-obj)
                                     (.setCredentials spout-obj (.getValue 
tuple 0))))
+                              ACKER-RESET-TIMEOUT-STREAM-ID 
+                                (let [id (.getValue tuple 0)
+                                      pending-for-id (.get pending id)]
+                                   (when pending-for-id
+                                     (.put pending id pending-for-id))) 
                               (let [id (.getValue tuple 0)
                                     [stored-task-id spout-id 
tuple-finished-info start-time-ms] (.remove pending id)]
                                 (when spout-id
@@ -830,9 +835,13 @@
                                                                           
(.getSourceComponent tuple)
                                                                           
(.getSourceStreamId tuple)
                                                                           
delta))))
+                                          (^void resetTimeout [this ^Tuple 
tuple]
+                                            (fast-list-iter [root (.. tuple 
getMessageId getAnchors)]
+                                                            
(task/send-unanchored task-data
+                                                                               
   ACKER-RESET-TIMEOUT-STREAM-ID
+                                                                               
   [root])))
                                           (reportError [this error]
                                             (report-error error))))))
-
                            (reset! open-or-prepare-was-called? true)
                            (log-message "Prepared bolt " component-id ":" 
(keys task-datas))
                            (setup-metrics! executor-data)

http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/clj/org/apache/storm/internal/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/internal/clojure.clj 
b/storm-core/src/clj/org/apache/storm/internal/clojure.clj
index 3f29757..f27ac04 100644
--- a/storm-core/src/clj/org/apache/storm/internal/clojure.clj
+++ b/storm-core/src/clj/org/apache/storm/internal/clojure.clj
@@ -179,6 +179,9 @@
 (defn fail! [collector ^Tuple tuple]
   (.fail ^OutputCollector (:output-collector collector) tuple))
 
+(defn reset-timeout! [collector ^Tuple tuple]
+  (.resetTimeout ^OutputCollector (:output-collector collector) tuple))
+
 (defn report-error! [collector ^Tuple tuple]
   (.reportError ^OutputCollector (:output-collector collector) tuple))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java 
b/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
index ee66b09..15ac5e2 100644
--- a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
@@ -124,6 +124,10 @@ public class CoordinatedBolt implements IRichBolt {
             checkFinishId(tuple, TupleType.REGULAR);
             _delegate.fail(tuple);
         }
+
+        public void resetTimeout(Tuple tuple) {
+            _delegate.resetTimeout(tuple);
+        }
         
         public void reportError(Throwable error) {
             _delegate.reportError(error);

http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java 
b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
index 7d05e24..eb14af7 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -40,6 +40,7 @@ public class Acker implements IBolt {
     public static final String ACKER_INIT_STREAM_ID = "__ack_init";
     public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
     public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
+    public static final String ACKER_RESET_TIMEOUT_STREAM_ID = 
"__ack_reset_timeout";
 
     public static final int TIMEOUT_BUCKET_NUM = 3;
 
@@ -100,6 +101,8 @@ public class Acker implements IBolt {
             }
             curr.failed = true;
             pending.put(id, curr);
+        } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+            pending.put(id, curr);
         } else {
             LOG.warn("Unknown source stream {} from task-{}", streamId, 
input.getSourceTask());
             return;
@@ -110,11 +113,11 @@ public class Acker implements IBolt {
             if (curr.val == 0) {
                 pending.remove(id);
                 collector.emitDirect(task, ACKER_ACK_STREAM_ID, new 
Values(id));
-            } else {
-                if (curr.failed) {
-                    pending.remove(id);
-                    collector.emitDirect(task, ACKER_FAIL_STREAM_ID, new 
Values(id));
-                }
+            } else if (curr.failed) {
+                pending.remove(id);
+                collector.emitDirect(task, ACKER_FAIL_STREAM_ID, new 
Values(id));
+            } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+                collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, new 
Values(id));
             }
         }
 
@@ -125,4 +128,4 @@ public class Acker implements IBolt {
     public void cleanup() {
         LOG.info("Acker: cleanup successfully");
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java 
b/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java
index cbbe108..cda4d9f 100644
--- a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java
@@ -29,4 +29,5 @@ public interface IOutputCollector extends IErrorReporter {
     void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, 
List<Object> tuple);
     void ack(Tuple input);
     void fail(Tuple input);
+    void resetTimeout(Tuple input);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java 
b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
index e6e54ac..071d8aa 100644
--- a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
@@ -218,6 +218,16 @@ public class OutputCollector implements IOutputCollector {
         _delegate.fail(input);
     }
 
+    /**
+    * Resets the message timeout for any tuple trees to which the given tuple 
belongs.
+    * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+    * @param input the tuple to reset timeout for
+    */
+    @Override
+    public void resetTimeout(Tuple input) {
+        _delegate.resetTimeout(input);
+    }
+
     @Override
     public void reportError(Throwable error) {
         _delegate.reportError(error);

http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java 
b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
index cedc7c9..343c349 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
@@ -52,6 +52,10 @@ public class BasicOutputCollector implements 
IBasicOutputCollector {
         emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
     }
 
+    public void resetTimeout(Tuple tuple){
+        out.resetTimeout(tuple);
+    }
+
     protected IOutputCollector getOutputter() {
         return out;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java 
b/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
index 60da48a..7b7c9fc 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java
@@ -18,10 +18,12 @@
 package org.apache.storm.topology;
 
 import org.apache.storm.task.IErrorReporter;
+import org.apache.storm.tuple.Tuple;
 
 import java.util.List;
 
 public interface IBasicOutputCollector extends IErrorReporter{
     List<Integer> emit(String streamId, List<Object> tuple);
     void emitDirect(int taskId, String streamId, List<Object> tuple);
+    void resetTimeout(Tuple tuple);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java 
b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
index d85d217..41feb12 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java
@@ -180,6 +180,10 @@ public class TridentBoltExecutor implements IRichBolt {
         public void fail(Tuple tuple) {
             throw new IllegalStateException("Method should never be called");
         }
+
+        public void resetTimeout(Tuple tuple) {
+            throw new IllegalStateException("Method should never be called");
+        }
         
         public void reportError(Throwable error) {
             _delegate.reportError(error);

http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git 
a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj 
b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 697bdae..6d3b8f0 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -20,6 +20,7 @@
   (:import [org.apache.storm.generated InvalidTopologyException SubmitOptions 
TopologyInitialStatus RebalanceOptions])
   (:import [org.apache.storm.testing TestWordCounter TestWordSpout 
TestGlobalCount
             TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker 
TestPlannerSpout])
+  (:import [org.apache.storm.utils Time])
   (:import [org.apache.storm.tuple Fields])
   (:import [org.apache.storm.cluster StormClusterStateImpl])
   (:use [org.apache.storm.internal clojure])
@@ -97,9 +98,18 @@
             (ack! collector tuple)
             ))))))
 
-(defn assert-loop [afn ids]
-  (while (not (every? afn ids))
-    (Thread/sleep 1)))
+(defn assert-loop 
+([afn ids] (assert-loop afn ids 10))
+([afn ids timeout-secs]
+  (loop [remaining-time (* timeout-secs 1000)]
+    (let [start-time (System/currentTimeMillis)
+          assertion-is-true (every? afn ids)]
+      (if (or assertion-is-true (neg? remaining-time))
+        (is assertion-is-true)
+        (do
+          (Thread/sleep 1)
+          (recur (- remaining-time (- (System/currentTimeMillis) start-time)))
+        ))))))
 
 (defn assert-acked [tracker & ids]
   (assert-loop #(.isAcked tracker %) ids))
@@ -132,6 +142,43 @@
       (assert-failed tracker 2)
       )))
 
+(defbolt extend-timeout-twice {} {:prepare true}
+  [conf context collector]
+  (let [state (atom -1)]
+    (bolt
+      (execute [tuple]
+        (do
+          (Time/sleep (* 8 1000))
+          (reset-timeout! collector tuple)
+          (Time/sleep (* 8 1000))
+          (reset-timeout! collector tuple)
+          (Time/sleep (* 8 1000))
+          (ack! collector tuple)
+        )))))
+
+(deftest test-reset-timeout
+  (with-simulated-time-local-cluster [cluster :daemon-conf 
{TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+    (let [feeder (feeder-spout ["field1"])
+          tracker (AckFailMapTracker.)
+          _ (.setAckFailDelegate feeder tracker)
+          topology (Thrift/buildTopology
+                     {"1" (Thrift/prepareSpoutDetails feeder)}
+                     {"2" (Thrift/prepareBoltDetails 
+                            {(Utils/getGlobalStreamId "1" nil)
+                             (Thrift/prepareGlobalGrouping)} 
extend-timeout-twice)})]
+    (submit-local-topology (:nimbus cluster)
+                           "timeout-tester"
+                           {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+                           topology)
+    (advance-cluster-time cluster 11)
+    (.feed feeder ["a"] 1)
+    (advance-cluster-time cluster 21)
+    (is (not (.isFailed tracker 1)))
+    (is (not (.isAcked tracker 1)))
+    (advance-cluster-time cluster 5)
+    (assert-acked tracker 1)
+    )))
+
 (defn mk-validate-topology-1 []
   (Thrift/buildTopology
                     {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) 
(Integer. 3))}

Reply via email to