Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r158940303
  
    --- Diff: 
storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
    @@ -127,74 +141,130 @@ public void expire(Long key, TupleInfo tupleInfo) {
                 spoutObject.open(topoConf, taskData.getUserContext(), 
outputCollector);
             }
             openOrPrepareWasCalled.set(true);
    -        LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
    +        LOG.info("Opened spout {}:{}", componentId, taskIds);
             setupMetrics();
         }
     
         @Override
    -    public Callable<Object> call() throws Exception {
    -        init(idToTask);
    -
    -        return new Callable<Object>() {
    +    public Callable<Long> call() throws Exception {
    +        init(idToTask, idToTaskBase);
    +        return new Callable<Long>() {
    +            int i=0;
    +            final int recvqCheckSkipCount = getSpoutRecvqCheckSkipCount();
    +            int bpIdleCount = 0;
    +            int rmspCount = 0;
                 @Override
    -            public Object call() throws Exception {
    -                receiveQueue.consumeBatch(SpoutExecutor.this);
    -
    -                final long currCount = emittedCount.get();
    -                final boolean throttleOn = backPressureEnabled && 
SpoutExecutor.this.throttleOn.get();
    -                final boolean reachedMaxSpoutPending = (maxSpoutPending != 
0) && (pending.size() >= maxSpoutPending);
    -                final boolean isActive = stormActive.get();
    +            public Long call() throws Exception {
    +                int receiveCount = 0;
    +                if (i++ == recvqCheckSkipCount) {
    +                    receiveCount = 
receiveQueue.consume(SpoutExecutor.this);
    +                    i=0;
    +                }
    +                long currCount = emittedCount.get();
    +                boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && 
(pending.size() >= maxSpoutPending);
    +                boolean isActive = stormActive.get();
    +                boolean noEmits = true;
                     if (isActive) {
                         if (!lastActive.get()) {
                             lastActive.set(true);
    -                        LOG.info("Activating spout {}:{}", componentId, 
idToTask.keySet());
    +                        LOG.info("Activating spout {}:{}", componentId, 
taskIds);
                             for (ISpout spout : spouts) {
                                 spout.activate();
                             }
                         }
    -                    if (!transferQueue.isFull() && !throttleOn && 
!reachedMaxSpoutPending) {
    -                        for (ISpout spout : spouts) {
    -                            spout.nextTuple();
    +                    boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
    +
    +                    long emptyStretch = 0;
    +                    if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
    +                        for (int j = 0; j < spouts.size(); j++) { // in 
critical path. don't use iterators.
    +                            spouts.get(j).nextTuple();
    +                        }
    +                        noEmits = (currCount == emittedCount.get());
    +                        if (noEmits) {
    +                            emptyEmitStreak.increment();
    +                        } else {
    +                            emptyStretch = emptyEmitStreak.get();
    +                            emptyEmitStreak.set(0);
                             }
                         }
    +                    if (reachedMaxSpoutPending) {
    +                        if(rmspCount==0)
    +                            LOG.debug("Reached max spout pending");
    +                        rmspCount++;
    +                    } else {
    +                        if (rmspCount>0)
    +                            LOG.debug("Ended max spout pending stretch of 
{} iterations", rmspCount);
    +                        rmspCount = 0;
    +                    }
    +
    +                    if ( receiveCount>1 ) {
    --- End diff --
    
    good observation. Actually it used to be 0. Then I noticed it was somewhat 
common case for there to be just one item in the recvQ (metrics tick, flush 
tuple etc) during idle stretches. Once we consume that,  we idle immediately, 
rather than check again. a minor optimization.


---

Reply via email to