GitHub user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2367#discussion_r143914514
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -77,57 +78,58 @@ public void addToEmitMsgs(long offset) {
          */
         public OffsetAndMetadata findNextCommitOffset() {
             long currOffset;
    -        long nextEarliestUncommittedOffset = earliestUncommittedOffset;
    +        long nextCommitOffset = committedOffset;
             KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
     
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
                 currOffset = currAckedMsg.offset();
    -            if (currOffset == nextEarliestUncommittedOffset) {            // found the next offset to commit
    +            if (currOffset == nextCommitOffset) {            // found the next offset to commit
                     nextCommitMsg = currAckedMsg;
    -                nextEarliestUncommittedOffset = currOffset + 1;
    -            } else if (currOffset > nextEarliestUncommittedOffset) {
    -                if (emittedOffsets.contains(nextEarliestUncommittedOffset)) {
    -                    LOG.debug("topic-partition [{}] has non-contiguous offset [{}]."
    +                nextCommitOffset = currOffset + 1;
    +            } else if (currOffset > nextCommitOffset) {
    +                if (emittedOffsets.contains(nextCommitOffset)) {
    +                    LOG.debug("topic-partition [{}] has non-sequential offset [{}]."
                             + " It will be processed in a subsequent batch.", tp, currOffset);
                         break;
                     } else {
                         /*
    -                        This case will arise in case of non contiguous offset being processed.
    -                        So, if the topic doesn't contain offset = committedOffset + 1 (possible
    +                        This case will arise in case of non-sequential offset being processed.
    +                        So, if the topic doesn't contain offset = nextCommitOffset (possible
                             if the topic is compacted or deleted), the consumer should jump to
                             the next logical point in the topic. Next logical offset should be the
    -                        first element after committedOffset in the ascending ordered emitted set.
    +                        first element after nextEarliestUncommittedOffset in the ascending ordered emitted set.
    --- End diff --
    
    Nice catch, fixed.
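
For readers following the thread, the scan in the quoted hunk can be sketched roughly as below. This is a simplified, standalone illustration and not the actual `OffsetManager` code: the class name `NextCommitOffsetSketch`, the use of plain `TreeSet<Long>` in place of `KafkaSpoutMessageId` collections, and the `-1` return value are all assumptions made for the example. The part of the `else` branch that the hunk cuts off is also an assumption here: the sketch jumps the gap only when the current acked offset is the first emitted offset at or after `nextCommitOffset`. `committedOffset` is taken to mean the first offset that has not been committed yet.

```java
import java.util.TreeSet;

class NextCommitOffsetSketch {

    /**
     * Returns the offset that could be committed next (last acked offset in the
     * unbroken run, plus one), or -1 if no new offset can be committed.
     * All parameter names are illustrative.
     */
    static long findNextCommitOffset(long committedOffset,
                                     TreeSet<Long> ackedOffsets,
                                     TreeSet<Long> emittedOffsets) {
        long nextCommitOffset = committedOffset;
        boolean found = false;

        // TreeSet iterates in ascending order, so this is a linear scan.
        for (long currOffset : ackedOffsets) {
            if (currOffset == nextCommitOffset) {
                // Found the next offset to commit.
                found = true;
                nextCommitOffset = currOffset + 1;
            } else if (currOffset > nextCommitOffset) {
                if (emittedOffsets.contains(nextCommitOffset)) {
                    // nextCommitOffset was emitted but is not acked yet: stop here.
                    break;
                }
                // nextCommitOffset was never emitted (assumed: compacted or deleted).
                // Assumption: jump only if currOffset is the first emitted offset
                // at or after nextCommitOffset.
                Long nextEmitted = emittedOffsets.ceiling(nextCommitOffset);
                if (nextEmitted != null && nextEmitted == currOffset) {
                    found = true;
                    nextCommitOffset = currOffset + 1;
                } else {
                    break;
                }
            }
            // currOffset < nextCommitOffset: already covered, skip it.
        }
        return found ? nextCommitOffset : -1;
    }
}
```

With `committedOffset = 0`, acked offsets `{0, 3}` and emitted offsets `{0, 3}`, the sketch skips the missing offsets 1 and 2. If offset 1 had been emitted but not acked, the scan would stop after offset 0.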

