cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295073210
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -954,11 +985,14 @@ public void
handleCommit(BatchReader<ApiMessageAndVersion> reader) {
// so we don't need to do it here.
log.debug("Completing purgatory items up to offset
{} and epoch {}.", offset, epoch);
- // Complete any events in the purgatory that were
waiting for this offset.
- deferredEventQueue.completeUpTo(offset);
+ // Advance the committed and stable offsets then
complete any pending purgatory
+ // items that were waiting for these offsets.
+ offsetControl.handleCommitBatch(batch);
+
deferredEventQueue.completeUpTo(offsetControl.lastStableOffset());
+
deferredUnstableEventQueue.completeUpTo(offsetControl.lastCommittedOffset());
// The active controller can delete up to the
current committed offset.
- snapshotRegistry.deleteSnapshotsUpTo(offset);
+
snapshotRegistry.deleteSnapshotsUpTo(offsetControl.lastStableOffset());
Review Comment:
I guess we need to call this one out as a bug fix.
Good find.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]