jsancio commented on a change in pull request #10705: URL: https://github.com/apache/kafka/pull/10705#discussion_r636340373
########## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ########## @@ -624,50 +617,71 @@ public String toString() { return event.future(); } - class QuorumMetaLogListener implements MetaLogListener { + class QuorumMetaLogListener implements RaftClient.Listener<ApiMessageAndVersion> { + @Override - public void handleCommits(long offset, List<ApiMessage> messages) { - appendControlEvent("handleCommits[" + offset + "]", () -> { - if (curClaimEpoch == -1) { - // If the controller is a standby, replay the records that were - // created by the active controller. - if (log.isDebugEnabled()) { - if (log.isTraceEnabled()) { - log.trace("Replaying commits from the active node up to " + - "offset {}: {}.", offset, messages.stream(). - map(m -> m.toString()).collect(Collectors.joining(", "))); + public void handleCommit(BatchReader<ApiMessageAndVersion> reader) { + appendControlEvent("handleCommits[baseOffset=" + reader.baseOffset() + "]", () -> { + try { + boolean isActiveController = curClaimEpoch != -1; + while (reader.hasNext()) { + Batch<ApiMessageAndVersion> batch = reader.next(); + long offset = batch.lastOffset(); + List<ApiMessageAndVersion> messages = batch.records(); + + if (isActiveController) { + // If the controller is active, the records were already replayed, + // so we don't need to do it here. + log.debug("Completing purgatory items up to offset {}.", offset); + + // Complete any events in the purgatory that were waiting for this offset. + purgatory.completeUpTo(offset); + + // Delete all the in-memory snapshots that we no longer need. + // If we are writing a new snapshot, then we need to keep that around; + // otherwise, we should delete up to the current committed offset. 
+ snapshotRegistry.deleteSnapshotsUpTo( + Math.min(offset, snapshotGeneratorManager.snapshotEpoch())); + } else { - log.debug("Replaying commits from the active node up to " + - "offset {}.", offset); + // If the controller is a standby, replay the records that were + // created by the active controller. + if (log.isDebugEnabled()) { + if (log.isTraceEnabled()) { + log.trace("Replaying commits from the active node up to " + + "offset {}: {}.", offset, messages.stream() + .map(ApiMessageAndVersion::toString) + .collect(Collectors.joining(", "))); + } else { + log.debug("Replaying commits from the active node up to " + + "offset {}.", offset); + } + } + for (ApiMessageAndVersion messageAndVersion : messages) { + replay(messageAndVersion.message(), -1, offset); + } } + lastCommittedOffset = offset; } - for (ApiMessage message : messages) { - replay(message, -1, offset); - } - } else { - // If the controller is active, the records were already replayed, - // so we don't need to do it here. - log.debug("Completing purgatory items up to offset {}.", offset); - - // Complete any events in the purgatory that were waiting for this offset. - purgatory.completeUpTo(offset); - - // Delete all the in-memory snapshots that we no longer need. - // If we are writing a new snapshot, then we need to keep that around; - // otherwise, we should delete up to the current committed offset. 
- snapshotRegistry.deleteSnapshotsUpTo( - Math.min(offset, snapshotGeneratorManager.snapshotEpoch())); + } finally { + reader.close(); } - }); } @Override - public void handleNewLeader(MetaLogLeader newLeader) { - if (newLeader.nodeId() == nodeId) { - final long newEpoch = newLeader.epoch(); + public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) { Review comment: > If that's true, then we should be able to put the snapshot loading stuff that currently exists (in the constructor) here, with a TODO for implementing snapshot loading AFTER the controller has already been initialized (which will involve zeroing out all the existing data structures and in-memory snapshots...) I think we should do that since I don't want this patch to move us backwards in snapshot support. We discussed this in another comment thread in this PR but this PR has a partial implementation of snapshot loading in the controller. I am going to implement snapshot re-loading as part of https://issues.apache.org/jira/browse/KAFKA-12787. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org