GitHub user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r158215083
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -127,74 +141,130 @@ public void expire(Long key, TupleInfo tupleInfo) {
spoutObject.open(topoConf, taskData.getUserContext(), outputCollector);
}
openOrPrepareWasCalled.set(true);
- LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
+ LOG.info("Opened spout {}:{}", componentId, taskIds);
setupMetrics();
}
@Override
- public Callable<Object> call() throws Exception {
- init(idToTask);
-
- return new Callable<Object>() {
+ public Callable<Long> call() throws Exception {
+ init(idToTask, idToTaskBase);
+ return new Callable<Long>() {
+ int i=0;
+ final int recvqCheckSkipCount = getSpoutRecvqCheckSkipCount();
+ int bpIdleCount = 0;
+ int rmspCount = 0;
@Override
- public Object call() throws Exception {
- receiveQueue.consumeBatch(SpoutExecutor.this);
-
- final long currCount = emittedCount.get();
- final boolean throttleOn = backPressureEnabled && SpoutExecutor.this.throttleOn.get();
- final boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
- final boolean isActive = stormActive.get();
+ public Long call() throws Exception {
+ int receiveCount = 0;
+ if (i++ == recvqCheckSkipCount) {
--- End diff --
minor: `i` could use a longer, more descriptive name, since it is not a
short-lived, narrowly scoped variable.
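
For illustration, a minimal sketch of what the rename might look like. The class `SkipCounterSketch`, the nested `Loop`, the field name `iterationsSinceRecvqCheck`, and the `recvqChecks` counter are all hypothetical and not part of the PR. The reset inside the `if` is also an assumption, since the diff is cut off at that line.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for the anonymous Callable in SpoutExecutor.call();
// it only models the skip counter, not the real executor logic.
final class SkipCounterSketch {

    static final class Loop implements Callable<Long> {
        private final int recvqCheckSkipCount;

        // Was `i` in the diff: it lives as long as the Callable, so the name
        // should say what is being counted.
        private int iterationsSinceRecvqCheck = 0;

        // Placeholder for the real work (consuming the receive queue).
        private long recvqChecks = 0;

        Loop(int recvqCheckSkipCount) {
            this.recvqCheckSkipCount = recvqCheckSkipCount;
        }

        @Override
        public Long call() {
            // Same comparison as `i++ == recvqCheckSkipCount` in the diff.
            if (iterationsSinceRecvqCheck++ == recvqCheckSkipCount) {
                // Assumed: the counter is reset once the check runs.
                iterationsSinceRecvqCheck = 0;
                recvqChecks++;
            }
            return recvqChecks;
        }
    }
}
```

Under these assumptions, a skip count of 2 means the receive-queue check runs on every third call, which the longer name makes easier to see at the call site.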
---