[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-30 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r624230147



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -966,6 +967,54 @@ public void verify() {
 }
 }
 
+private static class SnapshotAtLogStart implements Invariant {

Review comment:
   Okay. The simulations now take less than a minute in total on my machine, which matches what I see on trunk on the same machine.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-30 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r624200356



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -966,6 +967,54 @@ public void verify() {
 }
 }
 
+private static class SnapshotAtLogStart implements Invariant {

Review comment:
   Good catch. I made some improvements. After the improvements, the simulation time went from 1 minute on trunk to 1:41 minutes for this PR. It looks like a good amount of that time is spent in MockLog.validateOffsetAndEpoch. Let me see what we can do, but we can also remove that functionality from the verification.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-30 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r624166487



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##
@@ -75,56 +75,6 @@ public void testTopicId() {
 assertEquals(topicId, log.topicId());
 }
 
-@Test
-public void testAppendAsLeaderHelper() {

Review comment:
   I actually moved this to `testAppendAsLeader`. The two tests were testing almost the same things, so I merged them; the single test now covers all of the cases that didn't overlap.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-29 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r623370656



##
File path: raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+

Review comment:
   Lines removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-29 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r623370427



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -426,48 +422,55 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) {
 public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {}
 
 @Override
-public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) {
-if (logStartOffset() > logStartSnapshotId.offset ||
-highWatermark.offset < logStartSnapshotId.offset) {
-
+public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
+if (logStartOffset() > snapshotId.offset) {
+throw new OffsetOutOfRangeException(
+String.format(
+"New log start (%s) is less than the current log start offset (%s)",
+snapshotId,
+logStartOffset()
+)
+);
+}
+if (highWatermark.offset < snapshotId.offset) {
 throw new OffsetOutOfRangeException(
 String.format(
-"New log start (%s) is less than start offset (%s) or is greater than the high watermark (%s)",
-logStartSnapshotId,
-logStartOffset(),
+"New log start (%s) is greater than the high watermark (%s)",
+snapshotId,
 highWatermark.offset
 )
 );
 }
 
 boolean updated = false;
-Optional<OffsetAndEpoch> snapshotIdOpt = latestSnapshotId();
-if (snapshotIdOpt.isPresent()) {
-OffsetAndEpoch snapshotId = snapshotIdOpt.get();
-if (startOffset() < logStartSnapshotId.offset &&
-highWatermark.offset >= logStartSnapshotId.offset &&
-snapshotId.offset >= logStartSnapshotId.offset) {
+if (snapshots.containsKey(snapshotId)) {
+snapshots.headMap(snapshotId, false).clear();
 
-snapshots.headMap(logStartSnapshotId, false).clear();
+// Update the high watermark if it is less than the new log start offset
+if (snapshotId.offset > highWatermark.offset) {

Review comment:
   You are correct. This is not needed.

##
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##
@@ -184,6 +187,22 @@ Builder appendToLog(int epoch, List<String> records) {
 return this;
 }
 
+Builder withSnapshot(OffsetAndEpoch snapshotId) throws IOException {

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-29 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r623370233



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -363,33 +362,30 @@ public LogFetchInfo read(long startOffset, Isolation isolation) {
 }
 
 ByteBuffer buffer = ByteBuffer.allocate(512);
-LogEntry firstEntry = null;
+LogOffsetMetadata batchStartOffset = null;
 
 for (LogBatch batch : batches) {
 // Note that start offset is inclusive while max offset is exclusive. We only return
 // complete batches, so batches which end at an offset larger than the max offset are
 // filtered, which is effectively the same as having the consumer drop an incomplete
 // batch returned in a fetch response.
-if (batch.lastOffset() >= startOffset) {
-if (batch.lastOffset() < maxOffset) {
-buffer = batch.writeTo(buffer);
-}
+if (batch.lastOffset() >= startOffset && batch.lastOffset() < maxOffset && !batch.entries.isEmpty()) {

Review comment:
   I wanted to keep the tests reproducible. As a compromise, I changed this code to read at most 2 batches. We can revisit this in the future and make it random but reproducible.
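
   As a rough illustration of the "random but reproducible" idea (a hedged sketch; the class and names below are hypothetical, not code from this PR), the per-read batch limit could be drawn from a seeded generator so a failing simulation can be replayed:

```java
import java.util.Random;

public class ReproducibleBatchLimit {
    private final Random random;

    // Same seed => same sequence of limits => reproducible failures.
    // The simulation already logs its seed, so a failing run can be replayed.
    public ReproducibleBatchLimit(long seed) {
        this.random = new Random(seed);
    }

    // Upper bound on complete batches a single read may return (1 to 4 here)
    public int nextBatchLimit() {
        return 1 + random.nextInt(4);
    }
}
```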




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-29 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r623369093



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -717,7 +720,8 @@ void start(int nodeId) {
 persistentState.store,
 logContext,
 time,
-random
+random,
+serde

Review comment:
   Yes. Good idea. Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-29 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r623368881



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##
@@ -1081,24 +1103,25 @@ OptionalInt currentClaimedEpoch() {
 .orElse(null);
 }
 
-@Override
-public void handleClaim(int epoch) {
-// We record the next expected offset as the claimed epoch's start
-// offset. This is useful to verify that the `handleClaim` callback
-// was not received early.
-long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
-lastCommitOffset().getAsLong() + 1 : 0L;
-this.currentClaimedEpoch = OptionalInt.of(epoch);
-this.claimedEpochStartOffsets.put(epoch, claimedEpochStartOffset);
+Optional<SnapshotWriter<String>> takeSnapshot() {

Review comment:
   Yep. Changed the name as suggested. The name was inspired by a naming pattern used in Rust (https://doc.rust-lang.org/std/option/enum.Option.html#method.take), but I see that we use `takeSnapshot` in other parts of Kafka for creating snapshots.
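
   For reference, a minimal Java sketch of the Rust `Option::take` pattern mentioned above (illustrative names only, not code from the PR):

```java
import java.util.Optional;

public class TakeExample {
    private Optional<String> value = Optional.of("snapshot");

    // "take" returns the held value (if any) and leaves empty behind
    public Optional<String> take() {
        Optional<String> result = value;
        value = Optional.empty();
        return result;
    }

    public static void main(String[] args) {
        TakeExample example = new TakeExample();
        System.out.println(example.take()); // Optional[snapshot]
        System.out.println(example.take()); // Optional.empty
    }
}
```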




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-29 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r623367561



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int batchSize;
+
+private Iterator<MutableRecordBatch> nextBatches = Collections.emptyIterator();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes from records read up to now
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public RecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int batchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.batchSize = Math.max(batchSize, Records.HEADER_SIZE_UP_TO_MAGIC);
+}
+
+@Override
+public boolean hasNext() {
+ensureOpen();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void ensureOpen() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private MemoryRecords readFileRecords(FileRecords fileRecords, ByteBuffer buffer) {
+int start = buffer.position();
+try {
+fileRecords.readInto(buffer, bytesRead);
+} catch (IOException e) {
+throw new RuntimeException("Failed to read records into memory", e);
+}
+
+bytesRead += buffer.limit() - start;
+return MemoryRecords.readableRecords(buffer.slice());
+}
+
+private MemoryRecords createMemoryRecords(FileRecords fileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+} else {
+buffer = bufferSupplier.get(Math.min(batchSize, records.sizeInBytes()));
+allocatedBuffer = Optional.of(buffer);
+}
+
+MemoryRecords memoryRecords = readFileRecords(fileRecords, buffer);
+
+if (memoryRecords.firstBatchSize() < buffer.remaining()) {

Review comment:
   Yes. Added a comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-29 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r623367352



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -311,8 +311,18 @@ private void updateListenersProgress(long highWatermark) {
private void updateListenersProgress(List<ListenerContext> listenerContexts, long highWatermark) {
 for (ListenerContext listenerContext : listenerContexts) {
 listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
-if (nextExpectedOffset < log.startOffset()) {
-listenerContext.fireHandleSnapshot(log.startOffset());
+if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) {
+SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> {
+return new IllegalStateException(
+String.format(
+"Snapshot expected when next offset is %s, log start offset is %s and high-watermark is %s",

Review comment:
   Good idea. I decided to use `getClass().getTypeName()`.
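
   A small illustrative sketch of embedding `getClass().getTypeName()` in an error message (the surrounding class and message text here are made up, not the PR's code):

```java
public class ListenerErrorExample {
    public RuntimeException snapshotExpectedError(long nextOffset) {
        return new IllegalStateException(
            String.format(
                "Listener %s expected a snapshot at next offset %d",
                getClass().getTypeName(), // concrete class name; useful with nested or anonymous listeners
                nextOffset
            )
        );
    }

    public static void main(String[] args) {
        System.out.println(new ListenerErrorExample().snapshotExpectedError(42).getMessage());
    }
}
```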




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-29 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r623367035



##
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##
@@ -60,19 +61,21 @@
 @Override
 void close();
 
-class Batch<T> {
+final class Batch<T> implements Iterable<T> {
 private final long baseOffset;
 private final int epoch;
+private final long lastOffset;
 private final List<T> records;
 
-public Batch(long baseOffset, int epoch, List<T> records) {
+private Batch(long baseOffset, int epoch, long lastOffset, List<T> records) {

Review comment:
   Yes. I documented the static methods for creating objects of this type.
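
   A brief sketch of the pattern under discussion, a private constructor plus documented static factories (field names follow the diff above; the factory name and offset arithmetic are assumptions, not necessarily the PR's methods):

```java
import java.util.List;

public final class Batch<T> {
    private final long baseOffset;
    private final int epoch;
    private final long lastOffset;
    private final List<T> records;

    // Private: callers must go through a documented factory method
    private Batch(long baseOffset, int epoch, long lastOffset, List<T> records) {
        this.baseOffset = baseOffset;
        this.epoch = epoch;
        this.lastOffset = lastOffset;
        this.records = records;
    }

    /**
     * Creates a batch whose last offset is implied by the number of records.
     */
    public static <T> Batch<T> of(long baseOffset, int epoch, List<T> records) {
        return new Batch<>(baseOffset, epoch, baseOffset + records.size() - 1, records);
    }
}
```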




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-29 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r623366415



##
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##
@@ -226,7 +232,11 @@ class TestRaftServer(
 reader.close()
   }
 
-case _ =>
+case HandleSnapshot(reader) =>
+  // Ignore snapshots; only interested in records appended by this leader

Review comment:
   Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995466



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2154,8 +2182,14 @@ private boolean maybeCompleteShutdown(long currentTimeMs) {
 return false;
 }
 
-private void maybeUpdateOldestSnapshotId() {
-log.latestSnapshotId().ifPresent(log::deleteBeforeSnapshot);
+private void maybeUpdateEarliestSnapshotId() {

Review comment:
   I changed the method name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995406



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -326,6 +336,14 @@ private void updateListenersProgress(List<ListenerContext> listenerContexts, long highWatermark) {
 }
 }
 
+private Optional<SnapshotReader<T>> latestSnapshot() {
+return log.latestSnapshotId().flatMap(snapshotId -> {
+return log

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995402



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -326,6 +336,14 @@ private void updateListenersProgress(List<ListenerContext> listenerContexts, long highWatermark) {
 }
 }
 
+private Optional<SnapshotReader<T>> latestSnapshot() {
+return log.latestSnapshotId().flatMap(snapshotId -> {

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995361



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -68,20 +70,65 @@ public synchronized void increment() {
 @Override
 public synchronized void handleCommit(BatchReader<Integer> reader) {
 try {
-int initialValue = this.committed;
+int initialValue = committed;
 while (reader.hasNext()) {
 BatchReader.Batch<Integer> batch = reader.next();
 log.debug("Handle commit of batch with records {} at base offset {}",
 batch.records(), batch.baseOffset());
 for (Integer value : batch.records()) {
-if (value != this.committed + 1) {
-throw new AssertionError("Expected next committed value to be " +
-(this.committed + 1) + ", but instead found " + value + " on node " + nodeId);
+if (value != committed + 1) {

Review comment:
   Done. Not sure if it is any better :smile: 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995276



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -68,20 +70,65 @@ public synchronized void increment() {
 @Override
 public synchronized void handleCommit(BatchReader<Integer> reader) {
 try {
-int initialValue = this.committed;
+int initialValue = committed;
 while (reader.hasNext()) {
 BatchReader.Batch<Integer> batch = reader.next();
 log.debug("Handle commit of batch with records {} at base offset {}",
 batch.records(), batch.baseOffset());
 for (Integer value : batch.records()) {
-if (value != this.committed + 1) {
-throw new AssertionError("Expected next committed value to be " +
-(this.committed + 1) + ", but instead found " + value + " on node " + nodeId);
+if (value != committed + 1) {
+throw new AssertionError(
+String.format(
+"Expected next committed value to be %s, but instead found %s on node %s",
+committed + 1,
+value,
+nodeId
+)
+);
 }
-this.committed = value;
+committed = value;
 }
+
+nextReadOffset = batch.lastOffset() + 1;
+readEpoch = batch.epoch();
 }
 log.debug("Counter incremented from {} to {}", initialValue, committed);
+
+if (lastSnapshotEndOffset + 10 < nextReadOffset) {

Review comment:
   Made it a variable.
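
   A sketch of that change, with the hard-coded 10 pulled into a named value (the name `snapshotIntervalRecords` and the surrounding class are illustrative, not from the PR):

```java
public class SnapshotTrigger {
    private final int snapshotIntervalRecords;
    private long lastSnapshotEndOffset = 0;

    public SnapshotTrigger(int snapshotIntervalRecords) {
        this.snapshotIntervalRecords = snapshotIntervalRecords;
    }

    // Equivalent to the condition in the diff above, without the magic number
    public boolean shouldSnapshot(long nextReadOffset) {
        return lastSnapshotEndOffset + snapshotIntervalRecords < nextReadOffset;
    }
}
```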




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995124



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -355,24 +373,29 @@ private void fireHandleResign(int epoch) {
 }
 
 @Override
-public void initialize() throws IOException {
-quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()));
+public void initialize() {
+try {
+quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()));
 
-long currentTimeMs = time.milliseconds();
-if (quorum.isLeader()) {
-throw new IllegalStateException("Voter cannot initialize as a Leader");
-} else if (quorum.isCandidate()) {
-onBecomeCandidate(currentTimeMs);
-} else if (quorum.isFollower()) {
-onBecomeFollower(currentTimeMs);
-}
+long currentTimeMs = time.milliseconds();
+if (quorum.isLeader()) {
+throw new IllegalStateException("Voter cannot initialize as a Leader");
+} else if (quorum.isCandidate()) {
+onBecomeCandidate(currentTimeMs);
+} else if (quorum.isFollower()) {
+onBecomeFollower(currentTimeMs);
+}
 
-// When there is only a single voter, become candidate immediately
-if (quorum.isVoter()
-&& quorum.remoteVoters().isEmpty()
-&& !quorum.isLeader()
-&& !quorum.isCandidate()) {
-transitionToCandidate(currentTimeMs);
+// When there is only a single voter, become candidate immediately
+if (quorum.isVoter()
+&& quorum.remoteVoters().isEmpty()
+&& !quorum.isLeader()

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610995053



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
##
@@ -163,48 +85,46 @@ public OptionalLong lastOffset() {
 
 @Override
 public void close() {
-isClosed = true;
+if (!isClosed) {
+isClosed = true;
 
-if (allocatedBuffer != null) {
-bufferSupplier.release(allocatedBuffer);
+iterator.close();
+closeListener.onClose(this);
 }
-
-closeListener.onClose(this);
 }
 
-public T readRecord(Readable input) {
-// Read size of body in bytes
-input.readVarint();
-
-// Read unused attributes
-input.readByte();
-
-long timestampDelta = input.readVarlong();
-if (timestampDelta != 0) {
-throw new IllegalArgumentException();
-}
-
-// Read offset delta
-input.readVarint();
-
-int keySize = input.readVarint();
-if (keySize != -1) {
-throw new IllegalArgumentException("Unexpected key size " + 
keySize);
-}
+public static  RecordsBatchReader of(
+long baseOffset,
+Records records,
+RecordSerde serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize,
+CloseListener> closeListener
+) {
+return new RecordsBatchReader<>(
+baseOffset,
+new SerdeRecordsIterator<>(records, serde, bufferSupplier, 
maxBatchSize),
+closeListener
+);
+}
 
-int valueSize = input.readVarint();
-if (valueSize < 0) {
-throw new IllegalArgumentException();
+private void checkIfClosed() {

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610955955



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read from records so far
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+
+if (!buffer.hasRemaining()) {
+// The buffer is not big enough to read an entire batch
+throw new IllegalStateException(
+String.format(
+"Unable to read batch from file records buffer %s with maximum batch %s and record size %s",
+buffer,
+maxBatchSize,
+records.sizeInBytes()
+)
+);
+}

Review comment:
   Yeah, got it. I thought about it some more. This is currently only used 
for the metadata partitions. If the controller or broker cannot read the 
metadata partition the

[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610952440



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -418,48 +414,49 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) {
 public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {}
 
 @Override
-public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) {
-if (logStartOffset() > logStartSnapshotId.offset ||
-highWatermark.offset < logStartSnapshotId.offset) {
+public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
+if (logStartOffset() > snapshotId.offset ||
+highWatermark.offset < snapshotId.offset) {
 
 throw new OffsetOutOfRangeException(
 String.format(
 "New log start (%s) is less than start offset (%s) or is 
greater than the high watermark (%s)",

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610951347



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read from records so far
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+
+if (!buffer.hasRemaining()) {
+// The buffer is not big enough to read an entire batch
+throw new IllegalStateException(
+String.format(
+"Unable to read batch from file records buffer %s with maximum batch %s and record size %s",
+buffer,
+maxBatchSize,
+records.sizeInBytes()
+)
+);
+}
+} else {
+buffer = bufferSupplier.get(Math.min(maxBatchSize, records.sizeInBytes()));
+allocatedBuffer = Optional.of(buffer);
+

[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610949543



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {

Review comment:
   By the way, Kafka already has `org.apache.kafka.common.record.RecordBatchIterator`.
   
   To me, the name of the iterator should represent what it iterates over, not what it generates. In this case it is iterating over `Records` (`MemoryRecords` and `FileRecords`). When we implement @mumrah's suggestion of having separate implementations for each, we will have `MemoryRecordsIterator` and `FileRecordsIterator`, as sketched below.
   
   I'll change the name to `RecordsIterator`.
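
   A compact, hypothetical sketch of that split: neither `MemoryRecordsIterator` nor `FileRecordsIterator` exists in this PR, and the dispatch below only shows the decision point the separate implementations would replace:

```java
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;

final class RecordsIterators {
    private RecordsIterators() {}

    // Pick an implementation once, up front, instead of branching on
    // instanceof inside the iterator's read loop
    static String implementationFor(Records records) {
        if (records instanceof MemoryRecords) {
            return "MemoryRecordsIterator"; // would iterate the in-memory buffer directly
        } else if (records instanceof FileRecords) {
            return "FileRecordsIterator";   // would page the file into a pooled buffer
        } else {
            throw new IllegalArgumentException(
                "Unexpected records type: " + records.getClass().getTypeName());
        }
    }
}
```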




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610944539



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -68,20 +70,65 @@ public synchronized void increment() {
 @Override
 public synchronized void handleCommit(BatchReader<Integer> reader) {
 try {
-int initialValue = this.committed;
+int initialValue = committed;
 while (reader.hasNext()) {
 BatchReader.Batch<Integer> batch = reader.next();
 log.debug("Handle commit of batch with records {} at base offset {}",
 batch.records(), batch.baseOffset());
 for (Integer value : batch.records()) {
-if (value != this.committed + 1) {
-throw new AssertionError("Expected next committed value to be " +
-(this.committed + 1) + ", but instead found " + value + " on node " + nodeId);
+if (value != committed + 1) {
+throw new AssertionError(
+String.format(
+"Expected next committed value to be %s, but instead found %s on node %s",
+committed + 1,
+value,
+nodeId
+)
+);
 }
-this.committed = value;
+committed = value;
 }
+
+nextReadOffset = batch.lastOffset() + 1;
+readEpoch = batch.epoch();
 }
 log.debug("Counter incremented from {} to {}", initialValue, committed);
+
+if (lastSnapshotEndOffset + 10 < nextReadOffset) {

Review comment:
   Hmm. We should only generate snapshots at batch boundaries, as sketched below. There is no guarantee that the `lastOffset` of a batch is a multiple of some number, since batches can contain any number of records.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610942230



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -68,20 +70,65 @@ public synchronized void increment() {
 @Override
 public synchronized void handleCommit(BatchReader<Integer> reader) {
 try {
-int initialValue = this.committed;
+int initialValue = committed;
 while (reader.hasNext()) {
 BatchReader.Batch<Integer> batch = reader.next();
 log.debug("Handle commit of batch with records {} at base offset {}",
 batch.records(), batch.baseOffset());
 for (Integer value : batch.records()) {
-if (value != this.committed + 1) {
-throw new AssertionError("Expected next committed value to be " +
-(this.committed + 1) + ", but instead found " + value + " on node " + nodeId);
+if (value != committed + 1) {
+throw new AssertionError(
+String.format(
+"Expected next committed value to be %s, but instead found %s on node %s",
+committed + 1,
+value,
+nodeId
+)
+);
 }
-this.committed = value;
+committed = value;
 }
+
+nextReadOffset = batch.lastOffset() + 1;

Review comment:
   Yep. Not sure what I was thinking when I added those fields.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610939585



##
File path: raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java
##
@@ -37,17 +36,21 @@
  *
  * @throws IOException for any IO error while reading the size

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610939450



##
File path: raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
##
@@ -124,6 +125,11 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
 }
 }
 
+@Override
+public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+reader.close();

Review comment:
   In the future, yes. Simply ignoring the snapshot is okay for now for the following reasons:
   1. We are getting rid of this shim in https://github.com/apache/kafka/pull/10497
   2. None of the kraft listeners generate snapshots, so this should never be called
   3. We will implement snapshot loading for the controller and broker as part of this jira: https://issues.apache.org/jira/browse/KAFKA-12466




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610934045



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read from records so far
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+
+if (!buffer.hasRemaining()) {
+// The buffer is not big enough to read an entire batch
+throw new IllegalStateException(
+String.format(
+"Unable to read batch from file records buffer %s with maximum batch %s and record size %s",
+buffer,
+maxBatchSize,
+records.sizeInBytes()
+)
+);
+}
+} else {
+buffer = bufferSupplier.get(Math.min(maxBatchSize, records.sizeInBytes()));
+allocatedBuffer = Optional.of(buffer);
+

[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r61098



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator implements Iterator>, 
AutoCloseable {
+private final Records records;
+private final RecordSerde serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional> nextBatches = 
Optional.empty();
+private Optional> nextBatch = Optional.empty();
+// Buffer used to as the backing store for nextBatches if needed
+private Optional allocatedBuffer = Optional.empty();
+// Number of bytes from records that read
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any 
more elements");
+}
+
+Batch batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch itererator was 
closed");
+}
+}
+
+private Optional> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {

Review comment:
   I think most of the code will be the same, but I can try to have two different implementations. Do you mind if I do that in a future PR?
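
   A hedged sketch of how that split could look, with every name here hypothetical: one iterator per `Records` implementation behind a small shared seam, so the `Batch` decoding logic stays in one place.

    import java.util.Iterator;
    import java.util.Optional;
    import org.apache.kafka.common.record.MutableRecordBatch;

    // Hypothetical seam for the split discussed above.
    interface RecordBatchSource extends AutoCloseable {
        // Next run of complete record batches, or empty once exhausted.
        Optional<Iterator<MutableRecordBatch>> nextBatches();
    }

    // A MemoryRecordsSource would hand out records.batchIterator() exactly once;
    // a FileRecordsSource would page the file through a pooled ByteBuffer.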




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-09 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r610933083



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read from records so far
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {

Review comment:
   Yes. Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-06 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r608180796



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read from records so far
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+
+if (!buffer.hasRemaining()) {
+// The buffer is not big enough to read an entire batch
+throw new IllegalStateException(
+String.format(
+"Unable to read batch from file records buffer 
%s with maximum batch %s and record size %s",
+buffer,
+maxBatchSize,
+records.sizeInBytes()
+)
+);
+}

Review comment:
   To get this new size we need to read the size from the batch record using the `Readable`, for example. I don't know if we want to trust this value for memory allocation.
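
   For reference, a minimal sketch of what "reading the size from the batch record" means here: the batch length sits in the fixed log header, so it can be peeked at before deciding how much to allocate. `Records.SIZE_OFFSET` and `Records.LOG_OVERHEAD` are existing constants of the record format; whether the peeked value should be trusted for allocation is exactly the open question above.

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.Records;

    final class BatchSizePeek {
        // Total size in bytes of the batch starting at buffer.position(),
        // or -1 if the fixed header has not been fully read yet.
        static int nextBatchSize(ByteBuffer buffer) {
            if (buffer.remaining() < Records.LOG_OVERHEAD) {
                return -1;
            }
            // The 4-byte length follows the 8-byte base offset and excludes
            // the 12 bytes of log overhead itself.
            int batchLength = buffer.getInt(buffer.position() + Records.SIZE_OFFSET);
            return Records.LOG_OVERHEAD + batchLength;
        }
    }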

[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-06 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r608006894



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read from records so far
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+
+if (!buffer.hasRemaining()) {
+// The buffer is not big enough to read an entire batch
+throw new IllegalStateException(
+String.format(
+"Unable to read batch from file records buffer 
%s with maximum batch %s and record size %s",
+buffer,
+maxBatchSize,
+records.sizeInBytes()
+)
+);
+}

Review comment:
   @hachikuji @mumrah I am not super happy with this handling. Especially since it is possible for us to write a batch greater than `maxBatchSize` based on how `BatchAccumulator` works.
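
   For illustration, a hedged sketch of one alternative to throwing here: grow the backing buffer when a single batch exceeds `maxBatchSize`. The doubling policy is hypothetical; only `bufferSupplier` comes from the surrounding code.

    // Assumes `current` was just compact()ed, so it is in write mode with the
    // partial batch bytes at the front.
    private ByteBuffer growBuffer(ByteBuffer current) {
        ByteBuffer larger = bufferSupplier.get(current.capacity() * 2);
        current.flip();                   // switch the compacted buffer to read mode
        larger.put(current);              // carry the partial batch over
        bufferSupplier.release(current);  // return the old buffer to the pool
        return larger;
    }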

[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-06 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r607999624



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read from records so far
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+
+if (!buffer.hasRemaining()) {
+// The buffer is not big enough to read an entire batch
+throw new IllegalStateException(
+String.format(
+"Unable to read batch from file records buffer 
%s with maximum batch %s and record size %s",
+buffer,
+maxBatchSize,
+records.sizeInBytes()
+)
+);
+}
+} else {
+buffer = bufferSupplier.get(Math.min(maxBatchSize, records.sizeInBytes()));
+allocatedBuffer = Optional.of(buffer);
+

[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-03-31 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r605167349



##
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##
@@ -60,19 +61,21 @@
 @Override
 void close();
 
-class Batch<T> {
+final class Batch<T> implements Iterable<T> {

Review comment:
   @hachikuji given that we use this type in `SnapshotReader` I am okay 
moving this type to something like `o.a.k.r.Batch` instead of 
`o.a.k.r.BatchReader.Batch`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-03-24 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r600926563



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -87,6 +96,25 @@ public synchronized void handleCommit(BatchReader<Integer> reader) {
 }
 }
 
+@Override
+public synchronized void handleSnapshot(SnapshotReader<Integer> reader) {
+try {
+try (SnapshotReader<Integer> snapshot = reader) {
+log.debug("Loading snapshot {}", snapshot.snapshotId());
+for (List<Integer> batch : snapshot) {
+for (Integer value : batch) {
+log.debug("Setting value: {}", value);
+this.committed = value;
+this.uncommitted = value;
+}
+}
+log.debug("Finished loading snapshot. Set value: {}", this.committed);
+}
+}
+} catch (IOException e) {

Review comment:
   I also changed a few places that were throwing `IOException`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-03-24 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r600926104



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SnapshotReader<T> implements Closeable, Iterable<List<T>> {

Review comment:
   Implemented a `SerdeRecordsIterator` that both `BatchReader` and 
`SnapshotReader` use internally. The biggest change here was to take into 
account large `FileRecords`.
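
   A condensed sketch of that `FileRecords` handling (simplified; the real iterator also compact()s the buffer so a trailing partial batch survives between reads): page a chunk of the file into a pooled buffer and iterate the complete batches in that chunk.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Iterator;
    import org.apache.kafka.common.record.FileRecords;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MutableRecordBatch;

    final class FileRecordsChunk {
        // Reads the next chunk of fileRecords starting at position into buffer
        // and returns an iterator over the complete batches in that chunk.
        static Iterator<MutableRecordBatch> readChunk(
            FileRecords fileRecords,
            ByteBuffer buffer,
            int position
        ) throws IOException {
            fileRecords.readInto(buffer, position); // fills the buffer and flips it for reading
            return MemoryRecords.readableRecords(buffer.slice()).batchIterator();
        }
    }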




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-03-24 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r600925523



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -87,6 +96,25 @@ public synchronized void handleCommit(BatchReader<Integer> reader) {
 }
 }
 
+@Override
+public synchronized void handleSnapshot(SnapshotReader<Integer> reader) {
+try {
+try (SnapshotReader<Integer> snapshot = reader) {
+log.debug("Loading snapshot {}", snapshot.snapshotId());
+for (List<Integer> batch : snapshot) {

Review comment:
   Yes. I missed this. `ReplicatedCounter` now generates a snapshot after more than 10 records. I fixed the existing simulation tests to take snapshots into account.
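
   A minimal sketch (not the actual `ReplicatedCounter` change) of that policy; the snapshot-writing call is a hypothetical stand-in for the real API:

    // Counts committed records and triggers a snapshot past a threshold.
    private int recordsSinceSnapshot = 0;
    private static final int SNAPSHOT_THRESHOLD = 10;

    private void onCommittedValue(int value) {
        committed = value;
        recordsSinceSnapshot++;
        if (recordsSinceSnapshot > SNAPSHOT_THRESHOLD) {
            generateSnapshot();       // hypothetical: persists `committed`
            recordsSinceSnapshot = 0;
        }
    }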




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-03-24 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r600789602



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -87,6 +96,25 @@ public synchronized void handleCommit(BatchReader<Integer> reader) {
 }
 }
 
+@Override
+public synchronized void handleSnapshot(SnapshotReader<Integer> reader) {
+try {
+try (SnapshotReader<Integer> snapshot = reader) {
+log.debug("Loading snapshot {}", snapshot.snapshotId());
+for (List<Integer> batch : snapshot) {
+for (Integer value : batch) {
+log.debug("Setting value: {}", value);
+this.committed = value;
+this.uncommitted = value;
+}
+}
+log.debug("Finished loading snapshot. Set value: {}", this.committed);
+}
+} catch (IOException e) {

Review comment:
   This is because `SnapshotReader::close` was declared as throwing an `IOException`.
   
   This made the API for `SnapshotReader` confusing as `hasNext` and `next` 
don't throw an `IOException` even though they read from disk in some cases. I 
fixed this by changing `SnapshotReader` to implement `AutoCloseable` instead of 
`Closeable`.
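
   A small sketch of the distinction being relied on: `Closeable.close()` is declared to throw `IOException`, while an `AutoCloseable` implementation may narrow `close()` to throw nothing, so callers need no try/catch.

    public final class SnapshotReaderSketch<T> implements AutoCloseable {
        @Override
        public void close() { // narrowed: no checked exception, unlike Closeable.close()
            // release pooled buffers and mark the reader closed
        }
    }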




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-03-24 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r600813565



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -418,48 +414,49 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch 
snapshotId) {
 public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {}
 
 @Override
-public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) {
-if (logStartOffset() > logStartSnapshotId.offset ||
-highWatermark.offset < logStartSnapshotId.offset) {
+public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {

Review comment:
   Note that the changes to this method are to relax the log start offset 
and high-watermark invariant so that we can create more interesting snapshot 
and log states in the `RaftClientTestContext.Builder`.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1300,18 +1330,18 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
 
 int maxSnapshotSize;
 try {
-maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes());
+maxSnapshotSize = Math.toIntExact(snapshotSize);
 } catch (ArithmeticException e) {
 maxSnapshotSize = Integer.MAX_VALUE;
 }
 
 if (partitionSnapshot.position() > Integer.MAX_VALUE) {
+// TODO: This should return an error response instead of throwing an exception
 throw new IllegalStateException(String.format("Trying to fetch a snapshot with position: %d larger than Int.MaxValue", partitionSnapshot.position()));
 }
 
-UnalignedRecords records = snapshot.read(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));
-
-long snapshotSize = snapshot.sizeInBytes();
+// TODO: I think this slice of records is closed when the snapshot is closed in the try (...) above.
+UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));

Review comment:
   When the implementation is a `FileRawSnapshotReader`, the created slice will be closed before the network client has had a chance to send the bytes. Created https://issues.apache.org/jira/browse/KAFKA-12543 and I will work on this after this PR.

##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -355,33 +354,30 @@ public LogFetchInfo read(long startOffset, Isolation 
isolation) {
 }
 
 ByteBuffer buffer = ByteBuffer.allocate(512);
-LogEntry firstEntry = null;
+LogOffsetMetadata batchStartOffset = null;

Review comment:
   Note that the changes to this method are so that `read` doesn't return all of the batches from `startOffset` to `highWatermark`. This was needed for more interesting test cases around snapshot loading.

##
File path: raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
##
@@ -124,6 +126,15 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
 }
 }
 
+@Override
+public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+// TODO: Create Jira: Handle loading commit in ListenerShim

Review comment:
   Okay. I'll remove the TODO. Do you have a Jira for this? If not let me 
know and I can create one.

##
File path: shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
##
@@ -105,6 +106,22 @@ public void handleCommits(long lastOffset, List<ApiMessage> messages) {
 }, null);
 }
 
+@Override
+public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+// TODO: Create Jira: Need to cover the case where handle snapshot invalidates previous commits
+// Need to handle that reader.snapshotId() means that every record up to that offset is committed

Review comment:
   Created this Jira: https://issues.apache.org/jira/browse/KAFKA-12545

##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##
@@ -87,6 +96,25 @@ public synchronized void handleCommit(BatchReader<Integer> reader) {
 }
 }
 
+@Override
+public synchronized void handleSnapshot(SnapshotReader<Integer> reader) {
+try {
+try (SnapshotReader<Integer> snapshot = reader) {
+log.debug("Loading snapshot {}", snapshot.snapshotId());
+for (List<Integer> batch : snapshot) {
+for (Integer value : batch) {
+log.debug("Setting value: {}", value);
+this.committed = value;
+this.uncommitted = value;
+}
+}
+log.debug("Finished loading snapshot. Set value: {}", this.committed);
+}
+} catch (IOException e

[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-03-18 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r596996121



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SnapshotReader<T> implements Closeable, Iterable<List<T>> {

Review comment:
   Fair enough. Let me revisit this and see what code we can reuse.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-03-17 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r596301164



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -311,8 +311,18 @@ private void updateListenersProgress(long highWatermark) {
 private void updateListenersProgress(List<ListenerContext> listenerContexts, long highWatermark) {
 for (ListenerContext listenerContext : listenerContexts) {
 listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
-if (nextExpectedOffset < log.startOffset()) {
-listenerContext.fireHandleSnapshot(log.startOffset());
+if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) {
+SnapshotReader<T> snapshot = earliestSnapshot().orElseThrow(() -> {

Review comment:
   No good reason. I'll change it to use the latest.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-02-18 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r578522996



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -321,6 +335,23 @@ private void updateListenersProgress(List<ListenerContext> listenerContexts, lon
 }
 }
 
+private Optional<SnapshotReader<T>> oldestSnapshot() {
+if (log.oldestSnapshotId().isPresent()) {

Review comment:
   Thanks. Fixed!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org