hachikuji commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r622494008



##########
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##########
@@ -60,19 +61,21 @@
     @Override
     void close();
 
-    class Batch<T> {
+    final class Batch<T> implements Iterable<T> {
         private final long baseOffset;
         private final int epoch;
+        private final long lastOffset;
         private final List<T> records;
 
-        public Batch(long baseOffset, int epoch, List<T> records) {
+        private Batch(long baseOffset, int epoch, long lastOffset, List<T> records) {

Review comment:
       Perhaps it is clear enough already, but maybe we should document that these offsets are inclusive.
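
       A minimal sketch of that documentation, using only the parameters visible in the diff above:

```java
/**
 * A batch of records from the log.
 *
 * @param baseOffset the offset of the first record in the batch (inclusive)
 * @param epoch the leader epoch that appended the batch
 * @param lastOffset the offset of the last record in the batch (inclusive)
 * @param records the deserialized records
 */
private Batch(long baseOffset, int epoch, long lastOffset, List<T> records) {
    this.baseOffset = baseOffset;
    this.epoch = epoch;
    this.lastOffset = lastOffset;
    this.records = records;
}
```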

##########
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##########
@@ -226,7 +232,11 @@ class TestRaftServer(
             reader.close()
           }
 
-        case _ =>
+        case HandleSnapshot(reader) =>
+          // Ignore snapshots; only interested on records appended by this leader

Review comment:
       nit: on -> in?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -311,8 +311,18 @@ private void updateListenersProgress(long highWatermark) {
     private void updateListenersProgress(List<ListenerContext> listenerContexts, long highWatermark) {
         for (ListenerContext listenerContext : listenerContexts) {
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
-                if (nextExpectedOffset < log.startOffset()) {
-                    listenerContext.fireHandleSnapshot(log.startOffset());
+                if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) {
+                    SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> {
+                        return new IllegalStateException(
+                            String.format(
+                                "Snapshot expected when next offset is %s, log 
start offset is %s and high-watermark is %s",

Review comment:
       Perhaps it is useful to mention the class of the listener since we are referring to its next expected offset?
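
       For example, something along these lines (a sketch; the `listenerName()` accessor is an assumption for illustration, not necessarily the actual API):

```java
SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> {
    return new IllegalStateException(
        String.format(
            "Snapshot expected when next offset is %s, log start offset is %s " +
                "and high-watermark is %s for listener %s",
            nextExpectedOffset,
            log.startOffset(),
            highWatermark,
            listenerContext.listenerName() // hypothetical accessor identifying the listener class
        )
    );
});
```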

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -426,48 +422,55 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) {
     public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {}
 
     @Override
-    public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) {
-        if (logStartOffset() > logStartSnapshotId.offset ||
-            highWatermark.offset < logStartSnapshotId.offset) {
-
+    public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
+        if (logStartOffset() > snapshotId.offset) {
+            throw new OffsetOutOfRangeException(
+                String.format(
+                    "New log start (%s) is less than the curent log start 
offset (%s)",
+                    snapshotId,
+                    logStartOffset()
+                )
+            );
+        }
+        if (highWatermark.offset < snapshotId.offset) {
             throw new OffsetOutOfRangeException(
                 String.format(
-                    "New log start (%s) is less than start offset (%s) or is 
greater than the high watermark (%s)",
-                    logStartSnapshotId,
-                    logStartOffset(),
+                    "New log start (%s) is greater than the high watermark 
(%s)",
+                    snapshotId,
                     highWatermark.offset
                 )
             );
         }
 
         boolean updated = false;
-        Optional<OffsetAndEpoch> snapshotIdOpt = latestSnapshotId();
-        if (snapshotIdOpt.isPresent()) {
-            OffsetAndEpoch snapshotId = snapshotIdOpt.get();
-            if (startOffset() < logStartSnapshotId.offset &&
-                highWatermark.offset >= logStartSnapshotId.offset &&
-                snapshotId.offset >= logStartSnapshotId.offset) {
+        if (snapshots.containsKey(snapshotId)) {
+            snapshots.headMap(snapshotId, false).clear();
 
-                snapshots.headMap(logStartSnapshotId, false).clear();
+            // Update the high watermark if it is less than the new log start offset
+            if (snapshotId.offset > highWatermark.offset) {

Review comment:
       Maybe I am missing something, but how could this be possible given the check above?
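
       A reduced sketch of the two checks from the diff, assuming `highWatermark` is not mutated in between:

```java
if (highWatermark.offset < snapshotId.offset) {
    throw new OffsetOutOfRangeException("..."); // the guard above throws first
}
// From this point on, highWatermark.offset >= snapshotId.offset holds,
// so the later condition can never be true:
if (snapshotId.offset > highWatermark.offset) {
    // unreachable, unless highWatermark changes between the two checks
}
```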

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -184,6 +187,22 @@ Builder appendToLog(int epoch, List<String> records) {
             return this;
         }
 
+        Builder withSnapshot(OffsetAndEpoch snapshotId) throws IOException {

Review comment:
       Maybe `withEmptySnapshot`?

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -363,33 +362,30 @@ public LogFetchInfo read(long startOffset, Isolation isolation) {
         }
 
         ByteBuffer buffer = ByteBuffer.allocate(512);
-        LogEntry firstEntry = null;
+        LogOffsetMetadata batchStartOffset = null;
 
         for (LogBatch batch : batches) {
             // Note that start offset is inclusive while max offset is exclusive. We only return
             // complete batches, so batches which end at an offset larger than the max offset are
             // filtered, which is effectively the same as having the consumer drop an incomplete
             // batch returned in a fetch response.
-            if (batch.lastOffset() >= startOffset) {
-                if (batch.lastOffset() < maxOffset) {
-                    buffer = batch.writeTo(buffer);
-                }
+            if (batch.lastOffset() >= startOffset && batch.lastOffset() < 
maxOffset && !batch.entries.isEmpty()) {

Review comment:
       I think this is a good change. I wonder if it creates a sort of reverse problem though where we end up not exercising multi-batch paths. Perhaps we could randomly choose 1-3 batches to return or something like that.
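
       A sketch of that idea against the loop in the diff; the `random` field is an assumption for illustration:

```java
// Return a random number of complete batches (1 to 3) so that both the
// single-batch and multi-batch read paths stay exercised.
int batchesToReturn = 1 + random.nextInt(3); // hypothetical java.util.Random field
int returned = 0;
for (LogBatch batch : batches) {
    if (batch.lastOffset() >= startOffset && batch.lastOffset() < maxOffset && !batch.entries.isEmpty()) {
        buffer = batch.writeTo(buffer);
        if (++returned >= batchesToReturn) {
            break;
        }
    }
}
```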

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -1081,24 +1103,25 @@ OptionalInt currentClaimedEpoch() {
                 .orElse(null);
         }
 
-        @Override
-        public void handleClaim(int epoch) {
-            // We record the next expected offset as the claimed epoch's start
-            // offset. This is useful to verify that the `handleClaim` callback
-            // was not received early.
-            long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
-                lastCommitOffset().getAsLong() + 1 : 0L;
-            this.currentClaimedEpoch = OptionalInt.of(epoch);
-            this.claimedEpochStartOffsets.put(epoch, claimedEpochStartOffset);
+        Optional<SnapshotReader<String>> takeSnapshot() {

Review comment:
       This name confused me a little bit. I thought it was generating a snapshot. Maybe something like `drainHandledSnapshot` would be clearer?
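
       A sketch of the drain semantics that name would suggest; the `handledSnapshot` field is assumed here for illustration:

```java
Optional<SnapshotReader<String>> drainHandledSnapshot() {
    // Return the snapshot received via handleSnapshot, if any, and clear it
    // so each handled snapshot is observed at most once by the test.
    Optional<SnapshotReader<String>> result = handledSnapshot;
    handledSnapshot = Optional.empty();
    return result;
}
```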

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+    private final Records records;
+    private final RecordSerde<T> serde;
+    private final BufferSupplier bufferSupplier;
+    private final int batchSize;
+
+    private Iterator<MutableRecordBatch> nextBatches = 
Collections.emptyIterator();
+    private Optional<Batch<T>> nextBatch = Optional.empty();
+    // Buffer used as the backing store for nextBatches if needed
+    private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+    // Number of bytes from records read up to now
+    private int bytesRead = 0;
+    private boolean isClosed = false;
+
+    public RecordsIterator(
+        Records records,
+        RecordSerde<T> serde,
+        BufferSupplier bufferSupplier,
+        int batchSize
+    ) {
+        this.records = records;
+        this.serde = serde;
+        this.bufferSupplier = bufferSupplier;
+        this.batchSize = Math.max(batchSize, Records.HEADER_SIZE_UP_TO_MAGIC);
+    }
+
+    @Override
+    public boolean hasNext() {
+        ensureOpen();
+
+        if (!nextBatch.isPresent()) {
+            nextBatch = nextBatch();
+        }
+
+        return nextBatch.isPresent();
+    }
+
+    @Override
+    public Batch<T> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+        }
+
+        Batch<T> batch = nextBatch.get();
+        nextBatch = Optional.empty();
+
+        return batch;
+    }
+
+    @Override
+    public void close() {
+        isClosed = true;
+        allocatedBuffer.ifPresent(bufferSupplier::release);
+        allocatedBuffer = Optional.empty();
+    }
+
+    private void ensureOpen() {
+        if (isClosed) {
+            throw new IllegalStateException("Serde record batch itererator was 
closed");
+        }
+    }
+
+    private MemoryRecords readFileRecords(FileRecords fileRecords, ByteBuffer buffer) {
+        int start = buffer.position();
+        try {
+            fileRecords.readInto(buffer, bytesRead);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read records into memory", e);
+        }
+
+        bytesRead += buffer.limit() - start;
+        return MemoryRecords.readableRecords(buffer.slice());
+    }
+
+    private MemoryRecords createMemoryRecords(FileRecords fileRecords) {
+        final ByteBuffer buffer;
+        if (allocatedBuffer.isPresent()) {
+            buffer = allocatedBuffer.get();
+            buffer.compact();
+        } else {
+            buffer = bufferSupplier.get(Math.min(batchSize, records.sizeInBytes()));
+            allocatedBuffer = Optional.of(buffer);
+        }
+
+        MemoryRecords memoryRecords = readFileRecords(fileRecords, buffer);
+
+        if (memoryRecords.firstBatchSize() < buffer.remaining()) {

Review comment:
       Since `firstBatchSize()` can return null, it might be worth adding a comment explaining that we guarantee a minimum size of `HEADER_SIZE_UP_TO_MAGIC`.
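
       For instance, a sketch of the comment together with an explicit check (hedged; the branch body is elided):

```java
// firstBatchSize() returns null when fewer than HEADER_SIZE_UP_TO_MAGIC bytes
// are available. The constructor clamps batchSize to at least
// HEADER_SIZE_UP_TO_MAGIC, so a complete batch header should always be present here.
Integer firstBatchSize = memoryRecords.firstBatchSize();
if (firstBatchSize == null) {
    throw new IllegalStateException("Unable to read the size of the first batch");
}
if (firstBatchSize < buffer.remaining()) {
    // same handling as in the diff above
}
```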

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##########
@@ -717,7 +720,8 @@ void start(int nodeId) {
                 persistentState.store,
                 logContext,
                 time,
-                random
+                random,
+                serde

Review comment:
       Maybe a useful `Invariant` we can add here is that there always exists a snapshot corresponding to the log start offset.
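
       A sketch of such an invariant, following the verify-style invariants this test already uses; the cluster and log accessors are assumptions for illustration (imports like `java.util.OptionalLong` and JUnit's `assertEquals` assumed):

```java
private static class SnapshotAtLogStart implements Invariant {
    private final Cluster cluster;

    private SnapshotAtLogStart(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public void verify() {
        for (RaftNode node : cluster.running()) { // hypothetical accessor
            long startOffset = node.log.startOffset();
            if (startOffset > 0) {
                // earliestSnapshotId() is assumed here; the invariant is that a
                // snapshot exists exactly at the log start offset.
                assertEquals(
                    OptionalLong.of(startOffset),
                    node.log.earliestSnapshotId()
                        .map(snapshotId -> OptionalLong.of(snapshotId.offset))
                        .orElse(OptionalLong.empty()),
                    "Expected a snapshot at the log start offset"
                );
            }
        }
    }
}
```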

##########
File path: raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+

Review comment:
       nit: extra newlines

##########
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##########
@@ -60,19 +61,21 @@
     @Override
     void close();
 
-    class Batch<T> {
+    final class Batch<T> implements Iterable<T> {

Review comment:
       Pulling it up sounds reasonable to me.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

