[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-07 Thread GitBox


hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r537757487



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -472,4 +490,106 @@ private EpochStartOffset(int epoch, long startOffset) {
 }
 }
 
+final class MockRawSnapshotWriter implements RawSnapshotWriter {
+private final OffsetAndEpoch snapshotId;
+private ByteBufferOutputStream data;
+private boolean frozen;
+
+public MockRawSnapshotWriter(OffsetAndEpoch snapshotId) {
+this.snapshotId = snapshotId;
+this.data = new ByteBufferOutputStream(0);
+this.frozen = false;
+}
+
+@Override
+public OffsetAndEpoch snapshotId() {
+return snapshotId;
+}
+
+@Override
+public long sizeInBytes() {
+return data.position();
+}
+
+@Override
+public void append(ByteBuffer buffer) {
+if (frozen) {
+throw new RuntimeException("Snapshot is already frozen " + 
snapshotId);
+}
+
+data.write(buffer);
+}
+
+@Override
+public boolean isFrozen() {
+return frozen;
+}
+
+@Override
+public void freeze() {
+if (frozen) {
+throw new RuntimeException("Snapshot is already frozen " + 
snapshotId);
+}
+
+frozen = true;
+ByteBuffer buffer = data.buffer();
+buffer.flip();
+
+snapshots.putIfAbsent(snapshotId, new 
MockRawSnapshotReader(snapshotId, buffer));
+}
+
+@Override
+public void close() {}
+}
+
+final static class MockRawSnapshotReader implements RawSnapshotReader {
+private final OffsetAndEpoch snapshotId;
+private final MemoryRecords data;
+
+MockRawSnapshotReader(OffsetAndEpoch snapshotId, ByteBuffer data) {
+this.snapshotId = snapshotId;
+this.data = MemoryRecords.readableRecords(data);
+}
+
+@Override
+public OffsetAndEpoch snapshotId() {
+return snapshotId;
+}
+
+@Override
+public long sizeInBytes() {
+return data.sizeInBytes();
+}
+
+@Override
+public Iterator iterator() {
+return new Iterator() {

Review comment:
   nit: we cab use `covariantCast`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-02 Thread GitBox


hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534607264



##
File path: raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.stream.IntStream;
+import org.apache.kafka.common.record.BufferSupplier.GrowableBufferSupplier;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public final class FileRawSnapshotTest {
+@Test
+public void testWritingSnapshot() throws IOException {
+Path tempDir = TestUtils.tempDirectory().toPath();
+OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
+int bufferSize = 256;
+int batches = 10;
+int expectedSize = 0;
+
+try (FileRawSnapshotWriter snapshot = 
FileRawSnapshotWriter.create(tempDir, offsetAndEpoch)) {
+assertEquals(0, snapshot.sizeInBytes());
+
+MemoryRecords records = buildRecords(new ByteBuffer[] 
{ByteBuffer.wrap(randomBytes(bufferSize))});

Review comment:
   nit: can just do `buildRecords(ByteBuffer.wrap(randomBytes(bufferSize)))`

##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+private final Path tempSnapshotPath;
+private final FileChannel channel;
+private final OffsetAndEpoch snapshotId;
+private boolean frozen = false;
+
+private FileRawSnapshotWriter(
+Path tempSnapshotPath,
+FileChannel channel,
+OffsetAndEpoch snapshotId
+) {
+this.tempSnapshotPath = tempSnapshotPath;
+this.channel = channel;
+this.snapshotId = snapshotId;
+}
+
+@Override
+public OffsetAndEpoch snapshotId() {
+return snapshotId;
+}
+
+@Override
+public long sizeInBytes() throws IOException {
+return channel.size();
+}
+
+@Override
+public void append(ByteBuffer buffer) throws IOException {
+if (frozen) {
+throw new IllegalStateException(
+String.format("Append not supported. Snapshot is already 
frozen: id = %s; temp path =

[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-02 Thread GitBox


hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534487681



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
   My preference is probably to keep things flat for now until we figure 
out what we want the long-term structure to look like. I guess it comes down to 
the implementation, but intuitively, the only time we'd need to scan would be 
on startup and we have to do that anyway.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-02 Thread GitBox


hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534486002



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
   Not sure if the snapshots will have much of an impact on this. It's not 
like we'll be storing more than a couple of them.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-02 Thread GitBox


hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534485293



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+private static final String SNAPSHOT_DIR = "snapshots";
+private static final String SUFFIX =  ".snapshot";

Review comment:
   Sounds fine to me.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-02 Thread GitBox


hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r534485088



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -100,4 +102,15 @@ default void handleResign() {}
  */
 CompletableFuture shutdown(int timeoutMs);
 
+/**
+ * Create a writable snapshot file for a given offset and epoch.
+ *
+ * The RaftClient assumes that the snapshot return will contain the 
records up to but
+ * not including the end offset in the snapshot id. See {@link 
SnapshotWriter} for
+ * details on how to use this object.
+ *
+ * @param snapshotId the end offset and epoch that identifies the snapshot

Review comment:
   Just curious. Thought it might be worth mentioning the expectation in 
the javadoc.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-01 Thread GitBox


hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533732789



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+private static final String SNAPSHOT_DIR = "snapshots";
+private static final String SUFFIX =  ".snapshot";

Review comment:
   This suffix is used for producer state snapshots already. Maybe we could 
use `.snap` or something like that.

##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##
@@ -149,6 +152,29 @@ default OptionalLong truncateToEndOffset(OffsetAndEpoch 
endOffset) {
 return OptionalLong.of(truncationOffset);
 }
 
+/**
+ * Create a writable snapshot for the given snapshot id.
+ *
+ * See {@link RawSnapshotWriter} for details on how to use this object.
+ *
+ * @param snapshotId the end offset and epoch that identifies the snapshot
+ * @return a writable snapshot
+ */
+RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws 
IOException;

Review comment:
   Do we also need an api to list snapshots?

##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+private final Path path;
+private final FileChannel channel;
+private final OffsetAndEpoch snapshotId;
+private boolean frozen = false;
+
+private FileRawSnapshotWriter(
+Path path,
+FileChannel channel,
+OffsetAndEpoch snapshotId
+) {
+this.path = path;
+this.channel = channel;
+this.snapshotId = snapshotId;
+}
+
+@Override
+public OffsetAndEpoch snapshotId() {
+return snapshotId;
+}
+
+@Override
+public long sizeInBytes() throws IOException {
+return channel.size();
+}
+
+@Override
+public void append(ByteBuffer buffer) throws IOException {
+if (frozen) {
+throw new IllegalStateException(
+String.format("Append not supported. Snapshot is already 
frozen: id = %s; path = %s", snapshotId, path)
+);
+}
+
+Utils.writeFully(channel, buffer);
+}
+
+@Override
+public boolean isFrozen() {
+return frozen;
+}
+
+@Override
+public void freeze() throws IOException {
+channel.close();
+frozen = true;
+
+// Set readonly and ignore the result
+if (!path.toFile().setReadOnly()) {
+throw new IOException(String.format("Unable to set file %s as 
read-only", path));
+}
+
+Path destination = Snapshots.moveRename(path,

[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-06 Thread GitBox


hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r518882604



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable {

Review comment:
   Ok, I think I see what you're saying. I guess I was not expecting that 
we would need a separate low-level interface when we already have `FileRecords` 
and `MemoryRecords`. I think `Records` already gives us a way to read from 
arbitrary positions:
   ```java
   long writeTo(GatheringByteChannel channel, long position, int length) 
throws IOException;
   ```
   But there is no `Records` implementation currently that allows unaligned 
writes. We only have this:
   ```java
   public int append(MemoryRecords records) throws IOException;
   ```
   Perhaps it would be possible to extend `Records` to provide what we need 
instead of creating a new interface?
   
   As far as the naming, I wonder if we can reserve the nicer `SnapshotXXX` 
names for the state machine. Really what I would like is a common type that can 
be used by both `handleSnapshot` and `handleCommit` since both callbacks just 
need to provide a way to read through a set of records. I was thinking it could 
be `BatchReader`, but it doesn't have to be if there is something better.
   
   (By the way, it's not too clear to me why we need `freeze` when we already 
have `close`.)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-11-03 Thread GitBox


hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r516989380



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable {
+
+public OffsetAndEpoch snapshotId();
+
+public long sizeInBytes();
+
+public Iterator iterator();
+
+public int read(ByteBuffer buffer, long position) throws IOException;

Review comment:
   So I guess we need this in order to be able to serve `FetchSnapshot` 
requests. I am wondering if it would be better to pull it into a separate 
object. Maybe we can have something like this:
   ```java
   class SnapshotReader extends BatchReader;
   
   class Snapshot extends Iterable {
 public SnapshotReader iterator();
 public int read(ByteBuffer buffer, long position) throws IOException;
   }
   ```

##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+// TODO: Write documentation for this type and all of the methods
+public interface SnapshotReader extends Closeable, Iterable {

Review comment:
   It would be nice if we can figure out how to consolidate this and 
`BatchReader`. It seems like it should be doable. 

##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/BatchedSnapshotWriter.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+
+// TODO: Write documentation for this type