jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r636340373



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -624,50 +617,71 @@ public String toString() {
         return event.future();
     }
 
-    class QuorumMetaLogListener implements MetaLogListener {
+    class QuorumMetaLogListener implements 
RaftClient.Listener<ApiMessageAndVersion> {
+
         @Override
-        public void handleCommits(long offset, List<ApiMessage> messages) {
-            appendControlEvent("handleCommits[" + offset + "]", () -> {
-                if (curClaimEpoch == -1) {
-                    // If the controller is a standby, replay the records that 
were
-                    // created by the active controller.
-                    if (log.isDebugEnabled()) {
-                        if (log.isTraceEnabled()) {
-                            log.trace("Replaying commits from the active node 
up to " +
-                                "offset {}: {}.", offset, messages.stream().
-                                map(m -> 
m.toString()).collect(Collectors.joining(", ")));
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            appendControlEvent("handleCommits[baseOffset=" + 
reader.baseOffset() + "]", () -> {
+                try {
+                    boolean isActiveController = curClaimEpoch != -1;
+                    while (reader.hasNext()) {
+                        Batch<ApiMessageAndVersion> batch = reader.next();
+                        long offset = batch.lastOffset();
+                        List<ApiMessageAndVersion> messages = batch.records();
+
+                        if (isActiveController) {
+                            // If the controller is active, the records were 
already replayed,
+                            // so we don't need to do it here.
+                            log.debug("Completing purgatory items up to offset 
{}.", offset);
+
+                            // Complete any events in the purgatory that were 
waiting for this offset.
+                            purgatory.completeUpTo(offset);
+
+                            // Delete all the in-memory snapshots that we no 
longer need.
+                            // If we are writing a new snapshot, then we need 
to keep that around;
+                            // otherwise, we should delete up to the current 
committed offset.
+                            snapshotRegistry.deleteSnapshotsUpTo(
+                                Math.min(offset, 
snapshotGeneratorManager.snapshotEpoch()));
+
                         } else {
-                            log.debug("Replaying commits from the active node 
up to " +
-                                "offset {}.", offset);
+                            // If the controller is a standby, replay the 
records that were
+                            // created by the active controller.
+                            if (log.isDebugEnabled()) {
+                                if (log.isTraceEnabled()) {
+                                    log.trace("Replaying commits from the 
active node up to " +
+                                        "offset {}: {}.", offset, 
messages.stream()
+                                        .map(ApiMessageAndVersion::toString)
+                                        .collect(Collectors.joining(", ")));
+                                } else {
+                                    log.debug("Replaying commits from the 
active node up to " +
+                                        "offset {}.", offset);
+                                }
+                            }
+                            for (ApiMessageAndVersion messageAndVersion : 
messages) {
+                                replay(messageAndVersion.message(), -1, 
offset);
+                            }
                         }
+                        lastCommittedOffset = offset;
                     }
-                    for (ApiMessage message : messages) {
-                        replay(message, -1, offset);
-                    }
-                } else {
-                    // If the controller is active, the records were already 
replayed,
-                    // so we don't need to do it here.
-                    log.debug("Completing purgatory items up to offset {}.", 
offset);
-
-                    // Complete any events in the purgatory that were waiting 
for this offset.
-                    purgatory.completeUpTo(offset);
-
-                    // Delete all the in-memory snapshots that we no longer 
need.
-                    // If we are writing a new snapshot, then we need to keep 
that around;
-                    // otherwise, we should delete up to the current committed 
offset.
-                    snapshotRegistry.deleteSnapshotsUpTo(
-                        Math.min(offset, 
snapshotGeneratorManager.snapshotEpoch()));
+                } finally {
+                    reader.close();
                 }
-                lastCommittedOffset = offset;
             });
         }
 
         @Override
-        public void handleNewLeader(MetaLogLeader newLeader) {
-            if (newLeader.nodeId() == nodeId) {
-                final long newEpoch = newLeader.epoch();
+        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> 
reader) {

Review comment:
       > If that's true, then we should be able to put the snapshot loading 
stuff that currently exists (in the constructor) here, with a TODO for 
implementing snapshot loading AFTER the controller has already been initialized 
(which will involve zeroing out all the existing data structures and in-memory 
snapshots...) I think we should do that since I don't want this patch to move 
us backwards in snapshot support.
   
   We discussed this in another comment thread in this PR: this PR has a 
partial implementation of snapshot loading in the controller. I am going to 
implement snapshot re-loading as part of 
https://issues.apache.org/jira/browse/KAFKA-12787.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to