cmccabe commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r633910968
########## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ########## @@ -624,50 +617,71 @@ public String toString() { return event.future(); } - class QuorumMetaLogListener implements MetaLogListener { + class QuorumMetaLogListener implements RaftClient.Listener<ApiMessageAndVersion> { + @Override - public void handleCommits(long offset, List<ApiMessage> messages) { - appendControlEvent("handleCommits[" + offset + "]", () -> { - if (curClaimEpoch == -1) { - // If the controller is a standby, replay the records that were - // created by the active controller. - if (log.isDebugEnabled()) { - if (log.isTraceEnabled()) { - log.trace("Replaying commits from the active node up to " + - "offset {}: {}.", offset, messages.stream(). - map(m -> m.toString()).collect(Collectors.joining(", "))); + public void handleCommit(BatchReader<ApiMessageAndVersion> reader) { + appendControlEvent("handleCommits[baseOffset=" + reader.baseOffset() + "]", () -> { + try { + boolean isActiveController = curClaimEpoch != -1; + while (reader.hasNext()) { + Batch<ApiMessageAndVersion> batch = reader.next(); + long offset = batch.lastOffset(); + List<ApiMessageAndVersion> messages = batch.records(); + + if (isActiveController) { + // If the controller is active, the records were already replayed, + // so we don't need to do it here. + log.debug("Completing purgatory items up to offset {}.", offset); + + // Complete any events in the purgatory that were waiting for this offset. + purgatory.completeUpTo(offset); + + // Delete all the in-memory snapshots that we no longer need. + // If we are writing a new snapshot, then we need to keep that around; + // otherwise, we should delete up to the current committed offset. 
+ snapshotRegistry.deleteSnapshotsUpTo( + Math.min(offset, snapshotGeneratorManager.snapshotEpoch())); + } else { - log.debug("Replaying commits from the active node up to " + - "offset {}.", offset); + // If the controller is a standby, replay the records that were + // created by the active controller. + if (log.isDebugEnabled()) { + if (log.isTraceEnabled()) { + log.trace("Replaying commits from the active node up to " + + "offset {}: {}.", offset, messages.stream() + .map(ApiMessageAndVersion::toString) + .collect(Collectors.joining(", "))); + } else { + log.debug("Replaying commits from the active node up to " + + "offset {}.", offset); + } + } + for (ApiMessageAndVersion messageAndVersion : messages) { + replay(messageAndVersion.message(), -1, offset); + } } + lastCommittedOffset = offset; } - for (ApiMessage message : messages) { - replay(message, -1, offset); - } - } else { - // If the controller is active, the records were already replayed, - // so we don't need to do it here. - log.debug("Completing purgatory items up to offset {}.", offset); - - // Complete any events in the purgatory that were waiting for this offset. - purgatory.completeUpTo(offset); - - // Delete all the in-memory snapshots that we no longer need. - // If we are writing a new snapshot, then we need to keep that around; - // otherwise, we should delete up to the current committed offset. - snapshotRegistry.deleteSnapshotsUpTo( - Math.min(offset, snapshotGeneratorManager.snapshotEpoch())); + } finally { + reader.close(); } - lastCommittedOffset = offset; }); } @Override - public void handleNewLeader(MetaLogLeader newLeader) { - if (newLeader.nodeId() == nodeId) { - final long newEpoch = newLeader.epoch(); + public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) { Review comment: I think there's some ambiguity in how this API is named. "Handling" a snapshot could mean either writing one or reading it. 
Judging from the parameter name and type here (`SnapshotReader<ApiMessageAndVersion> reader`), I'm guessing that this method is meant to read the snapshot. My understanding is that this API will be called if the standby controller falls too far behind, OR right after it starts up and before any other records are read. Does that match your understanding? If so, we should be able to move the snapshot-loading logic that currently lives in the constructor into this method, with a TODO for supporting snapshot loading AFTER the controller has already been initialized (which will involve zeroing out all the existing data structures and in-memory snapshots). I think we should do that, since I don't want this patch to move us backwards in snapshot support. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org