Repository: storm Updated Branches: refs/heads/master 7f52aecb1 -> 4d15d4c38
STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d36be51a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d36be51a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d36be51a Branch: refs/heads/master Commit: d36be51a39abb03ac47e01eb2e1fda31f9f9110b Parents: d42c437 Author: Stig Rohde Døssing <[email protected]> Authored: Sun Feb 14 02:39:42 2016 +0100 Committer: Stig Døssing <[email protected]> Committed: Tue Mar 1 23:52:36 2016 +0100 ---------------------------------------------------------------------- .../src/clj/org/apache/storm/clojure.clj | 3 ++ .../src/clj/org/apache/storm/daemon/common.clj | 8 +++ .../clj/org/apache/storm/daemon/executor.clj | 11 +++- .../clj/org/apache/storm/internal/clojure.clj | 3 ++ .../storm/coordination/CoordinatedBolt.java | 4 ++ .../src/jvm/org/apache/storm/daemon/Acker.java | 15 +++--- .../org/apache/storm/task/IOutputCollector.java | 1 + .../org/apache/storm/task/OutputCollector.java | 10 ++++ .../storm/topology/BasicOutputCollector.java | 4 ++ .../storm/topology/IBasicOutputCollector.java | 2 + .../trident/topology/TridentBoltExecutor.java | 4 ++ .../org/apache/storm/integration_test.clj | 53 ++++++++++++++++++-- 12 files changed, 108 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-clojure/src/clj/org/apache/storm/clojure.clj ---------------------------------------------------------------------- diff --git a/storm-clojure/src/clj/org/apache/storm/clojure.clj b/storm-clojure/src/clj/org/apache/storm/clojure.clj index 9e1836f..607fc24 100644 --- a/storm-clojure/src/clj/org/apache/storm/clojure.clj +++ b/storm-clojure/src/clj/org/apache/storm/clojure.clj @@ -179,6 +179,9 @@ (defn fail! [collector ^Tuple tuple] (.fail ^OutputCollector (:output-collector collector) tuple)) +(defn reset-timeout! [collector ^Tuple tuple] + (.resetTimeout ^OutputCollector (:output-collector collector) tuple)) + (defn report-error! [collector ^Tuple tuple] (.reportError ^OutputCollector (:output-collector collector) tuple)) http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/clj/org/apache/storm/daemon/common.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj index 65cf233..49b0bb9 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/common.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj @@ -51,6 +51,7 @@ (def ACKER-INIT-STREAM-ID Acker/ACKER_INIT_STREAM_ID) (def ACKER-ACK-STREAM-ID Acker/ACKER_ACK_STREAM_ID) (def ACKER-FAIL-STREAM-ID Acker/ACKER_FAIL_STREAM_ID) +(def ACKER-RESET-TIMEOUT-STREAM-ID Acker/ACKER_RESET_TIMEOUT_STREAM_ID) (def SYSTEM-STREAM-ID "__system") @@ -202,6 +203,8 @@ {(Utils/getGlobalStreamId id ACKER-ACK-STREAM-ID) (Thrift/prepareFieldsGrouping ["id"]) (Utils/getGlobalStreamId id ACKER-FAIL-STREAM-ID) + (Thrift/prepareFieldsGrouping ["id"]) + (Utils/getGlobalStreamId id ACKER-RESET-TIMEOUT-STREAM-ID) (Thrift/prepareFieldsGrouping ["id"])} ))] (merge spout-inputs bolt-inputs))) @@ -233,6 +236,7 @@ (mk-acker-bolt) {ACKER-ACK-STREAM-ID (Thrift/directOutputFields ["id"]) ACKER-FAIL-STREAM-ID (Thrift/directOutputFields ["id"]) + ACKER-RESET-TIMEOUT-STREAM-ID (Thrift/directOutputFields ["id"]) } (Integer. num-executors) {TOPOLOGY-TASKS num-executors @@ -242,6 +246,7 @@ (do (.put_to_streams common ACKER-ACK-STREAM-ID (Thrift/outputFields ["id" "ack-val"])) (.put_to_streams common ACKER-FAIL-STREAM-ID (Thrift/outputFields ["id"])) + (.put_to_streams common ACKER-RESET-TIMEOUT-STREAM-ID (Thrift/outputFields ["id"])) )) (dofor [[_ spout] (.get_spouts ret) :let [common (.get_common spout) @@ -258,6 +263,9 @@ (.put_to_inputs common (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID) (Thrift/prepareDirectGrouping)) + (.put_to_inputs common + (GlobalStreamId. ACKER-COMPONENT-ID ACKER-RESET-TIMEOUT-STREAM-ID) + (Thrift/prepareDirectGrouping)) )) (.put_to_bolts ret "__acker" acker-bolt) )) http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 9ff93f8..de32544 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -529,6 +529,11 @@ spout-obj (:object task-data)] (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj (.getValue tuple 0)))) + ACKER-RESET-TIMEOUT-STREAM-ID + (let [id (.getValue tuple 0) + pending-for-id (.get pending id)] + (when pending-for-id + (.put pending id pending-for-id))) (let [id (.getValue tuple 0) [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)] (when spout-id @@ -830,9 +835,13 @@ (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) + (^void resetTimeout [this ^Tuple tuple] + (fast-list-iter [root (.. tuple getMessageId getAnchors)] + (task/send-unanchored task-data + ACKER-RESET-TIMEOUT-STREAM-ID + [root]))) (reportError [this error] (report-error error)))))) - (reset! open-or-prepare-was-called? true) (log-message "Prepared bolt " component-id ":" (keys task-datas)) (setup-metrics! executor-data) http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/clj/org/apache/storm/internal/clojure.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/internal/clojure.clj b/storm-core/src/clj/org/apache/storm/internal/clojure.clj index 3f29757..f27ac04 100644 --- a/storm-core/src/clj/org/apache/storm/internal/clojure.clj +++ b/storm-core/src/clj/org/apache/storm/internal/clojure.clj @@ -179,6 +179,9 @@ (defn fail! [collector ^Tuple tuple] (.fail ^OutputCollector (:output-collector collector) tuple)) +(defn reset-timeout! [collector ^Tuple tuple] + (.resetTimeout ^OutputCollector (:output-collector collector) tuple)) + (defn report-error! [collector ^Tuple tuple] (.reportError ^OutputCollector (:output-collector collector) tuple)) http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java b/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java index ee66b09..15ac5e2 100644 --- a/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java +++ b/storm-core/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java @@ -124,6 +124,10 @@ public class CoordinatedBolt implements IRichBolt { checkFinishId(tuple, TupleType.REGULAR); _delegate.fail(tuple); } + + public void resetTimeout(Tuple tuple) { + _delegate.resetTimeout(tuple); + } public void reportError(Throwable error) { _delegate.reportError(error); http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java index 7d05e24..eb14af7 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java @@ -40,6 +40,7 @@ public class Acker implements IBolt { public static final String ACKER_INIT_STREAM_ID = "__ack_init"; public static final String ACKER_ACK_STREAM_ID = "__ack_ack"; public static final String ACKER_FAIL_STREAM_ID = "__ack_fail"; + public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout"; public static final int TIMEOUT_BUCKET_NUM = 3; @@ -100,6 +101,8 @@ public class Acker implements IBolt { } curr.failed = true; pending.put(id, curr); + } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) { + pending.put(id, curr); } else { LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask()); return; @@ -110,11 +113,11 @@ public class Acker implements IBolt { if (curr.val == 0) { pending.remove(id); collector.emitDirect(task, ACKER_ACK_STREAM_ID, new Values(id)); - } else { - if (curr.failed) { - pending.remove(id); - collector.emitDirect(task, ACKER_FAIL_STREAM_ID, new Values(id)); - } + } else if (curr.failed) { + pending.remove(id); + collector.emitDirect(task, ACKER_FAIL_STREAM_ID, new Values(id)); + } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) { + collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, new Values(id)); } } @@ -125,4 +128,4 @@ public class Acker implements IBolt { public void cleanup() { LOG.info("Acker: cleanup successfully"); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java index cbbe108..cda4d9f 100644 --- a/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java +++ b/storm-core/src/jvm/org/apache/storm/task/IOutputCollector.java @@ -29,4 +29,5 @@ public interface IOutputCollector extends IErrorReporter { void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple); void ack(Tuple input); void fail(Tuple input); + void resetTimeout(Tuple input); } http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java index e6e54ac..071d8aa 100644 --- a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java +++ b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java @@ -218,6 +218,16 @@ public class OutputCollector implements IOutputCollector { _delegate.fail(input); } + /** + * Resets the message timeout for any tuple trees to which the given tuple belongs. + * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS. + * @param input the tuple to reset timeout for + */ + @Override + public void resetTimeout(Tuple input) { + _delegate.resetTimeout(input); + } + @Override public void reportError(Throwable error) { _delegate.reportError(error); http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java index cedc7c9..343c349 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java +++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java @@ -52,6 +52,10 @@ public class BasicOutputCollector implements IBasicOutputCollector { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple); } + public void resetTimeout(Tuple tuple){ + out.resetTimeout(tuple); + } + protected IOutputCollector getOutputter() { return out; } http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java index 60da48a..7b7c9fc 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java +++ b/storm-core/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java @@ -18,10 +18,12 @@ package org.apache.storm.topology; import org.apache.storm.task.IErrorReporter; +import org.apache.storm.tuple.Tuple; import java.util.List; public interface IBasicOutputCollector extends IErrorReporter{ List<Integer> emit(String streamId, List<Object> tuple); void emitDirect(int taskId, String streamId, List<Object> tuple); + void resetTimeout(Tuple tuple); } http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java index d85d217..41feb12 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java @@ -180,6 +180,10 @@ public class TridentBoltExecutor implements IRichBolt { public void fail(Tuple tuple) { throw new IllegalStateException("Method should never be called"); } + + public void resetTimeout(Tuple tuple) { + throw new IllegalStateException("Method should never be called"); + } public void reportError(Throwable error) { _delegate.reportError(error); http://git-wip-us.apache.org/repos/asf/storm/blob/d36be51a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj index 697bdae..6d3b8f0 100644 --- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj +++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj @@ -20,6 +20,7 @@ (:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions]) (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout]) + (:import [org.apache.storm.utils Time]) (:import [org.apache.storm.tuple Fields]) (:import [org.apache.storm.cluster StormClusterStateImpl]) (:use [org.apache.storm.internal clojure]) @@ -97,9 +98,18 @@ (ack! collector tuple) )))))) -(defn assert-loop [afn ids] - (while (not (every? afn ids)) - (Thread/sleep 1))) +(defn assert-loop +([afn ids] (assert-loop afn ids 10)) +([afn ids timeout-secs] + (loop [remaining-time (* timeout-secs 1000)] + (let [start-time (System/currentTimeMillis) + assertion-is-true (every? afn ids)] + (if (or assertion-is-true (neg? remaining-time)) + (is assertion-is-true) + (do + (Thread/sleep 1) + (recur (- remaining-time (- (System/currentTimeMillis) start-time))) + )))))) (defn assert-acked [tracker & ids] (assert-loop #(.isAcked tracker %) ids)) @@ -132,6 +142,43 @@ (assert-failed tracker 2) ))) +(defbolt extend-timeout-twice {} {:prepare true} + [conf context collector] + (let [state (atom -1)] + (bolt + (execute [tuple] + (do + (Time/sleep (* 8 1000)) + (reset-timeout! collector tuple) + (Time/sleep (* 8 1000)) + (reset-timeout! collector tuple) + (Time/sleep (* 8 1000)) + (ack! collector tuple) + ))))) + +(deftest test-reset-timeout + (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}] + (let [feeder (feeder-spout ["field1"]) + tracker (AckFailMapTracker.) + _ (.setAckFailDelegate feeder tracker) + topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails feeder)} + {"2" (Thrift/prepareBoltDetails + {(Utils/getGlobalStreamId "1" nil) + (Thrift/prepareGlobalGrouping)} extend-timeout-twice)})] + (submit-local-topology (:nimbus cluster) + "timeout-tester" + {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} + topology) + (advance-cluster-time cluster 11) + (.feed feeder ["a"] 1) + (advance-cluster-time cluster 21) + (is (not (.isFailed tracker 1))) + (is (not (.isAcked tracker 1))) + (advance-cluster-time cluster 5) + (assert-acked tracker 1) + ))) + (defn mk-validate-topology-1 [] (Thrift/buildTopology {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
