[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-03 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r535482262



##
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -74,18 +74,19 @@ public void freeze() throws IOException {
 frozen = true;
 
 // Set readonly and ignore the result
-if (!path.toFile().setReadOnly()) {
-throw new IOException(String.format("Unable to set file %s as read-only", path));
+if (!tempSnapshotPath.toFile().setReadOnly()) {
+throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath));
 }
 
-Path destination = Snapshots.moveRename(path, snapshotId);
-Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE);
+Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE);
 }
 
 @Override
 public void close() throws IOException {
 channel.close();
-Files.deleteIfExists(path);
+// This is a noop if freeze was called before calling close
+Files.deleteIfExists(tempSnapshotPath);

Review comment:
   Yes!
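   A minimal sketch of the write-to-temp, freeze, then close lifecycle in this hunk, assuming a simplified stand-in class (`TempFileWriterSketch` is hypothetical, and the `.part` temp-file suffix in the usage is only for illustration). It shows why `Files.deleteIfExists` in `close()` does nothing once `freeze()` has moved the temp file, while an unfrozen writer still removes its partial file.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical, simplified stand-in for FileRawSnapshotWriter (not the PR's class).
final class TempFileWriterSketch implements AutoCloseable {
    private final Path tempPath;
    private final Path destination;
    private final FileChannel channel;
    private boolean frozen = false;

    TempFileWriterSketch(Path tempPath, Path destination) throws IOException {
        this.tempPath = tempPath;
        this.destination = destination;
        this.channel = FileChannel.open(tempPath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }

    void append(ByteBuffer buffer) throws IOException {
        if (frozen) {
            throw new IllegalStateException("Append not supported. Snapshot is already frozen: " + tempPath);
        }
        // A single write may be partial, so loop until the buffer is drained.
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }

    void freeze() throws IOException {
        channel.close();
        frozen = true;
        if (!tempPath.toFile().setReadOnly()) {
            throw new IOException("Unable to set file (" + tempPath + ") as read-only");
        }
        // Same-directory rename: readers see either no snapshot or the complete one.
        Files.move(tempPath, destination, StandardCopyOption.ATOMIC_MOVE);
    }

    @Override
    public void close() throws IOException {
        channel.close(); // closing an already-closed channel has no effect
        // No-op after freeze(): the temp file was already moved to `destination`.
        // Otherwise this discards the partially written snapshot.
        Files.deleteIfExists(tempPath);
    }
}
```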





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-03 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r535474040



##
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+private final Path tempSnapshotPath;
+private final FileChannel channel;
+private final OffsetAndEpoch snapshotId;
+private boolean frozen = false;
+
+private FileRawSnapshotWriter(
+Path tempSnapshotPath,
+FileChannel channel,
+OffsetAndEpoch snapshotId
+) {
+this.tempSnapshotPath = tempSnapshotPath;
+this.channel = channel;
+this.snapshotId = snapshotId;
+}
+
+@Override
+public OffsetAndEpoch snapshotId() {
+return snapshotId;
+}
+
+@Override
+public long sizeInBytes() throws IOException {
+return channel.size();
+}
+
+@Override
+public void append(ByteBuffer buffer) throws IOException {
+if (frozen) {
+throw new IllegalStateException(
+String.format("Append not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+);
+}
+
+Utils.writeFully(channel, buffer);
+}
+
+@Override
+public boolean isFrozen() {
+return frozen;
+}
+
+@Override
+public void freeze() throws IOException {
+channel.close();
+frozen = true;
+
+// Set readonly and ignore the result
+if (!tempSnapshotPath.toFile().setReadOnly()) {
+throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath));
+}
+
+Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE);

Review comment:
   I investigated the issue a bit. It looks like this utility was introduced to replace these lines in `OffsetCheckpoint.scala`:
   ```scala
   // swap new offset checkpoint file with previous one
   if(!temp.renameTo(file)) {
   // renameTo() fails on Windows if the destination file exists.
   file.delete()
   if(!temp.renameTo(file))
   throw new IOException(...)
   }
   ```
   
   Looking at the JDK implementation for Windows, `ATOMIC_MOVE` should work on Windows even if the target file exists: https://github.com/openjdk/jdk/blob/master/src/java.base/windows/classes/sun/nio/fs/WindowsFileCopy.java#L298-L312
   
   In other words, I think we can keep the code as is in this PR.
   
   cc @ijuma 
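   For comparison, a sketch of the two approaches in Java. `legacyReplace` mirrors the Scala snippet above; `atomicReplace` is the single `Files.move(..., ATOMIC_MOVE)` call. The `ReplaceSketch` class is hypothetical. Note that the `Files.move` Javadoc leaves replacing an existing target under `ATOMIC_MOVE` implementation specific; the linked Windows code replaces it, and, as far as we know, so does the Unix implementation, which performs a plain `rename(2)`.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class ReplaceSketch {
    // Java version of the old OffsetCheckpoint pattern: File.renameTo may fail on
    // Windows when the destination exists, so it deletes the destination and retries.
    // Between delete() and the second renameTo() there is no file at `target`.
    static void legacyReplace(File temp, File target) throws IOException {
        if (!temp.renameTo(target)) {
            target.delete();
            if (!temp.renameTo(target)) {
                throw new IOException("Failed to rename " + temp + " to " + target);
            }
        }
    }

    // Single atomic rename; an existing target is replaced on platforms whose
    // ATOMIC_MOVE implementation supports it (see the lead-in above).
    static void atomicReplace(Path temp, Path target) throws IOException {
        Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

   The window in `legacyReplace` between `target.delete()` and the second `renameTo` is exactly what the atomic rename avoids.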









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-02 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534592624



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -100,4 +102,15 @@ default void handleResign() {}
  */
 CompletableFuture shutdown(int timeoutMs);
 
+/**
+ * Create a writable snapshot file for a given offset and epoch.
+ *
+ * The RaftClient assumes that the returned snapshot will contain the records up to but
+ * not including the end offset in the snapshot id. See {@link SnapshotWriter} for
+ * details on how to use this object.
+ *
+ * @param snapshotId the end offset and epoch that identifies the snapshot

Review comment:
   Created https://issues.apache.org/jira/browse/KAFKA-10800









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-02 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534587344



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
   Done. Removed the `snapshots` directory.









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-02 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534581287



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -472,4 +489,110 @@ private EpochStartOffset(int epoch, long startOffset) {
 }
 }
 
+final class MockRawSnapshotWriter implements RawSnapshotWriter {
+private final OffsetAndEpoch snapshotId;
+private ByteBuffer data;

Review comment:
   Cool. Yeah, let's use that. So many util goodies.









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-02 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534577659



##
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+private final Path path;
+private final FileChannel channel;
+private final OffsetAndEpoch snapshotId;
+private boolean frozen = false;
+
+private FileRawSnapshotWriter(
+Path path,
+FileChannel channel,
+OffsetAndEpoch snapshotId
+) {
+this.path = path;
+this.channel = channel;
+this.snapshotId = snapshotId;
+}
+
+@Override
+public OffsetAndEpoch snapshotId() {
+return snapshotId;
+}
+
+@Override
+public long sizeInBytes() throws IOException {
+return channel.size();
+}
+
+@Override
+public void append(ByteBuffer buffer) throws IOException {
+if (frozen) {
+throw new IllegalStateException(
+String.format("Append not supported. Snapshot is already frozen: id = %s; path = %s", snapshotId, path)
+);
+}
+
+Utils.writeFully(channel, buffer);
+}
+
+@Override
+public boolean isFrozen() {
+return frozen;
+}
+
+@Override
+public void freeze() throws IOException {
+channel.close();
+frozen = true;
+
+// Set readonly and ignore the result
+if (!path.toFile().setReadOnly()) {
+throw new IOException(String.format("Unable to set file %s as read-only", path));
+}
+
+Path destination = Snapshots.moveRename(path, snapshotId);
+Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE);

Review comment:
   I prefer to keep it this way. The atomic move (rename) should always succeed because we guarantee that it happens within the same directory. I would rather have Kafka throw an exception than have the raft client or state machine see a partial snapshot file because a file copy was performed instead of a file rename.
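   A sketch of the "fail rather than copy" policy described above, using only `java.nio.file` APIs. `StrictAtomicMove.moveOrFail` is a hypothetical helper, and its explicit same-directory check is an assumption standing in for the guarantee the PR presumably gets from computing the destination with `Snapshots.moveRename`.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class StrictAtomicMove {
    // Renames `source` to `destination` atomically or throws; never copies.
    static Path moveOrFail(Path source, Path destination) throws IOException {
        Path sourceDir = source.toAbsolutePath().normalize().getParent();
        Path destinationDir = destination.toAbsolutePath().normalize().getParent();
        if (sourceDir == null || !sourceDir.equals(destinationDir)) {
            // The reasoning above relies on the rename staying within one directory.
            throw new IllegalArgumentException(source + " and " + destination + " must share a directory");
        }
        try {
            return Files.move(source, destination, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Deliberately no fallback to Files.copy or a non-atomic move: a reader
            // must never observe a partially copied snapshot at `destination`.
            throw new IOException("Atomic rename not supported: " + source + " -> " + destination, e);
        }
    }
}
```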









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-02 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534572296



##
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Iterator;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotReader implements RawSnapshotReader {
+private final FileRecords fileRecords;
+private final OffsetAndEpoch snapshotId;
+
+private FileRawSnapshotReader(FileRecords fileRecords, OffsetAndEpoch snapshotId) {
+this.fileRecords = fileRecords;
+this.snapshotId = snapshotId;
+}
+
+@Override
+public OffsetAndEpoch snapshotId() {
+return snapshotId;
+}
+
+@Override
+public long sizeInBytes() {
+return fileRecords.sizeInBytes();
+}
+
+@Override
+public Iterator iterator() {
+return new Iterator() {

Review comment:
   I am not sure we can do that. The interface `RawSnapshotReader` extends `Iterable<RecordBatch>`, and `Iterable<T>` declares that method as `Iterator<T> iterator()`. As you point out, it should have been declared as `Iterator<? extends T> iterator()`, since `Iterator` only produces values and is therefore covariant in `T`.
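   To illustrate the variance problem, here is a self-contained sketch with JDK types standing in for the Kafka ones: `Number` plays the role of `RecordBatch` and `Integer` that of `FileChannelRecordBatch` (an assumption about which iterator the reader wraps). Because `Iterator` only produces values, an `Iterator<? extends T>` can be adapted to `Iterator<T>` safely with a thin wrapper; `CovariantIterators.widen` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class CovariantIterators {
    // Adapts a producer of some subtype of T into an Iterator<T>. This is type-safe
    // because callers can only read elements out, never put a T back in.
    static <T> Iterator<T> widen(Iterator<? extends T> source) {
        return new Iterator<T>() {
            @Override public boolean hasNext() { return source.hasNext(); }
            @Override public T next() { return source.next(); }
            @Override public void remove() { source.remove(); }
        };
    }

    // Stand-in for a reader that is Iterable<RecordBatch> but holds subtype batches.
    static final class Numbers implements Iterable<Number> {
        private final List<Integer> values;

        Numbers(List<Integer> values) {
            this.values = values;
        }

        @Override
        public Iterator<Number> iterator() {
            // return values.iterator(); // does not compile: Iterator<Integer> is not an Iterator<Number>
            return widen(values.iterator());
        }
    }
}
```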









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-02 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534489751



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -100,4 +102,15 @@ default void handleResign() {}
  */
 CompletableFuture shutdown(int timeoutMs);
 
+/**
+ * Create a writable snapshot file for a given offset and epoch.
+ *
+ * The RaftClient assumes that the returned snapshot will contain the records up to but
+ * not including the end offset in the snapshot id. See {@link SnapshotWriter} for
+ * details on how to use this object.
+ *
+ * @param snapshotId the end offset and epoch that identifies the snapshot

Review comment:
   Yeah. I'll create a Jira to validate the snapshot id. We should perform at least the following validations:
   1. It is less than the high-watermark.
   2. It is a valid epoch and end offset based on the log. I think we can use the leader epoch cache to check this.
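   A sketch of the two validations listed above, under stated assumptions: `SnapshotIdValidationSketch` and its `SnapshotId` class are hypothetical stand-ins for `OffsetAndEpoch`, and the leader epoch cache is modelled as a sorted map from epoch to start offset. Because the snapshot holds records up to but not including its end offset, the sketch reads "less than the high-watermark" as allowing an end offset equal to the high watermark (the last included record is then the last committed one); that interpretation is an assumption.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class SnapshotIdValidationSketch {
    // Hypothetical stand-in for org.apache.kafka.raft.OffsetAndEpoch.
    static final class SnapshotId {
        final long endOffset; // exclusive: the snapshot covers offsets [0, endOffset)
        final int epoch;      // assumed: epoch of the last record included

        SnapshotId(long endOffset, int epoch) {
            this.endOffset = endOffset;
            this.epoch = epoch;
        }
    }

    /**
     * @param epochStartOffsets hypothetical view of a leader epoch cache: epoch -> first offset of that epoch
     */
    static boolean isValid(SnapshotId id, long highWatermark, NavigableMap<Integer, Long> epochStartOffsets) {
        // 1. Every record in the snapshot must be committed.
        if (id.endOffset > highWatermark) {
            return false;
        }
        // 2. The epoch must be known, and the last included offset (endOffset - 1)
        //    must fall inside that epoch's range [start, nextEpochStart).
        Long start = epochStartOffsets.get(id.epoch);
        if (start == null || id.endOffset - 1 < start) {
            return false;
        }
        Map.Entry<Integer, Long> nextEpoch = epochStartOffsets.higherEntry(id.epoch);
        return nextEpoch == null || id.endOffset - 1 < nextEpoch.getValue();
    }
}
```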









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-01 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533835857



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##
@@ -149,6 +152,29 @@ default OptionalLong truncateToEndOffset(OffsetAndEpoch endOffset) {
 return OptionalLong.of(truncationOffset);
 }
 
+/**
+ * Create a writable snapshot for the given snapshot id.
+ *
+ * See {@link RawSnapshotWriter} for details on how to use this object.
+ *
+ * @param snapshotId the end offset and epoch that identifies the snapshot
+ * @return a writable snapshot
+ */
+RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws IOException;

Review comment:
   Yeah. I think we will need that when we implement deleting snapshots. Do you mind if I add it later?
   
   Also, I think we are going to need a `readLatestSnapshot()` for when the state machine (controller or broker) needs to load the latest valid snapshot. I was planning to add it later, once the use case was clearer to me.









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-01 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533833428



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
   I think this depends on whether we need to scan the snapshot directory. Unfortunately, I don't have a concrete answer at the moment. When we implement the changes to the rest of the raft client (log truncation, updating the start offset and LEO), we may need to scan the snapshot/checkpoint folder to determine the greatest log start offset and LEO. @lbradstreet suggested storing them in a different directory during the KIP-630 review, since Kafka already has a few files in the partition log directory.
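   If the raft client does end up scanning the directory, it could look roughly like this sketch. The `<endOffset>-<epoch>.checkpoint` naming (using the `.checkpoint` suffix mentioned for KIP-630 in this thread) and the `SnapshotScanSketch` helper are assumptions for illustration. Only files with the final suffix are considered, so in-progress temp files and log segments in the same directory are ignored.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

final class SnapshotScanSketch {
    // Assumed naming scheme (hypothetical): "<endOffset>-<epoch>.checkpoint".
    static final String SUFFIX = ".checkpoint";

    static final class SnapshotId {
        final long endOffset;
        final int epoch;

        SnapshotId(long endOffset, int epoch) {
            this.endOffset = endOffset;
            this.epoch = epoch;
        }
    }

    // Returns empty for anything that is not a complete snapshot file.
    static Optional<SnapshotId> parse(String fileName) {
        if (!fileName.endsWith(SUFFIX)) {
            return Optional.empty();
        }
        String stem = fileName.substring(0, fileName.length() - SUFFIX.length());
        int dash = stem.indexOf('-');
        if (dash <= 0) {
            return Optional.empty();
        }
        try {
            return Optional.of(new SnapshotId(
                Long.parseLong(stem.substring(0, dash)),
                Integer.parseInt(stem.substring(dash + 1))));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Greatest snapshot id, ordered by end offset and then by epoch.
    static Optional<SnapshotId> latest(Path directory) throws IOException {
        try (Stream<Path> files = Files.list(directory)) {
            return files
                .map(path -> parse(path.getFileName().toString()))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .max(Comparator.<SnapshotId>comparingLong(id -> id.endOffset).thenComparingInt(id -> id.epoch));
        }
    }
}
```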









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-01 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533829583



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+private static final String SNAPSHOT_DIR = "snapshots";
+private static final String SUFFIX =  ".snapshot";

Review comment:
   Good catch. I mentioned using `.checkpoint` in KIP-630 but forgot to change it here. I'll change it to that, but let me know if you have a preference.
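   Since `Snapshots.java` already imports `java.text.NumberFormat`, here is a sketch of how a zero-padded `<endOffset>-<epoch>.checkpoint` name could be built. The widths, the separator and the `SnapshotFileNames` class are assumptions; the 20-digit offset padding mirrors the convention Kafka uses for log segment file names. Fixed-width padding makes the lexicographic order of file names match their numeric order.

```java
import java.text.NumberFormat;
import java.util.Locale;

final class SnapshotFileNames {
    static final String SUFFIX = ".checkpoint";
    // Assumed widths: 20 digits fit any non-negative long, 10 any non-negative int.
    static final int OFFSET_WIDTH = 20;
    static final int EPOCH_WIDTH = 10;

    // NumberFormat is not thread-safe, so this sketch creates one per call.
    // Locale.ROOT keeps the digits ASCII regardless of the default locale.
    private static String pad(long value, int width) {
        NumberFormat format = NumberFormat.getInstance(Locale.ROOT);
        format.setMinimumIntegerDigits(width);
        format.setGroupingUsed(false);
        return format.format(value);
    }

    static String fileName(long endOffset, int epoch) {
        if (endOffset < 0 || epoch < 0) {
            throw new IllegalArgumentException("offset and epoch must be non-negative");
        }
        return pad(endOffset, OFFSET_WIDTH) + "-" + pad(epoch, EPOCH_WIDTH) + SUFFIX;
    }
}
```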









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-01 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533828329



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -100,4 +102,15 @@ default void handleResign() {}
  */
 CompletableFuture shutdown(int timeoutMs);
 
+/**
+ * Create a writable snapshot file for a given offset and epoch.
+ *
+ * The RaftClient assumes that the returned snapshot will contain the records up to but
+ * not including the end offset in the snapshot id. See {@link SnapshotWriter} for
+ * details on how to use this object.
+ *
+ * @param snapshotId the end offset and epoch that identifies the snapshot

Review comment:
   Is there a specific reason why you are asking? We don't currently check for this. I will add a check, and we can relax it later if we need to.









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-07 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r519213671



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable {

Review comment:
   @hachikuji and I discussed this offline. The latest PR should reflect 
our discussion.
   
   The interfaces `RawSnapshotWriter` and `RawSnapshotReader` are expected to 
change after we address https://issues.apache.org/jira/browse/KAFKA-10694









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-07 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r519211721



##
File path: raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+/**
+ * Interface for reading snapshots as a sequence of records.
+ */
+public interface RawSnapshotReader extends Closeable, Iterable {
+/**
+ * Returns the end offset and epoch for the snapshot.
+ */
+public OffsetAndEpoch snapshotId();
+
+/**
+ * Returns the number of bytes for the snapshot.
+ *
+ * @throws IOException for any IO error while reading the size
+ */
+public long sizeInBytes() throws IOException;
+
+/**
+ * Reads bytes from position into the given buffer.
+ *
+ * It is not guaranteed that the given buffer will be filled.
+ *
+ * @param buffer byte buffer to put the read bytes into
+ * @param position the starting position in the snapshot to read
+ * @return the number of bytes read
+ * @throws IOException for any IO error while reading the snapshot
+ */
+public int read(ByteBuffer buffer, long position) throws IOException;

Review comment:
   As @hachikuji and I discussed offline, it is very likely that this method will change to:
   
   ```java
   public BaseRecords slice(long position) throws IOException;
   ```
   
   After we implement https://issues.apache.org/jira/browse/KAFKA-10694
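   Until that change lands, the current `read(ByteBuffer, long)` contract means a caller that needs an exact number of bytes has to loop. A minimal sketch using `FileChannel.read(ByteBuffer, long)`, which may likewise return fewer bytes than requested; `PositionalReads.readFully` is a hypothetical helper, not an existing Kafka API.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class PositionalReads {
    // Keeps calling the positional read until the buffer is full. A positional
    // read does not move the channel's own position, so concurrent readers are safe.
    static void readFully(FileChannel channel, ByteBuffer buffer, long position) throws IOException {
        long current = position;
        while (buffer.hasRemaining()) {
            int read = channel.read(buffer, current);
            if (read < 0) {
                // -1 means `current` is at or past the end of the file.
                throw new EOFException("Reached end of file at position " + current);
            }
            current += read;
        }
    }
}
```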









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-07 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r519211586



##
File path: raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotWriter.java
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+/**
+ * Interface for writing snapshot as a sequence of records.
+ */
+public interface RawSnapshotWriter extends Closeable {
+/**
+ * Returns the end offset and epoch for the snapshot.
+ */
+public OffsetAndEpoch snapshotId();
+
+/**
+ * Returns the number of bytes for the snapshot.
+ *
+ * @throws IOException for any IO error while reading the size
+ */
+public long sizeInBytes() throws IOException;
+
+/**
+ * Fully appends the buffer to the snapshot.
+ *
+ * If the method returns without an exception, the given buffer was fully written to the
+ * snapshot.
+ *
+ * @param buffer the buffer to append
+ * @throws IOException for any IO error during append
+ */
+public void append(ByteBuffer buffer) throws IOException;

Review comment:
   As @hachikuji and I discussed offline, it is very likely that this method will change to:
   
   ```java
   public void append(MemoryRecords records) throws IOException;
   ```
   
   After we implement https://issues.apache.org/jira/browse/KAFKA-10694









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-07 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r519211390



##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -32,25 +32,25 @@
  * This class is responsible for managing the current state of this node and ensuring only
  * valid state transitions.
  *
- * Unattached =>
+ * Unattached transitions to:

Review comment:
   Changes to this file are unrelated to this PR. I made them here so that `docsJar` would succeed.









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-06 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r518916392



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {

Review comment:
   > Perhaps it would be possible to extend Records to provide what we need instead of creating a new interface?
   
   I think you are right that we don't need `SnapshotReader`, as `Records` provides all of the functionality we need. `SnapshotReader` was added so that we didn't expose all of the mutating APIs in `FileRecords`. Snapshots are supposed to be immutable once frozen.
   
   I think we still want `SnapshotWriter` or something similar, as it provides 1. raw writes of bytes and 2. `freeze`, which marks the snapshot as immutable if validation passes.
   
   For 1., I don't think we should add raw writes to `Records` or `FileRecords`, as that would in essence expose an unsafe API: the user would need to make sure that they are writing the correct data.
   
   For 2., I think we can avoid introducing a new type/interface and instead add that functionality as a static method in `Snapshots.java`. Unit tests (mock tests) may be difficult with this code organization.
   
   > As far as the naming, I wonder if we can reserve the nicer SnapshotXXX names for the state machine.
   
   Yes. I'll use the prettier name for the type exposed to the state machine.
   
   > Really what I would like is a common type that can be used by both handleSnapshot and handleCommit since both callbacks just need to provide a way to read through a set of records. I was thinking it could be BatchReader, but it doesn't have to be if there is something better.
   
   I think this should be possible. I haven't implemented this part, so I don't have all of the details. I think the requirements for the type sent through `handleSnapshot` are a subset of the requirements for the type sent through `handleCommit`. In particular, the Raft Client (`ListenerContext`) only needs to know when the snapshot has been fully read (e.g. via `close`). The Raft Client doesn't need to know the "last seen offset + 1", as it already knows the `SnapshotId` sent through `handleSnapshot`.
   
   > (By the way, it's not too clear to me why we need freeze when we already have close.)
   
   I wanted to allow the user to abort a snapshot. If `close` is called without calling `freeze`, the partial snapshot is deleted instead of being made immutable.
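The freeze/close split described above can be sketched with a temporary file that `freeze` atomically renames into place and that `close` deletes if it is still there. This follows the same idea as the `tempSnapshotPath` plus `ATOMIC_MOVE` approach in this PR's `FileRawSnapshotWriter` diff. The class `AbortableSnapshotFile` and the file names used with it are hypothetical:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: bytes go to a temporary file; freeze() publishes it,
// and close() without freeze() deletes it, which aborts the snapshot.
final class AbortableSnapshotFile implements AutoCloseable {
    private final Path tempPath;
    private final Path destination;
    private final FileChannel channel;
    private boolean frozen = false;

    AbortableSnapshotFile(Path tempPath, Path destination) throws IOException {
        this.tempPath = tempPath;
        this.destination = destination;
        this.channel = FileChannel.open(tempPath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }

    void append(ByteBuffer buffer) throws IOException {
        if (frozen) throw new IllegalStateException("snapshot is frozen");
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }

    void freeze() throws IOException {
        channel.force(true);  // flush data to disk before publishing
        channel.close();
        // Atomic rename: readers see either no snapshot or the complete one.
        Files.move(tempPath, destination, StandardCopyOption.ATOMIC_MOVE);
        frozen = true;
    }

    @Override
    public void close() throws IOException {
        channel.close();                 // returns immediately if freeze() already closed it
        Files.deleteIfExists(tempPath);  // no-op after a successful freeze()
    }
}
```

Because `close` is always safe to call, callers can use try-with-resources and only call `freeze` on the success path.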









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-05 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r518396177



##
File path: core/src/main/scala/kafka/snapshot/KafkaSnapshotReader.scala
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.snapshot
+
+import java.nio.ByteBuffer
+import java.nio.file.Path
+import java.util.{Iterator => JIterator}
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record.FileRecords
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.snapshot.SnapshotReader
+
+final class KafkaSnapshotReader private (fileRecords: FileRecords, snapshotId: OffsetAndEpoch) extends SnapshotReader {

Review comment:
   Done.









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-04 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517703078



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {

Review comment:
   This comment applies to some of your other observations. At a high level, there are four use cases that we need to design and implement: two for the Raft implementation and two for the state machine.
   
   ### Raft implementation
   These types are internal to the raft implementation and don't have to be exposed to the state machine.
   #### Leader Use Case
   The leader needs to be able to send a part of the snapshot over the network, using something like this:
   ```java
   interface SnapshotReader extends Closeable {
   long transferTo(long position, long maxBytes, WritableChannel channel);
   int read(ByteBuffer buffer, long position);
   }
   ```
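On the leader side, both operations map onto positional `FileChannel` calls, which do not modify the channel's own position, so fetches at different offsets do not disturb each other. A sketch, assuming the standard `java.nio.channels.WritableByteChannel` in place of the `WritableChannel` placeholder above. The class name `FileSnapshotSlice` is hypothetical:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical read-only view a leader could use to serve snapshot chunks.
final class FileSnapshotSlice implements AutoCloseable {
    private final FileChannel channel;

    FileSnapshotSlice(Path path) throws IOException {
        this.channel = FileChannel.open(path, StandardOpenOption.READ);
    }

    long sizeInBytes() throws IOException {
        return channel.size();
    }

    // Sends up to maxBytes starting at position; returns the bytes actually sent.
    long transferTo(long position, long maxBytes, WritableByteChannel target) throws IOException {
        long count = Math.min(maxBytes, channel.size() - position);
        return count <= 0 ? 0 : channel.transferTo(position, count, target);
    }

    // Fills the buffer starting at position; returns bytes read, or -1 at end of file.
    int read(ByteBuffer buffer, long position) throws IOException {
        return channel.read(buffer, position);
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```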
   #### Follower Use Case
   The followers need to be able to copy bytes from the network and validate the snapshot on disk when fetching is done.
   ```java
   interface SnapshotWriter extends Closeable {
   void append(ByteBuffer buffer);
   void validate();
   void freeze();
   }
   ```
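For the follower, copying bytes "from the network" can be sketched with `FileChannel.transferFrom`, which accepts any `ReadableByteChannel`, such as a socket. This thread does not say what `validate()` should check, so the size check against an advertised length below is purely an illustrative assumption. The class name `FollowerSnapshotDownload` is also hypothetical:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical follower-side writer; the validation rule is an assumption.
final class FollowerSnapshotDownload implements AutoCloseable {
    private final FileChannel channel;
    private final long expectedSize;

    FollowerSnapshotDownload(Path tempPath, long expectedSize) throws IOException {
        this.channel = FileChannel.open(tempPath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        this.expectedSize = expectedSize;
    }

    // Copies up to maxBytes from the source onto the end of the file.
    long appendFrom(ReadableByteChannel source, long maxBytes) throws IOException {
        long position = channel.size();
        return channel.transferFrom(source, position, maxBytes);
    }

    // Assumed check: the fetched snapshot must be exactly the advertised size.
    void validate() throws IOException {
        long actual = channel.size();
        if (actual != expectedSize) {
            throw new IllegalStateException("expected " + expectedSize + " bytes but found " + actual);
        }
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```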
   ### State machine implementation
   These types are exposed to the state machine.
   #### Load Snapshot
   The state machine needs to be able to load/scan the entire snapshot. The state machine can use `close` to tell the raft client that it has finished loading the snapshot. This will be implemented in a future PR, but it could look like this:
   ```java
   interface BatchedSnapshotReader extends Iterable>, Closeable {
   }
   ```
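The load path can be sketched in memory to show `close` doubling as the "finished loading" signal to the raft client. This sketch makes several assumptions that the thread does not settle. Each element of the iterable is taken to be one batch, represented as a `List<T>`. The `onLoaded` callback and the class name `InMemoryBatchedSnapshotReader` are hypothetical:

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

// Hypothetical in-memory reader: each element is one batch of records.
final class InMemoryBatchedSnapshotReader<T> implements Iterable<List<T>>, Closeable {
    private final List<List<T>> batches;
    private final Runnable onLoaded;  // assumed hook notifying the raft client
    private boolean closed = false;

    InMemoryBatchedSnapshotReader(List<List<T>> batches, Runnable onLoaded) {
        this.batches = batches;
        this.onLoaded = onLoaded;
    }

    @Override
    public Iterator<List<T>> iterator() {
        if (closed) throw new IllegalStateException("reader is closed");
        return batches.iterator();
    }

    // Signals, at most once, that the state machine finished loading the snapshot.
    @Override
    public void close() {
        if (!closed) {
            closed = true;
            onLoaded.run();
        }
    }
}
```

A state machine would iterate the batches to rebuild its state and then call `close`.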
   #### Generate Snapshot
   The state machine needs to be able to generate a snapshot by appending records/values and marking the snapshot as immutable (`freeze`) when it is done.
   ```java
   interface BatchedSnapshotWriter extends Closeable {
  void append(Iterable records);
  void freeze();
   }
   ```
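Below is a minimal in-memory sketch of the generate-snapshot contract. The value type `T`, the maximum batch size, and the class name `InMemoryBatchedSnapshotWriter` are all hypothetical. `append` groups values into batches, and each call starts a new batch. `freeze` makes the snapshot immutable, so later appends are rejected. `close` without `freeze` discards the partial snapshot, matching the abort behavior described elsewhere in this thread:

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical writer illustrating append + freeze; not the PR's implementation.
final class InMemoryBatchedSnapshotWriter<T> implements Closeable {
    private final int maxBatchSize;
    private final List<List<T>> batches = new ArrayList<>();
    private boolean frozen = false;
    private boolean closed = false;

    InMemoryBatchedSnapshotWriter(int maxBatchSize) {
        if (maxBatchSize <= 0) throw new IllegalArgumentException("maxBatchSize must be positive");
        this.maxBatchSize = maxBatchSize;
    }

    // Splits the values into batches of at most maxBatchSize; each call starts a new batch.
    void append(Iterable<T> records) {
        if (frozen || closed) throw new IllegalStateException("snapshot is frozen or closed");
        List<T> current = new ArrayList<>();
        for (T record : records) {
            current.add(record);
            if (current.size() == maxBatchSize) {
                batches.add(Collections.unmodifiableList(current));
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) batches.add(Collections.unmodifiableList(current));
    }

    // Marks the snapshot as immutable; no further appends are accepted.
    void freeze() {
        if (closed) throw new IllegalStateException("writer is closed");
        frozen = true;
    }

    // Read-only view of the batches written so far.
    List<List<T>> batches() {
        return Collections.unmodifiableList(batches);
    }

    // Closing without freezing discards the partial snapshot (an abort).
    @Override
    public void close() {
        closed = true;
        if (!frozen) batches.clear();
    }
}
```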
   
   ### Notes
   
   `SnapshotWriter` and `SnapshotReader` need to be interfaces because we will have a real implementation and a mocked implementation for testing. These two types are internal to raft and are not exposed to the state machine.
   
   `BatchedSnapshotReader` and `BatchedSnapshotWriter` can depend on `SnapshotWriter` and `SnapshotReader` to reuse some code, but this is not strictly required. These two types don't have to be interfaces if they delegate the IO to `SnapshotWriter` and `SnapshotReader`.
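The delegation option can be sketched as a concrete class that serializes values and hands the resulting bytes to a raw-writer interface, so tests can substitute an in-memory mock. Everything here is hypothetical. That includes the `RawWriter` interface and the `Function<T, byte[]>` serializer, which stands in for a serde such as the `RecordSerde` imported in the `BatchedSnapshotWriter` diff. The length-prefixed framing is hypothetical too:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical raw-byte interface with a real (file) and a mock implementation.
interface RawWriter {
    void append(ByteBuffer buffer) throws IOException;
    void freeze() throws IOException;
}

// In-memory mock for unit tests.
final class MockRawWriter implements RawWriter {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    boolean frozen = false;

    @Override
    public void append(ByteBuffer buffer) {
        byte[] chunk = new byte[buffer.remaining()];
        buffer.get(chunk);
        bytes.write(chunk, 0, chunk.length);
    }

    @Override
    public void freeze() {
        frozen = true;
    }
}

// Concrete class: no interface needed because all IO goes through RawWriter.
final class DelegatingBatchedWriter<T> {
    private final RawWriter raw;
    private final Function<T, byte[]> serializer;

    DelegatingBatchedWriter(RawWriter raw, Function<T, byte[]> serializer) {
        this.raw = raw;
        this.serializer = serializer;
    }

    // Frames each value as a 4-byte length followed by its bytes (assumed format).
    void append(Iterable<T> records) throws IOException {
        for (T record : records) {
            byte[] payload = serializer.apply(record);
            ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
            frame.putInt(payload.length).put(payload).flip();
            raw.append(frame);
        }
    }

    void freeze() throws IOException {
        raw.freeze();
    }
}
```

Only `RawWriter` needs a mock in this arrangement. The batching logic is exercised unchanged against either implementation.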
   
   What do you think @hachikuji?









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-04 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517703953



##
File path: raft/src/main/java/org/apache/kafka/snapshot/BatchedSnapshotWriter.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+
+// TODO: Write documentation for this type and all of the methods
+final public class BatchedSnapshotWriter implements Closeable {

Review comment:
   I cover some of the motivation for this here: https://github.com/apache/kafka/pull/9512#discussion_r517703078.
   
   Let's move the conversation there if you agree.









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-04 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517703625



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable<RecordBatch> {
+
+public OffsetAndEpoch snapshotId();
+
+public long sizeInBytes();
+
+public Iterator<RecordBatch> iterator();
+
+public int read(ByteBuffer buffer, long position) throws IOException;

Review comment:
   I think I addressed this in this comment: https://github.com/apache/kafka/pull/9512#discussion_r517703078. If you agree, let's move the conversation there.









[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-04 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r517672247



##
File path: core/src/main/scala/kafka/snapshot/KafkaSnapshotReader.scala
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.snapshot
+
+import java.nio.ByteBuffer
+import java.nio.file.Path
+import java.util.{Iterator => JIterator}
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record.FileRecords
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.snapshot.SnapshotReader
+
+final class KafkaSnapshotReader private (fileRecords: FileRecords, snapshotId: OffsetAndEpoch) extends SnapshotReader {

Review comment:
   Sounds good. I'll move this code over and implement it in Java.




