jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581905289
########## raft/src/main/java/org/apache/kafka/raft/internals/PartitionListener.java: ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.Optional; +import java.util.OptionalLong; +import org.apache.kafka.common.message.KRaftVersionRecord; +import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.ControlRecord; +import org.apache.kafka.raft.Isolation; +import org.apache.kafka.raft.LogFetchInfo; +import org.apache.kafka.raft.ReplicatedLog; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.snapshot.RawSnapshotReader; +import org.apache.kafka.snapshot.RecordsSnapshotReader; +import org.apache.kafka.snapshot.SnapshotReader; +import org.slf4j.Logger; + +/** + * The KRaft state machine for tracking control records in the topic partition. + * + * This type keeps track of changes to the finalized kraft.version and the sets of voters. + */ +final public class PartitionListener { + private final ReplicatedLog log; + private final RecordSerde<?> serde; + private final BufferSupplier bufferSupplier; + private final Logger logger; + private final int maxBatchSizeBytes; + + // These are objects are synchronized using the perspective object monitor. The two actors + // are the KRaft driver and the RaftClient callers + private final VoterSetHistory voterSetHistory; + private final History<Short> kraftVersionHistory = new TreeMapHistory<>(); + + // This synchronization is enough because + // 1. The write operation updateListener only sets the value without reading and updates to + // voterSetHistory or kraftVersionHistory are done before setting the nextOffset + // + // 2. The read operations lastVoterSet, voterSetAtOffset and kraftVersionAtOffset read + // the nextOffset first before reading voterSetHistory or kraftVersionHistory + private volatile long nextOffset = 0; + + /** + * Constructs an internal log listener + * + * @param staticVoterSet the set of voter statically configured + * @param log the on disk topic partition + * @param serde the record decoder for data records + * @param bufferSupplier the supplier of byte buffers + * @param maxBatchSizeBytes the maximum size of record batch + * @param logContext the log context + */ + public PartitionListener( + Optional<VoterSet> staticVoterSet, + ReplicatedLog log, + RecordSerde<?> serde, + BufferSupplier bufferSupplier, + int maxBatchSizeBytes, + LogContext logContext + ) { + this.log = log; + this.voterSetHistory = new VoterSetHistory(staticVoterSet); + this.serde = serde; + this.bufferSupplier = bufferSupplier; + this.maxBatchSizeBytes = maxBatchSizeBytes; + this.logger = logContext.logger(this.getClass()); + } + + /** + * Must be called whenever the {@code log} has changed. + */ + public void updateListener() { + maybeLoadSnapshot(); + maybeLoadLog(); + } + + /** + * Remove the head of the log until the given offset. + * + * @param endOffset the end offset (exclusive) + */ + public void truncateTo(long endOffset) { + synchronized (voterSetHistory) { + voterSetHistory.truncateTo(endOffset); + } + synchronized (kraftVersionHistory) { + kraftVersionHistory.truncateTo(endOffset); + } + } + + /** + * Remove the tail of the log until the given offset. + * + * @param @startOffset the start offset (inclusive) + */ + public void trimPrefixTo(long startOffset) { + synchronized (voterSetHistory) { + voterSetHistory.trimPrefixTo(startOffset); + } + synchronized (kraftVersionHistory) { + kraftVersionHistory.trimPrefixTo(startOffset); + } + } + + /** + * Returns the last voter set. + */ + public VoterSet lastVoterSet() { + synchronized (voterSetHistory) { + return voterSetHistory.lastValue(); + } + } + + /** + * Rturns the voter set at a given offset. + * + * @param offset the offset (inclusive) + * @return the voter set if one exist, otherwise {@code Optional.empty()} + */ + public Optional<VoterSet> voterSetAtOffset(long offset) { + long fixedNextOffset = nextOffset; + if (offset >= fixedNextOffset) { + throw new IllegalArgumentException( + String.format( + "Attempting the read the voter set at an offset (%d) which kraft hasn't seen (%d)", + offset, + fixedNextOffset - 1 + ) + ); + } + + synchronized (voterSetHistory) { + return voterSetHistory.valueAtOrBefore(offset); + } + } + + /** + * Returns the finalized kraft version at a given offset. + * + * @param offset the offset (inclusive) + * @return the finalized kraft version if one exist, otherwise 0 + */ + public short kraftVersionAtOffset(long offset) { Review Comment: What do you mean? I created `checkOffsetIsValid` is that what you meant? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org