hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533732789



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+    private static final String SNAPSHOT_DIR = "snapshots";
+    private static final String SUFFIX = ".snapshot";

Review comment:
       This suffix is used for producer state snapshots already. Maybe we could 
use `.snap` or something like that.
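For example (purely illustrative rename, not from the PR):
```java
    // Hypothetical rename: avoids clashing with producer state snapshot files,
    // which already use the ".snapshot" suffix.
    private static final String SUFFIX = ".snap";
```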

##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##########
@@ -149,6 +152,29 @@ default OptionalLong truncateToEndOffset(OffsetAndEpoch endOffset) {
         return OptionalLong.of(truncationOffset);
     }
 
+    /**
+     * Create a writable snapshot for the given snapshot id.
+     *
+     * See {@link RawSnapshotWriter} for details on how to use this object.
+     *
+     * @param snapshotId the end offset and epoch that identifies the snapshot
+     * @return a writable snapshot
+     */
+    RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws IOException;

Review comment:
       Do we also need an API to list snapshots?
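If it would help, a listing method is easy to sketch; the name and return type below are only illustrative (and would need a `java.util.List` import), not something from the PR:
```java
    /**
     * List the ids of the snapshots currently available on disk, e.g. so callers
     * can pick the latest snapshot or clean up older ones.
     *
     * (Hypothetical sketch; the exact name, return type and ordering are open questions.)
     */
    List<OffsetAndEpoch> listSnapshots() throws IOException;
```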

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+    private final Path path;
+    private final FileChannel channel;
+    private final OffsetAndEpoch snapshotId;
+    private boolean frozen = false;
+
+    private FileRawSnapshotWriter(
+        Path path,
+        FileChannel channel,
+        OffsetAndEpoch snapshotId
+    ) {
+        this.path = path;
+        this.channel = channel;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() throws IOException {
+        return channel.size();
+    }
+
+    @Override
+    public void append(ByteBuffer buffer) throws IOException {
+        if (frozen) {
+            throw new IllegalStateException(
+                String.format("Append not supported. Snapshot is already 
frozen: id = %s; path = %s", snapshotId, path)
+            );
+        }
+
+        Utils.writeFully(channel, buffer);
+    }
+
+    @Override
+    public boolean isFrozen() {
+        return frozen;
+    }
+
+    @Override
+    public void freeze() throws IOException {
+        channel.close();
+        frozen = true;
+
+        // Mark the file read-only; fail the freeze if that is not possible
+        if (!path.toFile().setReadOnly()) {
+            throw new IOException(String.format("Unable to set file %s as read-only", path));
+        }
+
+        Path destination = Snapshots.moveRename(path, snapshotId);
+        Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE);

Review comment:
       Wonder if we should consider using `Utils.atomicMoveWithFallback`?
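For reference, a minimal sketch of the tail of `freeze()` using that utility (assuming the rest of the method stays as-is; `Utils` is already imported in this file):
```java
        // Sketch: atomicMoveWithFallback attempts an ATOMIC_MOVE and falls back to a
        // plain move on file systems that do not support atomic renames.
        Path destination = Snapshots.moveRename(path, snapshotId);
        Utils.atomicMoveWithFallback(path, destination);
```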

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+    private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
       Is there a tangible benefit to separating snapshots into a new 
directory? Currently the log directory is a flat structure. I'm wondering if we 
should stick with convention.
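If the flat layout wins out, the snapshot file could simply live next to the log segments, named after its snapshot id; a hypothetical helper (names and padding widths are only illustrative):
```java
    // Hypothetical flat-layout naming: zero-padded end offset and epoch, placed directly
    // in the log directory, similar in spirit to how log segment files are named.
    static Path snapshotPath(Path logDir, OffsetAndEpoch snapshotId) {
        NumberFormat offsetFormat = NumberFormat.getInstance();
        offsetFormat.setMinimumIntegerDigits(20);
        offsetFormat.setGroupingUsed(false);

        NumberFormat epochFormat = NumberFormat.getInstance();
        epochFormat.setMinimumIntegerDigits(10);
        epochFormat.setGroupingUsed(false);

        String name = offsetFormat.format(snapshotId.offset) + "-"
            + epochFormat.format(snapshotId.epoch) + SUFFIX;
        return logDir.resolve(name);
    }
```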

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -100,4 +102,15 @@ default void handleResign() {}
      */
     CompletableFuture<Void> shutdown(int timeoutMs);
 
+    /**
+     * Create a writable snapshot file for a given offset and epoch.
+     *
+     * The RaftClient assumes that the returned snapshot will contain the records up to but
+     * not including the end offset in the snapshot id. See {@link SnapshotWriter} for
+     * details on how to use this object.
+     *
+     * @param snapshotId the end offset and epoch that identifies the snapshot

Review comment:
       Do we assume the offset is below the high watermark?
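If that is the assumption, the client could enforce it when the snapshot is created; a hypothetical guard (the `highWatermark` accessor here is illustrative, not from the PR):
```java
        // Hypothetical validation in createSnapshot: refuse snapshot ids that reach past
        // the high watermark, since those records are not yet known to be committed.
        if (snapshotId.offset > highWatermark.offset) {
            throw new IllegalArgumentException(
                String.format(
                    "Cannot create snapshot with id %s beyond the high watermark %s",
                    snapshotId,
                    highWatermark
                )
            );
        }
```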

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Iterator;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotReader implements RawSnapshotReader {
+    private final FileRecords fileRecords;
+    private final OffsetAndEpoch snapshotId;
+
+    private FileRawSnapshotReader(FileRecords fileRecords, OffsetAndEpoch snapshotId) {
+        this.fileRecords = fileRecords;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() {
+        return fileRecords.sizeInBytes();
+    }
+
+    @Override
+    public Iterator<RecordBatch> iterator() {
+        return new Iterator<RecordBatch>() {

Review comment:
       nit: kind of annoying to wrap the iterator just to avoid the generic 
warning. An alternative might be to use `<? extends RecordBatch>` in the 
interface.
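For comparison, a sketch of what the method collapses to if `RawSnapshotReader.iterator()` is declared as `Iterator<? extends RecordBatch>` (assuming `FileRecords.batches()` is usable here; all types are already imported in this file):
```java
    @Override
    public Iterator<? extends RecordBatch> iterator() {
        // FileRecords already exposes its batches; with a bounded wildcard in the
        // interface, the iterator can be returned without a wrapping adapter.
        return fileRecords.batches().iterator();
    }
```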

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+
+/**
+ * A type for writing a snapshot for a given end offset and epoch.
+ *
+ * A snapshot writer can be used to append objects until freeze is called. When freeze is
+ * called, the snapshot is validated and marked as immutable. After freeze is called, any
+ * append will fail with an exception.
+ *
+ * It is assumed that the content of the snapshot represents all of the records for the
+ * topic partition from offset 0 up to but not including the end offset in the snapshot
+ * id.
+ *
+ * @see org.apache.kafka.raft.RaftClient#createSnapshot(OffsetAndEpoch)
+ */
+public final class SnapshotWriter<T> implements Closeable {
+    private final RawSnapshotWriter snapshot;
+    private final BatchAccumulator<T> accumulator;
+    private final Time time;
+
+    /**
+     * Initializes a new instance of the class.
+     *
+     * @param snapshot the low level snapshot writer
+     * @param maxBatchSize the maximum size in bytes for a batch
+     * @param memoryPool the memory pool for buffer allocation
+     * @param time the clock implementation
+     * @param compressionType the compression algorithm to use
+     * @param serde the record serialization and deserialization implementation
+     */
+    public SnapshotWriter(
+        RawSnapshotWriter snapshot,
+        int maxBatchSize,
+        MemoryPool memoryPool,
+        Time time,
+        CompressionType compressionType,
+        RecordSerde<T> serde
+    ) {
+        this.snapshot = snapshot;
+        this.time = time;
+
+        this.accumulator = new BatchAccumulator<>(
+            snapshot.snapshotId().epoch,
+            0,
+            Integer.MAX_VALUE,
+            maxBatchSize,
+            memoryPool,
+            time,
+            compressionType,
+            serde
+        );
+    }
+
+    /**
+     * Returns the end offset and epoch for the snapshot.
+     */
+    public OffsetAndEpoch snapshotId() {
+        return snapshot.snapshotId();
+    }
+
+    /**
+     * Returns true if the snapshot has been frozen, otherwise false.
+     *
+     * Modifications to the snapshot are not allowed once it is frozen.
+     */
+    public boolean isFrozen() {
+        return snapshot.isFrozen();
+    }
+
+    /**
+     * Appends a list of values to the snapshot.
+     *
+     * The list of records passed is guaranteed to be written together.
+     *
+     * @param records the list of records to append to the snapshot
+     * @throws IOException for any IO error while appending
+     * @throws IllegalStateException if append is called when isFrozen is true
+     */
+    public void append(List<T> records) throws IOException {
+        if (snapshot.isFrozen()) {
+            String message = String.format(
+                "Append not supported. Snapshot is already frozen: id = {}.",
+                snapshot.snapshotId()
+            );
+
+            throw new IllegalStateException(message);
+        }
+
+        accumulator.append(snapshot.snapshotId().epoch, records);
+
+        if (!accumulator.needsDrain(time.milliseconds())) {

Review comment:
       nit: I think this would be more natural:
   ```java
   if (accumulator.needsDrain(time.milliseconds())) {
     appendBatches(accumulator.drain());
   }
   ```

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+    private final Path path;
+    private final FileChannel channel;
+    private final OffsetAndEpoch snapshotId;
+    private boolean frozen = false;
+
+    private FileRawSnapshotWriter(
+        Path path,
+        FileChannel channel,
+        OffsetAndEpoch snapshotId
+    ) {
+        this.path = path;
+        this.channel = channel;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long sizeInBytes() throws IOException {
+        return channel.size();
+    }
+
+    @Override
+    public void append(ByteBuffer buffer) throws IOException {
+        if (frozen) {
+            throw new IllegalStateException(
+                String.format("Append not supported. Snapshot is already 
frozen: id = %s; path = %s", snapshotId, path)
+            );
+        }
+
+        Utils.writeFully(channel, buffer);
+    }
+
+    @Override
+    public boolean isFrozen() {
+        return frozen;
+    }
+
+    @Override
+    public void freeze() throws IOException {
+        channel.close();
+        frozen = true;
+
+        // Mark the file read-only; fail the freeze if that is not possible
+        if (!path.toFile().setReadOnly()) {
+            throw new IOException(String.format("Unable to set file %s as read-only", path));
+        }
+
+        Path destination = Snapshots.moveRename(path, snapshotId);
+        Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE);
+    }
+
+    @Override
+    public void close() throws IOException {
+        channel.close();
+        Files.deleteIfExists(path);

Review comment:
       Maybe we can just use a better name for `path`, since it makes this code look 
suspicious.
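For example, something along the lines of the rename below (the name `tempSnapshotPath` is only illustrative) would make it obvious that `close()` deletes the in-progress temporary file rather than a frozen snapshot:
```java
    @Override
    public void close() throws IOException {
        channel.close();
        // Only the partially written temporary file is removed here; freeze() has
        // already moved a completed snapshot to its final location.
        Files.deleteIfExists(tempSnapshotPath);
    }
```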

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -472,4 +489,110 @@ private EpochStartOffset(int epoch, long startOffset) {
         }
     }
 
+    final class MockRawSnapshotWriter implements RawSnapshotWriter {
+        private final OffsetAndEpoch snapshotId;
+        private ByteBuffer data;

Review comment:
       We could probably use `ByteBufferOutputStream` which already handles 
expansion.
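A rough sketch of what that could look like in the mock writer, assuming `org.apache.kafka.common.utils.ByteBufferOutputStream` is imported (the method shape is illustrative; only the buffer handling is the point):
```java
    // Sketch: ByteBufferOutputStream grows its backing buffer on demand, so the mock
    // writer does not have to reallocate and copy ByteBuffers itself.
    private final ByteBufferOutputStream data = new ByteBufferOutputStream(1024);

    void appendBytes(ByteBuffer buffer) {
        // write(ByteBuffer) copies the remaining bytes, expanding the buffer as needed
        data.write(buffer.duplicate());
    }
```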




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

