Repository: storm Updated Branches: refs/heads/master 5ac306237 -> 54f6b32f6
STORM-67 Provide API for spouts to know how many pending messages there are - changes to classes implementing ISpoutOutputCollector - executor.clj getPendingCount definition Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3d03b931 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3d03b931 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3d03b931 Branch: refs/heads/master Commit: 3d03b931b66d43fd5dd383c5e0ac23c55b772232 Parents: 27a3a6b Author: Shyam Rajendran <srajend...@yahoo-inc.com> Authored: Thu Jun 11 12:38:49 2015 -0500 Committer: Shyam Rajendran <rshyam....@gmail.com> Committed: Mon Jul 6 10:42:21 2015 -0500 ---------------------------------------------------------------------- .../spout/SpoutOutputCollectorMock.java | 13 ++++++++++-- .../src/clj/backtype/storm/daemon/executor.clj | 3 +++ .../storm/spout/ISpoutOutputCollector.java | 1 + .../storm/spout/SpoutOutputCollector.java | 5 +++++ .../backtype/storm/testing/SpoutTracker.java | 6 ++++++ .../trident/spout/RichSpoutBatchExecutor.java | 21 ++++++++++++++------ .../trident/spout/RichSpoutBatchTriggerer.java | 18 ++++++++++------- 7 files changed, 52 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java index 02e6830..9f33c89 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java @@ -17,9 +17,10 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import java.util.List; - import backtype.storm.spout.ISpoutOutputCollector; +import backtype.storm.spout.SpoutOutputCollector; + +import java.util.List; /** * Mock of ISpoutOutputCollector @@ -27,6 +28,7 @@ import backtype.storm.spout.ISpoutOutputCollector; public class SpoutOutputCollectorMock implements ISpoutOutputCollector { //comma separated offsets StringBuilder emittedOffset; + SpoutOutputCollector _collector; public SpoutOutputCollectorMock() { emittedOffset = new StringBuilder(); @@ -58,4 +60,11 @@ public class SpoutOutputCollectorMock implements ISpoutOutputCollector { @Override public void reportError(Throwable arg0) { } + + @Override + public long getPendingCount() { + return _collector.getPendingCount(); + } + + } http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/clj/backtype/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj index 454fd0d..4f5cc75 100644 --- a/storm-core/src/clj/backtype/storm/daemon/executor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj @@ -558,6 +558,9 @@ (:user-context task-data) (SpoutOutputCollector. (reify ISpoutOutputCollector + (^long getPendingCount[this] + (.size pending) + ) (^List emit [this ^String stream-id ^List tuple ^Object message-id] (send-spout-msg stream-id tuple message-id nil) ) http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java b/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java index 3cebe43..26a4843 100644 --- a/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java +++ b/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java @@ -26,5 +26,6 @@ public interface ISpoutOutputCollector { List<Integer> emit(String streamId, List<Object> tuple, Object messageId); void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId); void reportError(Throwable error); + long getPendingCount(); } http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java b/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java index 7a33026..f23692b 100644 --- a/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java +++ b/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java @@ -131,4 +131,9 @@ public class SpoutOutputCollector implements ISpoutOutputCollector { public void reportError(Throwable error) { _delegate.reportError(error); } + + @Override + public long getPendingCount() { + return _delegate.getPendingCount(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java b/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java index 75ba2b8..c4b5ff1 100644 --- a/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java +++ b/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java @@ -65,6 +65,12 @@ public class SpoutTracker extends BaseRichSpout { public void reportError(Throwable error) { _collector.reportError(error); } + + @Override + public long getPendingCount() { + return _collector.getPendingCount(); + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java index 345a5a0..b81953d 100644 --- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java +++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java @@ -24,13 +24,14 @@ import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.utils.RotatingMap; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import storm.trident.operation.TridentCollector; import storm.trident.topology.TransactionAttempt; import storm.trident.util.TridentUtils; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + public class RichSpoutBatchExecutor implements ITridentSpout { public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size"; @@ -81,7 +82,8 @@ public class RichSpoutBatchExecutor implements ITridentSpout { idsMap = new RotatingMap(3); rotateTime = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); } - + + @Override public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) { long txid = tx.getTransactionId(); @@ -112,6 +114,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout { } } idsMap.put(txid, _collector.ids); + _collector.pendingCount = idsMap.size(); } @@ -137,6 +140,8 @@ public class RichSpoutBatchExecutor implements ITridentSpout { } } } + + @Override public void close() { @@ -170,7 +175,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout { TridentCollector _collector; public List<Object> ids; public int numEmitted; - + public long pendingCount; public void reset(TridentCollector c) { _collector = c; ids = new ArrayList<Object>(); @@ -193,7 +198,11 @@ public class RichSpoutBatchExecutor implements ITridentSpout { public void emitDirect(int task, String stream, List<Object> values, Object id) { throw new UnsupportedOperationException("Trident does not support direct streams"); } - + + @Override + public long getPendingCount() { + return pendingCount; + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java index 728d51e..ae6fedf 100644 --- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java +++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java @@ -27,12 +27,12 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; import java.util.Set; import storm.trident.topology.TridentBoltExecutor; import storm.trident.tuple.ConsList; @@ -173,6 +173,10 @@ public class RichSpoutBatchTriggerer implements IRichSpout { public void reportError(Throwable t) { _collector.reportError(t); } - + + @Override + public long getPendingCount() { + return _collector.getPendingCount(); + } } }