cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295073210

##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -954,11 +985,14 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                         // so we don't need to do it here.
                         log.debug("Completing purgatory items up to offset {} and epoch {}.", offset, epoch);
 
-                        // Complete any events in the purgatory that were waiting for this offset.
-                        deferredEventQueue.completeUpTo(offset);
+                        // Advance the committed and stable offsets then complete any pending purgatory
+                        // items that were waiting for these offsets.
+                        offsetControl.handleCommitBatch(batch);
+                        deferredEventQueue.completeUpTo(offsetControl.lastStableOffset());
+                        deferredUnstableEventQueue.completeUpTo(offsetControl.lastCommittedOffset());
 
                         // The active controller can delete up to the current committed offset.
-                        snapshotRegistry.deleteSnapshotsUpTo(offset);
+                        snapshotRegistry.deleteSnapshotsUpTo(offsetControl.lastStableOffset());

Review Comment:
   I guess we need to call this one out as a bug fix. Good find.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org