cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295073210


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -954,11 +985,14 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                             // so we don't need to do it here.
                             log.debug("Completing purgatory items up to offset {} and epoch {}.", offset, epoch);
 
-                            // Complete any events in the purgatory that were waiting for this offset.
-                            deferredEventQueue.completeUpTo(offset);
+                            // Advance the committed and stable offsets then complete any pending purgatory
+                            // items that were waiting for these offsets.
+                            offsetControl.handleCommitBatch(batch);
+                            deferredEventQueue.completeUpTo(offsetControl.lastStableOffset());
+                            deferredUnstableEventQueue.completeUpTo(offsetControl.lastCommittedOffset());
 
                             // The active controller can delete up to the current committed offset.
-                            snapshotRegistry.deleteSnapshotsUpTo(offset);
+                            snapshotRegistry.deleteSnapshotsUpTo(offsetControl.lastStableOffset());

Review Comment:
   I guess we need to call this one out as a bug fix.
   
   Good find.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
