junrao commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1589798416


##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import org.apache.kafka.common.message.KRaftVersionRecord;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.ControlRecord;
+import org.apache.kafka.raft.Isolation;
+import org.apache.kafka.raft.LogFetchInfo;
+import org.apache.kafka.raft.ReplicatedLog;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.snapshot.RawSnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+
+/**
+ * The KRaft state machine for tracking control records in the topic partition.
+ *
+ * This type keeps track of changes to the finalized kraft.version and the 
sets of voters between
+ * the latest snasphot and the log end offset.
+ *
+ * The are two actors/threads for this type. One is the KRaft driver which 
indirectly call a lot of

Review Comment:
   The are => There are



##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##########
@@ -728,83 +730,114 @@ public static MemoryRecords withLeaderChangeMessage(
         ByteBuffer buffer,
         LeaderChangeMessage leaderChangeMessage
     ) {
-        writeLeaderChangeMessage(buffer, initialOffset, timestamp, 
leaderEpoch, leaderChangeMessage);
-        buffer.flip();
-        return MemoryRecords.readableRecords(buffer);
+        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+                initialOffset,
+                timestamp,
+                leaderEpoch,
+                buffer
+            )
+        ) {
+            builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+            return builder.build();
+        }
     }
 
-    private static void writeLeaderChangeMessage(
-        ByteBuffer buffer,
+    public static MemoryRecords withSnapshotHeaderRecord(
         long initialOffset,
         long timestamp,
         int leaderEpoch,
-        LeaderChangeMessage leaderChangeMessage
+        ByteBuffer buffer,
+        SnapshotHeaderRecord snapshotHeaderRecord
     ) {
-        try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
-            buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
-            TimestampType.CREATE_TIME, initialOffset, timestamp,
-            RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
-            false, true, leaderEpoch, buffer.capacity())
+        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+                initialOffset,
+                timestamp,
+                leaderEpoch,
+                buffer
+            )
         ) {
-            builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+            builder.appendSnapshotHeaderMessage(timestamp, 
snapshotHeaderRecord);
+            return builder.build();
         }
     }
 
-    public static MemoryRecords withSnapshotHeaderRecord(
+    public static MemoryRecords withSnapshotFooterRecord(
         long initialOffset,
         long timestamp,
         int leaderEpoch,
         ByteBuffer buffer,
-        SnapshotHeaderRecord snapshotHeaderRecord
+        SnapshotFooterRecord snapshotFooterRecord
     ) {
-        writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, 
leaderEpoch, snapshotHeaderRecord);
-        buffer.flip();
-        return MemoryRecords.readableRecords(buffer);
+        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+                initialOffset,
+                timestamp,
+                leaderEpoch,
+                buffer
+            )
+        ) {
+            builder.appendSnapshotFooterMessage(timestamp, 
snapshotFooterRecord);
+            return builder.build();
+        }
     }
 
-    private static void writeSnapshotHeaderRecord(
-        ByteBuffer buffer,
+    public static MemoryRecords withKRaftVersionRecord(
         long initialOffset,
         long timestamp,
         int leaderEpoch,
-        SnapshotHeaderRecord snapshotHeaderRecord
+        ByteBuffer buffer,
+        KRaftVersionRecord kraftVersionRecord
     ) {
-        try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
-            buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
-            TimestampType.CREATE_TIME, initialOffset, timestamp,
-            RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
-            false, true, leaderEpoch, buffer.capacity())
+        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+                initialOffset,
+                timestamp,
+                leaderEpoch,
+                buffer
+            )
         ) {
-            builder.appendSnapshotHeaderMessage(timestamp, 
snapshotHeaderRecord);
+            builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord);
+            return builder.build();
         }
     }
 
-    public static MemoryRecords withSnapshotFooterRecord(
+    public static MemoryRecords withVotersRecord(
         long initialOffset,
         long timestamp,
         int leaderEpoch,
         ByteBuffer buffer,
-        SnapshotFooterRecord snapshotFooterRecord
+        VotersRecord votersRecord
     ) {
-        writeSnapshotFooterRecord(buffer, initialOffset, timestamp, 
leaderEpoch, snapshotFooterRecord);
-        buffer.flip();
-        return MemoryRecords.readableRecords(buffer);
+        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+                initialOffset,
+                timestamp,
+                leaderEpoch,
+                buffer
+            )
+        ) {
+            builder.appendVotersMessage(timestamp, votersRecord);
+            return builder.build();
+        }
     }
 
-    private static void writeSnapshotFooterRecord(
-        ByteBuffer buffer,
+    private static MemoryRecordsBuilder createKraftControlReccordBuilder(

Review Comment:
   typo Reccord



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1470,6 +1524,10 @@ private boolean handleFetchSnapshotResponse(
                     quorum.leaderIdOrSentinel()
                 );
 
+                // This will aways reload the snapshot because the internal 
next offset

Review Comment:
   typo aways



##########
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It providees functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+    private final Map<Integer, VoterNode> voters;
+
+    VoterSet(Map<Integer, VoterNode> voters) {
+        if (voters.isEmpty()) {
+            throw new IllegalArgumentException("Voters cannot be empty");
+        }
+
+        this.voters = voters;
+    }
+
+    /**
+     * Returns the socket address for a given voter at a given listener.
+     *
+     * @param voter the id of the voter
+     * @param listener the name of the listener
+     * @return the socket address if it exists, otherwise {@code 
Optional.empty()}
+     */
+    public Optional<InetSocketAddress> voterAddress(int voter, String 
listener) {
+        return Optional.ofNullable(voters.get(voter))
+            .flatMap(voterNode -> voterNode.address(listener));
+    }
+
+    /**
+     * Returns all of the voter ids.
+     */
+    public Set<Integer> voterIds() {
+        return voters.keySet();
+    }
+
+    /**
+     * Adds a voter to the voter set.
+     *
+     * This object is immutable. A new voter set is returned if the voter was 
added.
+     *
+     * A new voter can be added to a voter set if its id doesn't already exist 
in the voter set.
+     *
+     * @param voter the new voter to add
+     * @return a new voter set if the voter was added, otherwise {@code 
Optional.empty()}
+     */
+    public Optional<VoterSet> addVoter(VoterNode voter) {
+        if (voters.containsKey(voter.voterKey().id())) {
+            return Optional.empty();
+        }
+
+        HashMap<Integer, VoterNode> newVoters = new HashMap<>(voters);
+        newVoters.put(voter.voterKey().id(), voter);
+
+        return Optional.of(new VoterSet(newVoters));
+    }
+
+    /**
+     * Remove a voter from the voter set.
+     *
+     * This object is immutable. A new voter set is returned if the voter was 
removed.
+     *
+     * A voter can be removed from the voter set if its id and directory id 
match.
+     *
+     * @param voterKey the voter key
+     * @return a new voter set if the voter was removed, otherwise {@code 
Optional.empty()}
+     */
+    public Optional<VoterSet> removeVoter(VoterKey voterKey) {
+        VoterNode oldVoter = voters.get(voterKey.id());
+        if (oldVoter != null && Objects.equals(oldVoter.voterKey(), voterKey)) 
{
+            HashMap<Integer, VoterNode> newVoters = new HashMap<>(voters);
+            newVoters.remove(voterKey.id());
+
+            return Optional.of(new VoterSet(newVoters));
+        }
+
+        return Optional.empty();
+    }
+
+    /**
+     * Converts a voter set to a voters record for a given version.
+     *
+     * @param version the version of the voters record
+     */
+    public VotersRecord toVotersRecord(short version) {
+        Function<VoterNode, VotersRecord.Voter> voterConvertor = voter -> {
+            Iterator<VotersRecord.Endpoint> endpoints = voter
+                .listeners()
+                .entrySet()
+                .stream()
+                .map(entry ->
+                    new VotersRecord.Endpoint()
+                        .setName(entry.getKey())
+                        .setHost(entry.getValue().getHostString())
+                        .setPort(entry.getValue().getPort())
+                )
+                .iterator();
+
+            VotersRecord.KRaftVersionFeature kraftVersionFeature = new 
VotersRecord.KRaftVersionFeature()
+                .setMinSupportedVersion(voter.supportedKRaftVersion().min())
+                .setMaxSupportedVersion(voter.supportedKRaftVersion().max());
+
+            return new VotersRecord.Voter()
+                .setVoterId(voter.voterKey().id())
+                
.setVoterDirectoryId(voter.voterKey().directoryId().orElse(Uuid.ZERO_UUID))
+                .setEndpoints(new VotersRecord.EndpointCollection(endpoints))
+                .setKRaftVersionFeature(kraftVersionFeature);
+        };
+
+        List<VotersRecord.Voter> voterRecordVoters = voters
+            .values()
+            .stream()
+            .map(voterConvertor)
+            .collect(Collectors.toList());
+
+        return new VotersRecord()
+            .setVersion(version)
+            .setVoters(voterRecordVoters);
+    }
+
+    /**
+     * Determines if two sets of voters have an overlapping majority.
+     *
+     * An overlapping majority means that for all majorities in {@code this} 
set of voters and for
+     * all majority in {@code that} set of voters, they have at least one 
voter in common.
+     *
+     * If this function returns true is means that one of the voter set 
commits an offset, it means
+     * that the other voter set cannot commit a conflicting offset.

Review Comment:
   This sentence doesn't read well.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -370,8 +368,52 @@ private void maybeFireLeaderChange() {
         }
     }
 
-    @Override
-    public void initialize() {
+    public void initialize(
+        Map<Integer, InetSocketAddress> voterAddresses,
+        String listenerName,
+        QuorumStateStore quorumStateStore,
+        Metrics metrics
+    ) {
+        partitionState = new KRaftControlRecordStateMachine(
+            Optional.of(VoterSet.fromInetSocketAddresses(listenerName, 
voterAddresses)),
+            log,
+            serde,
+            BufferSupplier.create(),
+            MAX_BATCH_SIZE_BYTES,
+            logContext
+        );
+        // Read the entire log
+        logger.info("Reading KRaft snapshot and log as part of the 
initialization");
+        partitionState.updateState();
+
+        requestManager = new RequestManager(
+            partitionState.lastVoterSet().voterIds(),
+            quorumConfig.retryBackoffMs(),
+            quorumConfig.requestTimeoutMs(),
+            random
+        );
+
+        quorum = new QuorumState(
+            nodeId,
+            partitionState.lastVoterSet().voterIds(),
+            quorumConfig.electionTimeoutMs(),
+            quorumConfig.fetchTimeoutMs(),
+            quorumStateStore,
+            time,
+            logContext,
+            random
+        );
+
+        kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
+        // All Raft voters are statically configured and known at startup
+        // so there are no unknown voter connections. Report this metric as 0.
+        kafkaRaftMetrics.updateNumUnknownVoterConnections(0);
+
+        VoterSet lastVoterSet = partitionState.lastVoterSet();

Review Comment:
   Would it be better to move this to before line 389 to avoid calling 
partitionState.lastVoterSet() multiple times?



##########
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It providees functionality for converting to and from {@code VotersRecord} 
and for converting

Review Comment:
   typo providees



##########
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##########
@@ -245,6 +255,42 @@ private void appendControlMessage(Function<ByteBuffer, 
MemoryRecords> valueCreat
         }
     }
 
+    private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) 
{
+        // Confirm that it is at most one batch and it is a control record

Review Comment:
   I guess we support a batch with more than one control record?



##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import org.apache.kafka.common.message.KRaftVersionRecord;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.ControlRecord;
+import org.apache.kafka.raft.Isolation;
+import org.apache.kafka.raft.LogFetchInfo;
+import org.apache.kafka.raft.ReplicatedLog;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.snapshot.RawSnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+
+/**
+ * The KRaft state machine for tracking control records in the topic partition.
+ *
+ * This type keeps track of changes to the finalized kraft.version and the 
sets of voters between
+ * the latest snasphot and the log end offset.
+ *
+ * The are two actors/threads for this type. One is the KRaft driver which 
indirectly call a lot of
+ * the public methods. The other are the callers of {@code 
RaftClient::createSnapshot} which
+ * indirectly call {@code voterSetAtOffset} and {@code kraftVersionAtOffset} 
when freezing a snapshot.
+ */
+final public class KRaftControlRecordStateMachine {
+    private final ReplicatedLog log;
+    private final RecordSerde<?> serde;
+    private final BufferSupplier bufferSupplier;
+    private final Logger logger;
+    private final int maxBatchSizeBytes;
+
+    // These objects are synchronized using their respective object monitor. 
The two actors
+    // are the KRaft driver when calling updateState and the RaftClient 
callers when freezing
+    // snapshots
+    private final VoterSetHistory voterSetHistory;
+    private final LogHistory<Short> kraftVersionHistory = new 
TreeMapLogHistory<>();
+
+    // This synchronization is enough because
+    // 1. The write operation updateState only sets the value without reading 
it and updates to
+    // voterSetHistory or kraftVersionHistory are done before setting the 
nextOffset
+    //
+    // 2. The read operations lastVoterSet, voterSetAtOffset and 
kraftVersionAtOffset read
+    // the nextOffset first before reading voterSetHistory or 
kraftVersionHistory
+    private volatile long nextOffset = 0;
+
+    /**
+     * Constructs an internal log listener
+     *
+     * @param staticVoterSet the set of voter statically configured
+     * @param log the on disk topic partition
+     * @param serde the record decoder for data records
+     * @param bufferSupplier the supplier of byte buffers
+     * @param maxBatchSizeBytes the maximum size of record batch
+     * @param logContext the log context
+     */
+    public KRaftControlRecordStateMachine(
+        Optional<VoterSet> staticVoterSet,
+        ReplicatedLog log,
+        RecordSerde<?> serde,
+        BufferSupplier bufferSupplier,
+        int maxBatchSizeBytes,
+        LogContext logContext
+    ) {
+        this.log = log;
+        this.voterSetHistory = new VoterSetHistory(staticVoterSet);
+        this.serde = serde;
+        this.bufferSupplier = bufferSupplier;
+        this.maxBatchSizeBytes = maxBatchSizeBytes;
+        this.logger = logContext.logger(this.getClass());
+    }
+
+    /**
+     * Must be called whenever the {@code log} has changed.
+     */
+    public void updateState() {
+        maybeLoadSnapshot();
+        maybeLoadLog();
+    }
+
+    /**
+     * Remove the head of the log until the given offset.
+     *
+     * @param endOffset the end offset (exclusive)
+     */
+    public void truncateNewEntries(long endOffset) {
+        synchronized (voterSetHistory) {
+            voterSetHistory.truncateNewEntries(endOffset);
+        }
+        synchronized (kraftVersionHistory) {
+            kraftVersionHistory.truncateNewEntries(endOffset);
+        }
+    }
+
+    /**
+     * Remove the tail of the log until the given offset.
+     *
+     * @param @startOffset the start offset (inclusive)

Review Comment:
   @startOffset => startOffset



##########
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It providees functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+    private final Map<Integer, VoterNode> voters;

Review Comment:
   To support replacing disks, we need to support two voters with the same id but different directory ids. This means that voters need to be keyed on both the id and the directory id, right?



##########
raft/src/main/java/org/apache/kafka/raft/internals/TreeMapLogHistory.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.Map;
+
+/**
+ * A implementation for {@code LogHistory} which uses a red-black tree to 
store values sorted by offset.
+ */
+final public class TreeMapLogHistory<T> implements LogHistory<T> {
+    private final NavigableMap<Long, T> history = new TreeMap<>();
+
+    @Override
+    public void addAt(long offset, T value) {
+        if (offset < 0) {
+            throw new IllegalArgumentException(
+                String.format("Next offset %d must be greater than or equal to 
0", offset)
+            );
+        }
+
+        Map.Entry<Long, ?> lastEntry = history.lastEntry();
+        if (lastEntry != null && offset <= lastEntry.getKey()) {
+            throw new IllegalArgumentException(
+                String.format("Next offset %d must be greater than the last 
offset %d", offset, lastEntry.getKey())
+            );
+        }
+
+        history.put(offset, value);
+    }
+
+    @Override
+    public Optional<T> valueAtOrBefore(long offset) {
+        return 
Optional.ofNullable(history.floorEntry(offset)).map(Map.Entry::getValue);
+    }
+
+    @Override
+    public Optional<Entry<T>> lastEntry() {
+        return Optional.ofNullable(history.lastEntry()).map(entry -> new 
Entry<>(entry.getKey(), entry.getValue()));
+    }
+
+    @Override
+    public void truncateNewEntries(long endOffset) {
+        history.tailMap(endOffset, true).clear();
+    }
+
+    @Override
+    public void truncateOldEntries(long startOffset) {
+        NavigableMap<Long, T> lesserValues = history.headMap(startOffset, 
true);
+        while (lesserValues.size() > 1) {

Review Comment:
   The javadoc says this method preserves the last entry, but it doesn't seem to be implemented?



##########
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java:
##########
@@ -145,9 +147,7 @@ private Optional<Batch<T>> nextBatch() {
                 );
             }
 
-            if (!batch.records().isEmpty()) {
-                return Optional.of(batch);
-            }
+            return Optional.of(batch);

Review Comment:
   With this change, the `while` statement no longer loops. Should we remove the `while`?



##########
raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+
+/**
+ * A type for storing the historical value of the set of voters.
+ *
+ * This type can be use to keep track in-memory the sets for voters stored in 
the latest snapshot
+ * and log. This is useful when generating a new snapshot at a given offset or 
when evaulating
+ * the latest set of voters.
+ */
+final public class VoterSetHistory {
+    private final Optional<VoterSet> staticVoterSet;
+    private final LogHistory<VoterSet> votersHistory = new 
TreeMapLogHistory<>();
+
+    VoterSetHistory(Optional<VoterSet> staticVoterSet) {
+        this.staticVoterSet = staticVoterSet;
+    }
+
+    /**
+     * Add a new value at a given offset.
+     *
+     * The provided {@code offset} must be greater than or equal to 0 and must 
be greater than the
+     * offset of all previous calls to this method.
+     *
+     * @param offset the offset
+     * @param value the value to store
+     * @throws IllegalArgumentException if the offset is not greater than all 
previous offsets
+     */
+    public void addAt(long offset, VoterSet voters) {
+        Optional<LogHistory.Entry<VoterSet>> lastEntry = 
votersHistory.lastEntry();
+        if (lastEntry.isPresent() && lastEntry.get().offset() >= 0) {
+            // If the last voter set comes from the replicated log then the 
majorities must overlap.
+            // This ignores the static voter set and the bootstrapped voter 
set since they come from
+            // the configuration and the KRaft leader never guaranteed that 
they are the same across
+            // all replicas.
+            VoterSet lastVoterSet = lastEntry.get().value();
+            if (!lastVoterSet.hasOverlappingMajority(voters)) {
+                throw new IllegalArgumentException(
+                    String.format(
+                        "Last voter set %s doesn't have an overlapping 
majority with the new voter set %s",
+                        lastVoterSet,
+                        voters
+                    )
+                );
+            }
+        }
+
+        votersHistory.addAt(offset, voters);
+    }
+
+    /**
+     * Computes the value of the voter set at a given offset.
+     *
+     * This function will only return values provided through {@code addAt} 
and it would never
+     * include the {@code staticVoterSet} provided through the constructoer.

Review Comment:
   typo constructoer



##########
raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java:
##########
@@ -236,14 +235,14 @@ default long truncateToEndOffset(OffsetAndEpoch 
endOffset) {
      * snapshot already exists or it is less than log start offset then return 
an
      * {@link Optional#empty()}.
      *
-     * Snapshots created using this method will be validated against the 
existing snapshots
-     * and the replicated log.
+     * The snapshot id will be validated against the existing snapshots and 
the log. The snapshot id
+     * must not alread exist, it must be greater than the log start offset, it 
must be less than

Review Comment:
   typo: alread => already



##########
raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+
+/**
+ * A type for storing the historical value of the set of voters.
+ *
+ * This type can be use to keep track in-memory the sets for voters stored in 
the latest snapshot
+ * and log. This is useful when generating a new snapshot at a given offset or 
when evaulating

Review Comment:
   typo: evaulating => evaluating



##########
raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+
+/**
+ * A type for storing the historical value of the set of voters.
+ *
+ * This type can be use to keep track in-memory the sets for voters stored in 
the latest snapshot

Review Comment:
   be use => be used



##########
raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+
+/**
+ * A type for storing the historical value of the set of voters.
+ *
+ * This type can be use to keep track in-memory the sets for voters stored in 
the latest snapshot
+ * and log. This is useful when generating a new snapshot at a given offset or 
when evaulating
+ * the latest set of voters.
+ */
+final public class VoterSetHistory {
+    private final Optional<VoterSet> staticVoterSet;
+    private final LogHistory<VoterSet> votersHistory = new 
TreeMapLogHistory<>();
+
+    VoterSetHistory(Optional<VoterSet> staticVoterSet) {
+        this.staticVoterSet = staticVoterSet;
+    }
+
+    /**
+     * Add a new value at a given offset.
+     *
+     * The provided {@code offset} must be greater than or equal to 0 and must 
be greater than the
+     * offset of all previous calls to this method.
+     *
+     * @param offset the offset
+     * @param value the value to store

Review Comment:
   value => voters



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to