cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633910968



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -624,50 +617,71 @@ public String toString() {
         return event.future();
     }
 
-    class QuorumMetaLogListener implements MetaLogListener {
+    class QuorumMetaLogListener implements RaftClient.Listener<ApiMessageAndVersion> {
+
         @Override
-        public void handleCommits(long offset, List<ApiMessage> messages) {
-            appendControlEvent("handleCommits[" + offset + "]", () -> {
-                if (curClaimEpoch == -1) {
-                    // If the controller is a standby, replay the records that were
-                    // created by the active controller.
-                    if (log.isDebugEnabled()) {
-                        if (log.isTraceEnabled()) {
-                            log.trace("Replaying commits from the active node up to " +
-                                "offset {}: {}.", offset, messages.stream().
-                                map(m -> m.toString()).collect(Collectors.joining(", ")));
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            appendControlEvent("handleCommits[baseOffset=" + reader.baseOffset() + "]", () -> {
+                try {
+                    boolean isActiveController = curClaimEpoch != -1;
+                    while (reader.hasNext()) {
+                        Batch<ApiMessageAndVersion> batch = reader.next();
+                        long offset = batch.lastOffset();
+                        List<ApiMessageAndVersion> messages = batch.records();
+
+                        if (isActiveController) {
+                            // If the controller is active, the records were already replayed,
+                            // so we don't need to do it here.
+                            log.debug("Completing purgatory items up to offset {}.", offset);
+
+                            // Complete any events in the purgatory that were waiting for this offset.
+                            purgatory.completeUpTo(offset);
+
+                            // Delete all the in-memory snapshots that we no longer need.
+                            // If we are writing a new snapshot, then we need to keep that around;
+                            // otherwise, we should delete up to the current committed offset.
+                            snapshotRegistry.deleteSnapshotsUpTo(
+                                Math.min(offset, snapshotGeneratorManager.snapshotEpoch()));
+
                         } else {
-                            log.debug("Replaying commits from the active node up to " +
-                                "offset {}.", offset);
+                            // If the controller is a standby, replay the records that were
+                            // created by the active controller.
+                            if (log.isDebugEnabled()) {
+                                if (log.isTraceEnabled()) {
+                                    log.trace("Replaying commits from the active node up to " +
+                                        "offset {}: {}.", offset, messages.stream()
+                                        .map(ApiMessageAndVersion::toString)
+                                        .collect(Collectors.joining(", ")));
+                                } else {
+                                    log.debug("Replaying commits from the active node up to " +
+                                        "offset {}.", offset);
+                                }
+                            }
+                            for (ApiMessageAndVersion messageAndVersion : messages) {
+                                replay(messageAndVersion.message(), -1, offset);
+                            }
                         }
+                        lastCommittedOffset = offset;
                     }
-                    for (ApiMessage message : messages) {
-                        replay(message, -1, offset);
-                    }
-                } else {
-                    // If the controller is active, the records were already replayed,
-                    // so we don't need to do it here.
-                    log.debug("Completing purgatory items up to offset {}.", offset);
-
-                    // Complete any events in the purgatory that were waiting for this offset.
-                    purgatory.completeUpTo(offset);
-
-                    // Delete all the in-memory snapshots that we no longer need.
-                    // If we are writing a new snapshot, then we need to keep that around;
-                    // otherwise, we should delete up to the current committed offset.
-                    snapshotRegistry.deleteSnapshotsUpTo(
-                        Math.min(offset, snapshotGeneratorManager.snapshotEpoch()));
+                } finally {
+                    reader.close();
                 }
-                lastCommittedOffset = offset;
             });
         }
 
         @Override
-        public void handleNewLeader(MetaLogLeader newLeader) {
-            if (newLeader.nodeId() == nodeId) {
-                final long newEpoch = newLeader.epoch();
+        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {

Review comment:
       I think there's some ambiguity in how this API is named. "Handling" a snapshot could mean either writing one or reading one. Judging from the parameter name, I'm guessing that we want to read the snapshot here.
   
   My understanding is that this API will be called if the standby controller falls too far behind, OR right after it starts up, before any other records are read. Does that match up with your understanding?
   
   If that's true, then we should be able to move the snapshot loading logic that currently exists in the constructor into this method, with a TODO for implementing snapshot loading AFTER the controller has already been initialized (which will involve zeroing out all the existing data structures and in-memory snapshots...). I think we should do that, since I don't want this patch to move us backwards in snapshot support. A rough sketch of what that could look like follows.
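   
   To make that concrete, here's a minimal sketch of reading the snapshot in this callback. This is only an illustration, not the actual implementation: it assumes `SnapshotReader` exposes the same batch iteration shape as the `BatchReader` in `handleCommit` above, and `resetStateForSnapshotLoad()` is a hypothetical helper that doesn't exist yet.
   
   ```java
   @Override
   public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
       appendControlEvent("handleSnapshot", () -> {
           try {
               // TODO: support loading a snapshot after the controller has already been
               // initialized. That would first require zeroing out all existing data
               // structures and in-memory snapshots, e.g. via a hypothetical
               // resetStateForSnapshotLoad() helper.
               while (reader.hasNext()) {
                   Batch<ApiMessageAndVersion> batch = reader.next();
                   for (ApiMessageAndVersion messageAndVersion : batch.records()) {
                       // Replay each snapshot record into the controller's in-memory
                       // state, just as a standby replays committed records.
                       replay(messageAndVersion.message(), -1, batch.lastOffset());
                   }
                   lastCommittedOffset = batch.lastOffset();
               }
           } finally {
               reader.close();
           }
       });
   }
   ```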




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

