jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595916355


##########
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##########
@@ -245,6 +255,42 @@ private void appendControlMessage(Function<ByteBuffer, 
MemoryRecords> valueCreat
         }
     }
 
+    private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) 
{
+        // Confirm that it is at most one batch and it is a control record

Review Comment:
   Yeah. I fixed the comment and the implementation.



##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import org.apache.kafka.common.message.KRaftVersionRecord;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.ControlRecord;
+import org.apache.kafka.raft.Isolation;
+import org.apache.kafka.raft.LogFetchInfo;
+import org.apache.kafka.raft.ReplicatedLog;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.snapshot.RawSnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+
+/**
+ * The KRaft state machine for tracking control records in the topic partition.
+ *
+ * This type keeps track of changes to the finalized kraft.version and the 
sets of voters between
+ * the latest snapshot and the log end offset.
+ *
+ * There are two actors/threads for this type. One is the KRaft driver which 
indirectly calls a lot of
+ * the public methods. The other are the callers of {@code 
RaftClient::createSnapshot} which
+ * indirectly call {@code voterSetAtOffset} and {@code kraftVersionAtOffset} 
when freezing a snapshot.
+ */
+final public class KRaftControlRecordStateMachine {
+    private final ReplicatedLog log;
+    private final RecordSerde<?> serde;
+    private final BufferSupplier bufferSupplier;
+    private final Logger logger;
+    private final int maxBatchSizeBytes;
+
+    // These objects are synchronized using their respective object monitor. 
The two actors
+    // are the KRaft driver when calling updateState and the RaftClient 
callers when freezing
+    // snapshots
+    private final VoterSetHistory voterSetHistory;
+    private final LogHistory<Short> kraftVersionHistory = new 
TreeMapLogHistory<>();
+
+    // This synchronization is enough because
+    // 1. The write operation updateState only sets the value without reading 
it and updates to
+    // voterSetHistory or kraftVersionHistory are done before setting the 
nextOffset
+    //
+    // 2. The read operations lastVoterSet, voterSetAtOffset and 
kraftVersionAtOffset read
+    // the nextOffset first before reading voterSetHistory or 
kraftVersionHistory
+    private volatile long nextOffset = 0;
+
+    /**
+     * Constructs an internal log listener
+     *
+     * @param staticVoterSet the set of voters statically configured
+     * @param log the on disk topic partition
+     * @param serde the record decoder for data records
+     * @param bufferSupplier the supplier of byte buffers
+     * @param maxBatchSizeBytes the maximum size of record batch
+     * @param logContext the log context
+     */
+    public KRaftControlRecordStateMachine(
+        Optional<VoterSet> staticVoterSet,
+        ReplicatedLog log,
+        RecordSerde<?> serde,
+        BufferSupplier bufferSupplier,
+        int maxBatchSizeBytes,
+        LogContext logContext
+    ) {
+        this.log = log;
+        this.voterSetHistory = new VoterSetHistory(staticVoterSet);
+        this.serde = serde;
+        this.bufferSupplier = bufferSupplier;
+        this.maxBatchSizeBytes = maxBatchSizeBytes;
+        this.logger = logContext.logger(this.getClass());
+    }
+
+    /**
+     * Must be called whenever the {@code log} has changed.
+     */
+    public void updateState() {
+        maybeLoadSnapshot();
+        maybeLoadLog();
+    }
+
+    /**
+     * Remove the tail of the log until the given offset.
+     *
+     * @param endOffset the end offset (exclusive)
+     */
+    public void truncateNewEntries(long endOffset) {
+        synchronized (voterSetHistory) {
+            voterSetHistory.truncateNewEntries(endOffset);
+        }
+        synchronized (kraftVersionHistory) {
+            kraftVersionHistory.truncateNewEntries(endOffset);
+        }
+    }
+
+    /**
+     * Remove the tail of the log until the given offset.
+     *
+     * @param @startOffset the start offset (inclusive)

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to