Repository: storm
Updated Branches:
  refs/heads/master 5ac306237 -> 54f6b32f6


STORM-67 Provide API for spouts to know how many pending messages there are
 - changes to classes implementing ISpoutOutputCollector
 - executor.clj getPendingCount definition


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3d03b931
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3d03b931
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3d03b931

Branch: refs/heads/master
Commit: 3d03b931b66d43fd5dd383c5e0ac23c55b772232
Parents: 27a3a6b
Author: Shyam Rajendran <srajend...@yahoo-inc.com>
Authored: Thu Jun 11 12:38:49 2015 -0500
Committer: Shyam Rajendran <rshyam....@gmail.com>
Committed: Mon Jul 6 10:42:21 2015 -0500

----------------------------------------------------------------------
 .../spout/SpoutOutputCollectorMock.java         | 13 ++++++++++--
 .../src/clj/backtype/storm/daemon/executor.clj  |  3 +++
 .../storm/spout/ISpoutOutputCollector.java      |  1 +
 .../storm/spout/SpoutOutputCollector.java       |  5 +++++
 .../backtype/storm/testing/SpoutTracker.java    |  6 ++++++
 .../trident/spout/RichSpoutBatchExecutor.java   | 21 ++++++++++++++------
 .../trident/spout/RichSpoutBatchTriggerer.java  | 18 ++++++++++-------
 7 files changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index 02e6830..9f33c89 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -17,9 +17,10 @@
  
*******************************************************************************/
 package org.apache.storm.eventhubs.spout;
 
-import java.util.List;
-
 import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.spout.SpoutOutputCollector;
+
+import java.util.List;
 
 /**
  * Mock of ISpoutOutputCollector
@@ -27,6 +28,7 @@ import backtype.storm.spout.ISpoutOutputCollector;
 public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
   //comma separated offsets
   StringBuilder emittedOffset;
+  SpoutOutputCollector _collector;
   
   public SpoutOutputCollectorMock() {
     emittedOffset = new StringBuilder();
@@ -58,4 +60,11 @@ public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
   @Override
   public void reportError(Throwable arg0) {
   }
+
+  @Override
+  public long getPendingCount() {
+    return _collector.getPendingCount();
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 454fd0d..4f5cc75 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -558,6 +558,9 @@
                  (:user-context task-data)
                  (SpoutOutputCollector.
                   (reify ISpoutOutputCollector
+                    (^long getPendingCount[this]
+                      (.size pending)
+                      )
                     (^List emit [this ^String stream-id ^List tuple ^Object message-id]
                       (send-spout-msg stream-id tuple message-id nil)
                       )

http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java b/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
index 3cebe43..26a4843 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
@@ -26,5 +26,6 @@ public interface ISpoutOutputCollector {
     List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
     void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
     void reportError(Throwable error);
+    long getPendingCount();
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java b/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java
index 7a33026..f23692b 100644
--- a/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java
+++ b/storm-core/src/jvm/backtype/storm/spout/SpoutOutputCollector.java
@@ -131,4 +131,9 @@ public class SpoutOutputCollector implements ISpoutOutputCollector {
     public void reportError(Throwable error) {
         _delegate.reportError(error);
     }
+
+    @Override
+    public long getPendingCount() {
+        return _delegate.getPendingCount();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java b/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java
index 75ba2b8..c4b5ff1 100644
--- a/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java
+++ b/storm-core/src/jvm/backtype/storm/testing/SpoutTracker.java
@@ -65,6 +65,12 @@ public class SpoutTracker extends BaseRichSpout {
         public void reportError(Throwable error) {
                _collector.reportError(error);
         }
+
+        @Override
+        public long getPendingCount() {
+            return _collector.getPendingCount();
+        }
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
index 345a5a0..b81953d 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
@@ -24,13 +24,14 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.RotatingMap;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import storm.trident.operation.TridentCollector;
 import storm.trident.topology.TransactionAttempt;
 import storm.trident.util.TridentUtils;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 public class RichSpoutBatchExecutor implements ITridentSpout {
     public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
 
@@ -81,7 +82,8 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
             idsMap = new RotatingMap(3);
             rotateTime = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
         }
-        
+
+
         @Override
         public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
             long txid = tx.getTransactionId();
@@ -112,6 +114,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
                 }
             }
             idsMap.put(txid, _collector.ids);
+            _collector.pendingCount = idsMap.size();
 
         }
 
@@ -137,6 +140,8 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
                 }
             }
         }
+
+
         
         @Override
         public void close() {
@@ -170,7 +175,7 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
         TridentCollector _collector;
         public List<Object> ids;
         public int numEmitted;
-        
+        public long pendingCount;
         public void reset(TridentCollector c) {
             _collector = c;
             ids = new ArrayList<Object>();
@@ -193,7 +198,11 @@ public class RichSpoutBatchExecutor implements ITridentSpout {
         public void emitDirect(int task, String stream, List<Object> values, Object id) {
             throw new UnsupportedOperationException("Trident does not support direct streams");
         }
-        
+
+        @Override
+        public long getPendingCount() {
+            return pendingCount;
+        }
     }
     
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3d03b931/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
index 728d51e..ae6fedf 100644
--- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
+++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java
@@ -27,12 +27,12 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.ArrayList;            
+import java.util.HashMap;              
+import java.util.HashSet;              
+import java.util.List;         
+import java.util.Map;          
+import java.util.Random;               
 import java.util.Set;
 import storm.trident.topology.TridentBoltExecutor;
 import storm.trident.tuple.ConsList;
@@ -173,6 +173,10 @@ public class RichSpoutBatchTriggerer implements IRichSpout {
         public void reportError(Throwable t) {
             _collector.reportError(t);
         }
-        
+
+        @Override
+        public long getPendingCount() {
+            return _collector.getPendingCount();
+        }
     }
 }

Reply via email to