hachikuji commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r622494008
##########
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##########
@@ -60,19 +61,21 @@
     @Override
     void close();
-    class Batch<T> {
+    final class Batch<T> implements Iterable<T> {
         private final long baseOffset;
         private final int epoch;
+        private final long lastOffset;
         private final List<T> records;
-        public Batch(long baseOffset, int epoch, List<T> records) {
+        private Batch(long baseOffset, int epoch, long lastOffset, List<T> records) {

Review comment:
Perhaps it is clear enough already, but maybe we should document that these offsets are inclusive.

##########
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##########
@@ -226,7 +232,11 @@ class TestRaftServer(
             reader.close()
           }
-        case _ =>
+        case HandleSnapshot(reader) =>
+          // Ignore snapshots; only interested on records appended by this leader

Review comment:
nit: on -> in?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -311,8 +311,18 @@ private void updateListenersProgress(long highWatermark) {
     private void updateListenersProgress(List<ListenerContext> listenerContexts, long highWatermark) {
         for (ListenerContext listenerContext : listenerContexts) {
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
-                if (nextExpectedOffset < log.startOffset()) {
-                    listenerContext.fireHandleSnapshot(log.startOffset());
+                if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) {
+                    SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> {
+                        return new IllegalStateException(
+                            String.format(
+                                "Snapshot expected when next offset is %s, log start offset is %s and high-watermark is %s",

Review comment:
Perhaps it is useful to mention the class of the listener since we are referring to its next expected offset?
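
A rough sketch of what such a message could look like, assuming the listener object is available where the exception is built. `SnapshotMessageSketch` and `describe` are hypothetical names used only for illustration, and the exact wording is just one possibility:

```java
final class SnapshotMessageSketch {
    // Hypothetical helper (not part of the PR): builds the error message with the
    // listener's class name included next to its next expected offset.
    static String describe(Object listener, long nextExpectedOffset, long logStartOffset, long highWatermark) {
        return String.format(
            "Snapshot expected since next offset of %s is %s, log start offset is %s and high-watermark is %s",
            listener.getClass().getTypeName(),
            nextExpectedOffset,
            logStartOffset,
            highWatermark
        );
    }
}
```

With several listeners registered, the class name would make it easier to tell which one fell behind the log start offset.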

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -426,48 +422,55 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) {
     public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {}
     @Override
-    public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) {
-        if (logStartOffset() > logStartSnapshotId.offset ||
-            highWatermark.offset < logStartSnapshotId.offset) {
-
+    public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
+        if (logStartOffset() > snapshotId.offset) {
+            throw new OffsetOutOfRangeException(
+                String.format(
+                    "New log start (%s) is less than the curent log start offset (%s)",
+                    snapshotId,
+                    logStartOffset()
+                )
+            );
+        }
+        if (highWatermark.offset < snapshotId.offset) {
             throw new OffsetOutOfRangeException(
                 String.format(
-                    "New log start (%s) is less than start offset (%s) or is greater than the high watermark (%s)",
-                    logStartSnapshotId,
-                    logStartOffset(),
+                    "New log start (%s) is greater than the high watermark (%s)",
+                    snapshotId,
                     highWatermark.offset
                 )
             );
         }
         boolean updated = false;
-        Optional<OffsetAndEpoch> snapshotIdOpt = latestSnapshotId();
-        if (snapshotIdOpt.isPresent()) {
-            OffsetAndEpoch snapshotId = snapshotIdOpt.get();
-            if (startOffset() < logStartSnapshotId.offset &&
-                highWatermark.offset >= logStartSnapshotId.offset &&
-                snapshotId.offset >= logStartSnapshotId.offset) {
+        if (snapshots.containsKey(snapshotId)) {
+            snapshots.headMap(snapshotId, false).clear();
-                snapshots.headMap(logStartSnapshotId, false).clear();
+            // Update the high watermark if it is less than the new log start offset
+            if (snapshotId.offset > highWatermark.offset) {

Review comment:
Maybe I am missing something, but how could this be possible given the check above?
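
To make the question concrete, here is a minimal sketch of the control flow in the new `deleteBeforeSnapshot`. It is not the actual `MockLog` code: `DeleteBeforeSnapshotSketch` is a hypothetical class, plain `long` offsets stand in for `OffsetAndEpoch` and the high watermark metadata, and `IllegalArgumentException` stands in for `OffsetOutOfRangeException`. Once both guards pass, `snapshotOffset <= highWatermark` holds, so the later `snapshotOffset > highWatermark` branch appears to be unreachable:

```java
final class DeleteBeforeSnapshotSketch {
    // Hypothetical stand-ins for the MockLog state referenced in the diff.
    private long logStartOffset;
    private long highWatermark;
    // Records whether the questioned branch was ever entered.
    private boolean highWatermarkBranchTaken = false;

    DeleteBeforeSnapshotSketch(long logStartOffset, long highWatermark) {
        this.logStartOffset = logStartOffset;
        this.highWatermark = highWatermark;
    }

    void deleteBeforeSnapshot(long snapshotOffset) {
        if (logStartOffset > snapshotOffset) {
            throw new IllegalArgumentException("New log start is less than the current log start offset");
        }
        if (highWatermark < snapshotOffset) {
            throw new IllegalArgumentException("New log start is greater than the high watermark");
        }
        // Here logStartOffset <= snapshotOffset <= highWatermark always holds,
        // so the condition below can never be true.
        if (snapshotOffset > highWatermark) {
            highWatermarkBranchTaken = true;
            highWatermark = snapshotOffset;
        }
        logStartOffset = snapshotOffset;
    }

    boolean highWatermarkBranchTaken() {
        return highWatermarkBranchTaken;
    }
}
```

If the high watermark update is still wanted, the second guard would presumably have to go; otherwise the update could be dropped.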

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -184,6 +187,22 @@ Builder appendToLog(int epoch, List<String> records) {
             return this;
         }
+        Builder withSnapshot(OffsetAndEpoch snapshotId) throws IOException {

Review comment:
Maybe `withEmptySnapshot`?

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -363,33 +362,30 @@ public LogFetchInfo read(long startOffset, Isolation isolation) {
         }
         ByteBuffer buffer = ByteBuffer.allocate(512);
-        LogEntry firstEntry = null;
+        LogOffsetMetadata batchStartOffset = null;
         for (LogBatch batch : batches) {
             // Note that start offset is inclusive while max offset is exclusive. We only return
             // complete batches, so batches which end at an offset larger than the max offset are
             // filtered, which is effectively the same as having the consumer drop an incomplete
             // batch returned in a fetch response.
-            if (batch.lastOffset() >= startOffset) {
-                if (batch.lastOffset() < maxOffset) {
-                    buffer = batch.writeTo(buffer);
-                }
+            if (batch.lastOffset() >= startOffset && batch.lastOffset() < maxOffset && !batch.entries.isEmpty()) {

Review comment:
I think this is a good change. I wonder if it creates a sort of reverse problem though where we end up not exercising multi-batch paths. Perhaps we could randomly choose 1-3 batches to return or something like that.

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -1081,24 +1103,25 @@ OptionalInt currentClaimedEpoch() {
                 .orElse(null);
         }
-        @Override
-        public void handleClaim(int epoch) {
-            // We record the next expected offset as the claimed epoch's start
-            // offset. This is useful to verify that the `handleClaim` callback
-            // was not received early.
-            long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
-                lastCommitOffset().getAsLong() + 1 : 0L;
-            this.currentClaimedEpoch = OptionalInt.of(epoch);
-            this.claimedEpochStartOffsets.put(epoch, claimedEpochStartOffset);
+        Optional<SnapshotReader<String>> takeSnapshot() {

Review comment:
This name confused me a little bit. I thought it was generating a snapshot. Maybe something like `drainHandledSnapshot` or something like that would be clearer?

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+    private final Records records;
+    private final RecordSerde<T> serde;
+    private final BufferSupplier bufferSupplier;
+    private final int batchSize;
+
+    private Iterator<MutableRecordBatch> nextBatches = Collections.emptyIterator();
+    private Optional<Batch<T>> nextBatch = Optional.empty();
+    // Buffer used as the backing store for nextBatches if needed
+    private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+    // Number of bytes from records read up to now
+    private int bytesRead = 0;
+    private boolean isClosed = false;
+
+    public RecordsIterator(
+        Records records,
+        RecordSerde<T> serde,
+        BufferSupplier bufferSupplier,
+        int batchSize
+    ) {
+        this.records = records;
+        this.serde = serde;
+        this.bufferSupplier = bufferSupplier;
+        this.batchSize = Math.max(batchSize, Records.HEADER_SIZE_UP_TO_MAGIC);
+    }
+
+    @Override
+    public boolean hasNext() {
+        ensureOpen();
+
+        if (!nextBatch.isPresent()) {
+            nextBatch = nextBatch();
+        }
+
+        return nextBatch.isPresent();
+    }
+
+    @Override
+    public Batch<T> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+        }
+
+        Batch<T> batch = nextBatch.get();
+        nextBatch = Optional.empty();
+
+        return batch;
+    }
+
+    @Override
+    public void close() {
+        isClosed = true;
+        allocatedBuffer.ifPresent(bufferSupplier::release);
+        allocatedBuffer = Optional.empty();
+    }
+
+    private void ensureOpen() {
+        if (isClosed) {
+            throw new IllegalStateException("Serde record batch itererator was closed");
+        }
+    }
+
+    private MemoryRecords readFileRecords(FileRecords fileRecords, ByteBuffer buffer) {
+        int start = buffer.position();
+        try {
+            fileRecords.readInto(buffer, bytesRead);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read records into memory", e);
+        }
+
+        bytesRead += buffer.limit() - start;
+        return MemoryRecords.readableRecords(buffer.slice());
+    }
+
+    private MemoryRecords createMemoryRecords(FileRecords fileRecords) {
+        final ByteBuffer buffer;
+        if (allocatedBuffer.isPresent()) {
+            buffer = allocatedBuffer.get();
+            buffer.compact();
+        } else {
+            buffer = bufferSupplier.get(Math.min(batchSize, records.sizeInBytes()));
+            allocatedBuffer = Optional.of(buffer);
+        }
+
+        MemoryRecords memoryRecords = readFileRecords(fileRecords, buffer);
+
+        if (memoryRecords.firstBatchSize() < buffer.remaining()) {

Review comment:
Since `firstBatchSize()` can return null, it might be worth adding a comment explaining that we guarantee a minimum size of `HEADER_SIZE_UP_TO_MAGIC`.

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##########
@@ -717,7 +720,8 @@ void start(int nodeId) {
                 persistentState.store,
                 logContext,
                 time,
-                random
+                random,
+                serde

Review comment:
Maybe a useful `Invariant` we can add here is that there always exists a snapshot corresponding to the log start offset.
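
A minimal sketch of what such a check might look like, not tied to the actual simulation test classes. `SnapshotAtLogStartSketch` and `verify` are hypothetical names, snapshots are represented only by their end offsets, and the sketch assumes that a log start offset of 0 needs no snapshot:

```java
import java.util.Set;

final class SnapshotAtLogStartSketch {
    // Hypothetical invariant check: whenever the log start offset has advanced
    // past 0, some snapshot must end exactly at that offset.
    static void verify(long logStartOffset, Set<Long> snapshotEndOffsets) {
        if (logStartOffset > 0 && !snapshotEndOffsets.contains(logStartOffset)) {
            throw new AssertionError(
                String.format(
                    "Expected a snapshot at log start offset %s, but found snapshots at %s",
                    logStartOffset,
                    snapshotEndOffsets
                )
            );
        }
    }
}
```

Running a check like this against every node after each simulated event should catch a log prefix being truncated without a snapshot that covers it.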

##########
File path: raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+

Review comment:
nit: extra newlines

##########
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##########
@@ -60,19 +61,21 @@
     @Override
     void close();
-    class Batch<T> {
+    final class Batch<T> implements Iterable<T> {

Review comment:
Pulling it up sounds reasonable to me.