Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r158215973
--- Diff:
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -127,74 +141,130 @@ public void expire(Long key, TupleInfo tupleInfo) {
spoutObject.open(topoConf, taskData.getUserContext(),
outputCollector);
}
openOrPrepareWasCalled.set(true);
- LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
+ LOG.info("Opened spout {}:{}", componentId, taskIds);
setupMetrics();
}
@Override
- public Callable<Object> call() throws Exception {
- init(idToTask);
-
- return new Callable<Object>() {
+ public Callable<Long> call() throws Exception {
+ init(idToTask, idToTaskBase);
+ return new Callable<Long>() {
+ int i=0;
+ final int recvqCheckSkipCount = getSpoutRecvqCheckSkipCount();
+ int bpIdleCount = 0;
+ int rmspCount = 0;
@Override
- public Object call() throws Exception {
- receiveQueue.consumeBatch(SpoutExecutor.this);
-
- final long currCount = emittedCount.get();
- final boolean throttleOn = backPressureEnabled &&
SpoutExecutor.this.throttleOn.get();
- final boolean reachedMaxSpoutPending = (maxSpoutPending !=
0) && (pending.size() >= maxSpoutPending);
- final boolean isActive = stormActive.get();
+ public Long call() throws Exception {
+ int receiveCount = 0;
+ if (i++ == recvqCheckSkipCount) {
+ receiveCount =
receiveQueue.consume(SpoutExecutor.this);
+ i=0;
+ }
+ long currCount = emittedCount.get();
+ boolean reachedMaxSpoutPending = (maxSpoutPending != 0) &&
(pending.size() >= maxSpoutPending);
+ boolean isActive = stormActive.get();
+ boolean noEmits = true;
if (isActive) {
if (!lastActive.get()) {
lastActive.set(true);
- LOG.info("Activating spout {}:{}", componentId,
idToTask.keySet());
+ LOG.info("Activating spout {}:{}", componentId,
taskIds);
for (ISpout spout : spouts) {
spout.activate();
}
}
- if (!transferQueue.isFull() && !throttleOn &&
!reachedMaxSpoutPending) {
- for (ISpout spout : spouts) {
- spout.nextTuple();
+ boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
+
+ long emptyStretch = 0;
+ if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
+ for (int j = 0; j < spouts.size(); j++) { // in
critical path. don't use iterators.
+ spouts.get(j).nextTuple();
+ }
+ noEmits = (currCount == emittedCount.get());
+ if (noEmits) {
+ emptyEmitStreak.increment();
+ } else {
+ emptyStretch = emptyEmitStreak.get();
+ emptyEmitStreak.set(0);
}
}
+ if (reachedMaxSpoutPending) {
+ if(rmspCount==0)
+ LOG.debug("Reached max spout pending");
+ rmspCount++;
+ } else {
+ if (rmspCount>0)
+ LOG.debug("Ended max spout pending stretch of
{} iterations", rmspCount);
+ rmspCount = 0;
+ }
+
+ if ( receiveCount>1 ) {
--- End diff --
Is it intentional to compare this with 1? Comparing with 0 would look more
natural to me, but you may have a reason for using 1.
---