mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r441065091



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -409,32 +407,28 @@ public void resume() {
         return committableOffsets;
     }
 
+    /**
+     * This should only be called if the attempted commit succeeded for this task
+     */
     @Override
     public void postCommit() {
         commitRequested = false;
         commitNeeded = false;
 
         switch (state()) {
             case RESTORING:
-                writeCheckpointIfNeed();
+            case SUSPENDED:
+                maybeWriteCheckpoint();
 
                 break;
 
             case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
+                if (!eosEnabled) {
+                    maybeWriteCheckpoint();
                 }
 
                 break;
 
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();

Review comment:
       The consumer tracks offsets internally; however, we buffer data in our internal queue. Thus, the offset tracked by the consumer might be larger than the offset we commit (we don't take the offset we commit from the consumer; it's based on the records we actually took out of the queue and processed).
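   To make the gap between the two offsets concrete, here is a minimal, self-contained sketch. It is not Kafka Streams code: `BufferedPartition` and its methods are hypothetical stand-ins for the consumer position and the task-side buffer, assuming one partition and that the committed offset is "last processed offset + 1".

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of one input partition (not Kafka Streams code):
// the consumer's fetch position vs. the offset the task would commit.
public class BufferedPartition {
    private final Deque<Long> queue = new ArrayDeque<>(); // buffered record offsets
    private long consumerPosition = 0L; // next offset the consumer will fetch
    private long lastProcessed = -1L;   // offset of the last record taken from the queue

    // Simulates poll(): the consumer position advances for every fetched record,
    // even though the records only land in the task-side buffer.
    void fetch(int count) {
        for (int i = 0; i < count; i++) {
            queue.addLast(consumerPosition++);
        }
    }

    // Simulates processing one buffered record.
    void processOne() {
        Long offset = queue.pollFirst();
        if (offset != null) {
            lastProcessed = offset;
        }
    }

    long consumerPosition() {
        return consumerPosition;
    }

    // The committable offset is derived from processed records,
    // not from the consumer position.
    long committableOffset() {
        return lastProcessed + 1;
    }

    int buffered() {
        return queue.size();
    }

    public static void main(String[] args) {
        BufferedPartition p = new BufferedPartition();
        p.fetch(10);    // buffer offsets 0..9; consumer position is now 10
        p.processOne(); // process offsets 0, 1, 2
        p.processOne();
        p.processOne();
        System.out.println("consumer position  = " + p.consumerPosition());  // 10
        System.out.println("committable offset = " + p.committableOffset()); // 3
        System.out.println("still buffered     = " + p.buffered());          // 7
    }
}
```

   The seven buffered records (offsets 3..9) are exactly the ones the consumer has already "moved past" but that we have not committed.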
   
   In eager rebalancing, if a partition is revoked (and we only suspend the task), the consumer clears its internal state for that partition, including the tracked offset. If the partition is re-assigned, the consumer looks up the last committed offset to decide where to start fetching. Thus, if we don't clear the queue, we might fetch the same data that is already in the queue a second time.
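   A small simulation of that revoke/re-assign sequence, again as a hypothetical sketch rather than the actual `StreamTask`/`PartitionGroup` logic: `EagerRebalanceSketch`, `revoke(boolean)` and `reassign()` are made-up names, assuming the commit happens on revocation and the consumer then resumes from the committed offset.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation (not Kafka Streams code) of a suspended task's
// buffer across an eager revoke + re-assign of the same partition.
public class EagerRebalanceSketch {
    private final Deque<Long> queue = new ArrayDeque<>(); // task-side buffer
    private final List<Long> processed = new ArrayList<>();
    private long consumerPosition = 0L;
    private long committedOffset = 0L;

    void fetch(int count) {
        for (int i = 0; i < count; i++) {
            queue.addLast(consumerPosition++);
        }
    }

    void process(int count) {
        for (int i = 0; i < count && !queue.isEmpty(); i++) {
            processed.add(queue.pollFirst());
        }
    }

    // Commit is based on processed records, not on the consumer position.
    void commit() {
        if (!processed.isEmpty()) {
            committedOffset = processed.get(processed.size() - 1) + 1;
        }
    }

    // Partition revoked: commit first, then optionally clear the buffer.
    void revoke(boolean clearQueue) {
        commit();
        if (clearQueue) {
            queue.clear();
        }
    }

    // Partition re-assigned: the consumer restarts from the committed offset.
    void reassign() {
        consumerPosition = committedOffset;
    }

    // Runs the scenario and returns every offset processed, in order.
    static List<Long> run(boolean clearQueue) {
        EagerRebalanceSketch t = new EagerRebalanceSketch();
        t.fetch(5);           // buffer offsets 0..4
        t.process(2);         // process 0 and 1
        t.revoke(clearQueue); // committed offset becomes 2
        t.reassign();         // consumer position reset to 2
        t.fetch(3);           // offsets 2, 3, 4 are fetched again
        t.process(10);        // drain the buffer
        return t.processed;
    }

    public static void main(String[] args) {
        System.out.println("without clear: " + run(false)); // [0, 1, 2, 3, 4, 2, 3, 4]
        System.out.println("with clear:    " + run(true));  // [0, 1, 2, 3, 4]
    }
}
```

   Without clearing, offsets 2..4 are processed twice; clearing the buffer after the commit (as the removed `partitionGroup.clear()` did) leaves only the re-fetched copies.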
   
   Does this make sense?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

