[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r535482262 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -74,18 +74,19 @@ public void freeze() throws IOException { frozen = true; // Set readonly and ignore the result -if (!path.toFile().setReadOnly()) { -throw new IOException(String.format("Unable to set file %s as read-only", path)); +if (!tempSnapshotPath.toFile().setReadOnly()) { +throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath)); } -Path destination = Snapshots.moveRename(path, snapshotId); -Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE); +Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId); +Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE); } @Override public void close() throws IOException { channel.close(); -Files.deleteIfExists(path); +// This is a noop if freeze was called before calling close +Files.deleteIfExists(tempSnapshotPath); Review comment: Yes! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r535474040 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.snapshot; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.OffsetAndEpoch; + +public final class FileRawSnapshotWriter implements RawSnapshotWriter { +private final Path tempSnapshotPath; +private final FileChannel channel; +private final OffsetAndEpoch snapshotId; +private boolean frozen = false; + +private FileRawSnapshotWriter( +Path tempSnapshotPath, +FileChannel channel, +OffsetAndEpoch snapshotId +) { +this.tempSnapshotPath = tempSnapshotPath; +this.channel = channel; +this.snapshotId = snapshotId; +} + +@Override +public OffsetAndEpoch snapshotId() { +return snapshotId; +} + +@Override +public long sizeInBytes() throws IOException { +return channel.size(); +} + +@Override +public void append(ByteBuffer buffer) throws IOException { +if (frozen) { +throw new IllegalStateException( +String.format("Append not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) +); +} + +Utils.writeFully(channel, buffer); +} + +@Override +public boolean isFrozen() { +return frozen; +} + +@Override +public void freeze() throws IOException { +channel.close(); +frozen = true; + +// Set readonly and ignore the result +if (!tempSnapshotPath.toFile().setReadOnly()) { +throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath)); +} + +Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId); +Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE); Review comment: I investigated the issue a bit. 
It looks like this utility was introduced to replace these lines in `OffsetCheckpoint.scala`:

```scala
// swap new offset checkpoint file with previous one
if(!temp.renameTo(file)) {
  // renameTo() fails on Windows if the destination file exists.
  file.delete()
  if(!temp.renameTo(file))
    throw new IOException(...)
}
```

Looking at the JDK implementation for Windows, `ATOMIC_MOVE` should work on Windows even if the target file exists: https://github.com/openjdk/jdk/blob/master/src/java.base/windows/classes/sun/nio/fs/WindowsFileCopy.java#L298-L312

In other words, I think we can keep the code as is in this PR. cc @ijuma
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r534592624 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ## @@ -100,4 +102,15 @@ default void handleResign() {} */ CompletableFuture shutdown(int timeoutMs); +/** + * Create a writable snapshot file for a given offset and epoch. + * + * The RaftClient assumes that the snapshot return will contain the records up to but + * not including the end offset in the snapshot id. See {@link SnapshotWriter} for + * details on how to use this object. + * + * @param snapshotId the end offset and epoch that identifies the snapshot Review comment: Created https://issues.apache.org/jira/browse/KAFKA-10800
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r534587344 ## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.snapshot; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.text.NumberFormat; +import org.apache.kafka.raft.OffsetAndEpoch; + +final class Snapshots { +private static final String SNAPSHOT_DIR = "snapshots"; Review comment: Done. Removed the `snapshots` directory. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r534581287 ## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ## @@ -472,4 +489,110 @@ private EpochStartOffset(int epoch, long startOffset) { } } +final class MockRawSnapshotWriter implements RawSnapshotWriter { +private final OffsetAndEpoch snapshotId; +private ByteBuffer data; Review comment: Cool. Yeah, let's use that. So many util goodies. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r534577659 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.snapshot; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.OffsetAndEpoch; + +public final class FileRawSnapshotWriter implements RawSnapshotWriter { +private final Path path; +private final FileChannel channel; +private final OffsetAndEpoch snapshotId; +private boolean frozen = false; + +private FileRawSnapshotWriter( +Path path, +FileChannel channel, +OffsetAndEpoch snapshotId +) { +this.path = path; +this.channel = channel; +this.snapshotId = snapshotId; +} + +@Override +public OffsetAndEpoch snapshotId() { +return snapshotId; +} + +@Override +public long sizeInBytes() throws IOException { +return channel.size(); +} + +@Override +public void append(ByteBuffer buffer) throws IOException { +if (frozen) { +throw new IllegalStateException( +String.format("Append not supported. Snapshot is already frozen: id = %s; path = %s", snapshotId, path) +); +} + +Utils.writeFully(channel, buffer); +} + +@Override +public boolean isFrozen() { +return frozen; +} + +@Override +public void freeze() throws IOException { +channel.close(); +frozen = true; + +// Set readonly and ignore the result +if (!path.toFile().setReadOnly()) { +throw new IOException(String.format("Unable to set file %s as read-only", path)); +} + +Path destination = Snapshots.moveRename(path, snapshotId); +Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE); Review comment: I prefer to keep it this way. The atomic move (rename) should always succeed because we guarantee it is within the same directory. I think I prefer for Kafka to throw an exception than for the raft client or state machine to see a partial snapshot file because it performed a file copy instead of a file rename. 
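A minimal sketch of the behavior argued for above: rename the finished temporary file into place with `ATOMIC_MOVE`, and surface a failure instead of falling back to a copy. The class name, method names, and file names are hypothetical and are not part of the PR; only the `java.nio.file` calls are real APIs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical illustration; not the FileRawSnapshotWriter from the PR.
final class AtomicRenameSketch {

    // Renames temp to a sibling file in the same directory, atomically or not at all.
    static Path publish(Path temp, String finalName) throws IOException {
        Path destination = temp.resolveSibling(finalName);
        try {
            return Files.move(temp, destination, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Assumption: deliberately no copy fallback, so a reader can never
            // observe a partially written destination file.
            throw new IOException("Atomic rename to " + destination + " is not supported", e);
        }
    }

    // Writes a small temp file, publishes it, and reports
    // {destination exists, temp still exists}.
    static boolean[] demo() {
        try {
            Path dir = Files.createTempDirectory("snapshot-sketch");
            Path temp = Files.createTempFile(dir, "partial-", ".tmp");
            Files.write(temp, "records".getBytes(StandardCharsets.UTF_8));
            Path destination = publish(temp, "example.checkpoint");
            return new boolean[] {Files.exists(destination), Files.exists(temp)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("destination exists: " + result[0] + ", temp exists: " + result[1]);
    }
}
```

Because source and destination are siblings in one directory, the rename stays on a single file system, which is the condition the comment relies on for the atomic move to succeed.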
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r534572296 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.snapshot; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.Iterator; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.AbstractIterator; +import org.apache.kafka.raft.OffsetAndEpoch; + +public final class FileRawSnapshotReader implements RawSnapshotReader { +private final FileRecords fileRecords; +private final OffsetAndEpoch snapshotId; + +private FileRawSnapshotReader(FileRecords fileRecords, OffsetAndEpoch snapshotId) { +this.fileRecords = fileRecords; +this.snapshotId = snapshotId; +} + +@Override +public OffsetAndEpoch snapshotId() { +return snapshotId; +} + +@Override +public long sizeInBytes() { +return fileRecords.sizeInBytes(); +} + +@Override +public Iterator iterator() { +return new Iterator() { Review comment: I am not sure if we can do that. The interface `RawSnapshotReader` extends `Iterable<RecordBatch>`. `Iterable<T>` defines that method as `Iterator<T> iterator()`. As you point out, they should have defined it as `Iterator<? extends T> iterator()` since `Iterator` is covariant.
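Because Java's `Iterable<T>` declares `Iterator<T> iterator()`, an iterator over a subtype cannot be returned directly where an iterator over the supertype is required. The sketch below shows one way to bridge that gap with a thin wrapper. The class and method names are hypothetical and `Integer`/`Number` stand in for the record batch types; this is an illustration of the typing issue, not the code used in the PR.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper; not part of the PR.
final class IteratorWideningSketch {

    // Adapts an iterator over a subtype to an iterator over the supertype.
    // This is safe because an Iterator only produces values of type T.
    static <T> Iterator<T> widen(final Iterator<? extends T> delegate) {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return delegate.hasNext();
            }

            @Override
            public T next() {
                return delegate.next();
            }
        };
    }

    // Sums the values by reading them through the widened Iterator<Number>.
    static long sum(List<Integer> values) {
        // An Iterator<Integer> is not an Iterator<Number>, so it is wrapped.
        Iterator<Number> numbers = widen(values.iterator());
        long total = 0;
        while (numbers.hasNext()) {
            total += numbers.next().longValue();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3)));
    }
}
```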
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r534489751 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ## @@ -100,4 +102,15 @@ default void handleResign() {} */ CompletableFuture shutdown(int timeoutMs); +/** + * Create a writable snapshot file for a given offset and epoch. + * + * The RaftClient assumes that the snapshot return will contain the records up to but + * not including the end offset in the snapshot id. See {@link SnapshotWriter} for + * details on how to use this object. + * + * @param snapshotId the end offset and epoch that identifies the snapshot Review comment: Yeah. I'll create a Jira to validate the snapshot id. We should perform at least the following validations:

1. it is less than the high-watermark.
2. it is a valid epoch and end offset based on the log. I think we can use the leader epoch cache to check this.
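A rough sketch of the two validations listed above, under stated assumptions. The nested `OffsetAndEpoch` class is a stand-in for `org.apache.kafka.raft.OffsetAndEpoch`, the leader epoch cache is modeled as a hypothetical map from epoch to the first offset written in that epoch, and the exact rule for the second check (the last record included in the snapshot must belong to the snapshot's epoch) is one interpretation of the comment, not something the PR specifies.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration; the real validation was deferred to a follow-up Jira.
final class SnapshotIdValidationSketch {

    // Stand-in for org.apache.kafka.raft.OffsetAndEpoch.
    static final class OffsetAndEpoch {
        final long offset;
        final int epoch;

        OffsetAndEpoch(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }
    }

    // epochStartOffsets maps each epoch to the first offset written in that epoch
    // (an assumed model of the leader epoch cache).
    static boolean isValid(
        OffsetAndEpoch snapshotId,
        long highWatermark,
        NavigableMap<Integer, Long> epochStartOffsets
    ) {
        // 1. The end offset must be less than the high-watermark.
        if (snapshotId.offset >= highWatermark) {
            return false;
        }
        // 2. The epoch must be known, and the last record included in the
        //    snapshot (end offset - 1) must have been written in that epoch.
        Long epochStart = epochStartOffsets.get(snapshotId.epoch);
        if (epochStart == null) {
            return false;
        }
        long lastIncluded = snapshotId.offset - 1;
        if (lastIncluded < epochStart) {
            return false;
        }
        Map.Entry<Integer, Long> nextEpoch = epochStartOffsets.higherEntry(snapshotId.epoch);
        return nextEpoch == null || lastIncluded < nextEpoch.getValue();
    }

    // Example cache: epoch 1 starts at offset 0, epoch 2 at 10, epoch 3 at 25.
    static NavigableMap<Integer, Long> exampleCache() {
        NavigableMap<Integer, Long> cache = new TreeMap<>();
        cache.put(1, 0L);
        cache.put(2, 10L);
        cache.put(3, 25L);
        return cache;
    }

    public static void main(String[] args) {
        System.out.println(isValid(new OffsetAndEpoch(11, 2), 30L, exampleCache()));
    }
}
```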
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r533835857 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java ## @@ -149,6 +152,29 @@ default OptionalLong truncateToEndOffset(OffsetAndEpoch endOffset) { return OptionalLong.of(truncationOffset); } +/** + * Create a writable snapshot for the given snapshot id. + * + * See {@link RawSnapshotWriter} for details on how to use this object. + * + * @param snapshotId the end offset and epoch that identifies the snapshot + * @return a writable snapshot + */ +RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws IOException; Review comment: Yeah. I think we will need that when we implement deleting snapshots. Do you mind if I add this later? Also, I think we are going to need a `readLatestSnapshot()` when the state machine (controller or broker) needs to load the latest valid snapshot. I was planning to add this later, once the use case is clear to me.
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r533833428 ## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.snapshot; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.text.NumberFormat; +import org.apache.kafka.raft.OffsetAndEpoch; + +final class Snapshots { +private static final String SNAPSHOT_DIR = "snapshots"; Review comment: I think this depends on whether we need to scan the snapshot directory. Unfortunately, I don't have a concrete answer at the moment. When we implement the changes to the rest of the raft client (log truncation, updating the start offset and LEO), we may need to scan the snapshot/checkpoint folder to determine the greatest log start offset and LEO. @lbradstreet suggested storing them in a different directory as part of the KIP-630 review process, as Kafka already has a few files in the partition log directory.
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r533829583 ## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.snapshot; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.text.NumberFormat; +import org.apache.kafka.raft.OffsetAndEpoch; + +final class Snapshots { +private static final String SNAPSHOT_DIR = "snapshots"; +private static final String SUFFIX = ".snapshot"; Review comment: Good catch. I mentioned using `.checkpoint` in KIP-630. I forgot to change it here. I'll change it to that but let me know if you have a preference. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r533828329 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ## @@ -100,4 +102,15 @@ default void handleResign() {} */ CompletableFuture shutdown(int timeoutMs); +/** + * Create a writable snapshot file for a given offset and epoch. + * + * The RaftClient assumes that the snapshot return will contain the records up to but + * not including the end offset in the snapshot id. See {@link SnapshotWriter} for + * details on how to use this object. + * + * @param snapshotId the end offset and epoch that identifies the snapshot Review comment: Is there a specific reason why you are asking this? We don't currently check for this. I will add a check for this and we can relax this later if we need to. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r519213671 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.raft.OffsetAndEpoch; + +// TODO: Write documentation for this type and all of the methods +public interface SnapshotReader extends Closeable, Iterable { Review comment: @hachikuji and I discussed this offline. The latest PR should reflect our discussion. The interfaces `RawSnapshotWriter` and `RawSnapshotReader` are expected to change after we address https://issues.apache.org/jira/browse/KAFKA-10694 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r519211721 ## File path: raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.raft.OffsetAndEpoch; + +/** + * Interface for reading snapshots as a sequence of records. + */ +public interface RawSnapshotReader extends Closeable, Iterable { +/** + * Returns the end offset and epoch for the snapshot. + */ +public OffsetAndEpoch snapshotId(); + +/** + * Returns the number of bytes for the snapshot. + * + * @throws IOException for any IO error while reading the size + */ +public long sizeInBytes() throws IOException; + +/** + * Reads bytes from position into the given buffer. + * + * It is not guarantee that the given buffer will be filled. 
+ * + * @param buffer byte buffer to put the read files + * @param position the starting position in the snapshot to read + * @return the number of bytes read + * @throws IOException for any IO error while reading the snapshot + */ +public int read(ByteBuffer buffer, long position) throws IOException; Review comment: As @hachikuji and I discussed offline, it is very likely that this method will change to the following after we implement https://issues.apache.org/jira/browse/KAFKA-10694:

```java
public BaseRecords slice(long position) throws IOException;
```
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r519211586 ## File path: raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotWriter.java ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.kafka.raft.OffsetAndEpoch; + +/** + * Interface for writing snapshot as a sequence of records. + */ +public interface RawSnapshotWriter extends Closeable { +/** + * Returns the end offset and epoch for the snapshot. + */ +public OffsetAndEpoch snapshotId(); + +/** + * Returns the number of bytes for the snapshot. + * + * @throws IOException for any IO error while reading the size + */ +public long sizeInBytes() throws IOException; + +/** + * Fully appends the buffer to the snapshot. + * + * If the method returns without an exception the given buffer was fully writing the + * snapshot. 
+ * + * @param buffer the buffer to append + * @throws IOException for any IO error during append + */ +public void append(ByteBuffer buffer) throws IOException; Review comment: As @hachikuji and I discussed offline, it is very likely that this method will change to the following after we implement https://issues.apache.org/jira/browse/KAFKA-10694:

```java
public void append(MemoryRecords records) throws IOException;
```
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r519211390 ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -32,25 +32,25 @@ * This class is responsible for managing the current state of this node and ensuring only * valid state transitions. * - * Unattached => + * Unattached transitions to: Review comment: Changes to this file are unrelated to this PR. Made them here so that `docsJar` would succeed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r518916392 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.raft.OffsetAndEpoch; + +// TODO: Write documentation for this type and all of the methods +public interface SnapshotReader extends Closeable, Iterable { Review comment:

> Perhaps it would be possible to extend Records to provide what we need instead of creating a new interface?

I think you are right that we don't need `SnapshotReader`, as `Records` provides all of the functionality we need. `SnapshotReader` was added so that we didn't expose all of the mutating APIs in `FileRecords`. Snapshots are supposed to be immutable once frozen.

I think we still want `SnapshotWriter` or something similar, as it provides 1. raw writes of bytes and 2. `freeze`, which optionally marks the snapshot as immutable if the validation passes.

For 1., I don't think we should add raw writes to `Records` or `FileRecords`, as in essence we would be exposing an unsafe API and the user would need to make sure that they are writing the correct data.

For 2., I think we can get away without introducing a new type/interface and instead add that functionality as a static method in `Snapshots.java`. Unit tests (mock tests) may be difficult with this code organization.

> As far as the naming, I wonder if we can reserve the nicer SnapshotXXX names for the state machine.

Yes. I'll use the prettier name for the type exposed to the state machine.

> Really what I would like is a common type that can be used by both handleSnapshot and handleCommit since both callbacks just need to provide a way to read through a set of records. I was thinking it could be BatchReader, but it doesn't have to be if there is something better.

I think this should be possible. I haven't implemented this part, so I don't have all of the details. I think the requirements for the type sent through `handleSnapshot` are a subset of the requirements for the type sent through `handleCommit`. In particular, the Raft Client (`ListenerContext`) only needs to know when the snapshot has been read fully (e.g. `close`). The Raft Client doesn't need to know the "last seen offset + 1", as it already knows the `SnapshotId` sent through `handleSnapshot`.

> (By the way, it's not too clear to me why we need freeze when we already have close.)

I wanted to allow the user to abort a snapshot. If `close` is called without calling `freeze`, then the partial snapshot is deleted and not made immutable.
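The abort behaviour described in this comment (calling `close` without `freeze` discards the partial snapshot) can be sketched roughly as follows. This is a minimal illustration and not the class from the pull request: the name `AbortableSnapshotWriter` and the two-path constructor are assumptions made for the example, and the validation step is left out.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: bytes go to a temporary file that only becomes
// visible under its final name once freeze() has been called.
final class AbortableSnapshotWriter implements AutoCloseable {
    private final Path tempPath;   // assumed location of the partial snapshot
    private final Path finalPath;  // assumed location of the frozen snapshot
    private final FileChannel channel;
    private boolean frozen = false;

    AbortableSnapshotWriter(Path tempPath, Path finalPath) throws IOException {
        this.tempPath = tempPath;
        this.finalPath = finalPath;
        this.channel = FileChannel.open(
            tempPath,
            StandardOpenOption.CREATE,
            StandardOpenOption.WRITE,
            StandardOpenOption.TRUNCATE_EXISTING);
    }

    // Raw write of bytes; rejected once the snapshot is frozen.
    void append(ByteBuffer buffer) throws IOException {
        if (frozen) {
            throw new IllegalStateException("Snapshot is already frozen");
        }
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }

    // Flush to disk and atomically publish the snapshot under its final name.
    void freeze() throws IOException {
        if (frozen) {
            throw new IllegalStateException("Snapshot is already frozen");
        }
        channel.force(true);
        frozen = true;
        Files.move(tempPath, finalPath, StandardCopyOption.ATOMIC_MOVE);
    }

    // If freeze() was never called, this deletes the partial snapshot (abort).
    // After freeze() the temporary file is gone and the delete is a no-op.
    @Override
    public void close() throws IOException {
        channel.close();
        Files.deleteIfExists(tempPath);
    }
}
```

Because the writer implements `AutoCloseable`, a caller can use try-with-resources and only call `freeze` on the success path. An exception thrown while appending then leaves nothing behind except the deleted temporary file.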
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r518396177 ## File path: core/src/main/scala/kafka/snapshot/KafkaSnapshotReader.scala ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.snapshot + +import java.nio.ByteBuffer +import java.nio.file.Path +import java.util.{Iterator => JIterator} +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.record.FileRecords +import org.apache.kafka.raft.OffsetAndEpoch +import org.apache.kafka.snapshot.SnapshotReader + +final class KafkaSnapshotReader private (fileRecords: FileRecords, snapshotId: OffsetAndEpoch) extends SnapshotReader { Review comment: Done.
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r517703078 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.raft.OffsetAndEpoch; + +// TODO: Write documentation for this type and all of the methods +public interface SnapshotReader extends Closeable, Iterable { Review comment:

This comment applies to some of your other observations. At a high level there are four use cases that we need to design and implement. Two use cases are for the Raft implementation. Two use cases are for the state machine.

### Raft implementation

These types are internal to the raft implementation and don't have to be exposed to the state machine.

Leader Use Case

The leader needs to be able to send a part of the snapshot over the network. Something like this:

```java
interface SnapshotReader extends Closeable {
    // Send up to maxBytes, starting at position, to the given channel.
    long transferTo(long position, long maxBytes, WritableByteChannel channel);
    // Read bytes starting at position into the buffer.
    int read(ByteBuffer buffer, long position);
}
```

Follower Use Case

The followers need to be able to copy bytes from the network and validate the snapshot on disk when fetching is done.

```java
interface SnapshotWriter extends Closeable {
    void append(ByteBuffer buffer);
    void validate();
    void freeze();
}
```

### State machine implementation

These types are exposed to the state machine.

Load Snapshot

The state machine needs to be able to load/scan the entire snapshot. The state machine can use `close` to tell the raft client that it finished loading the snapshot. This will be implemented in a future PR but it could look like this:

```java
interface BatchedSnapshotReader<T> extends Iterable<Iterable<T>>, Closeable {
}
```

Generate Snapshot

The state machine needs to be able to generate a snapshot by appending records/values and marking the snapshot as immutable (`freeze`) when it is done.

```java
interface BatchedSnapshotWriter<T> extends Closeable {
    void append(Iterable<T> records);
    void freeze();
}
```

### Notes

`SnapshotWriter` and `SnapshotReader` need to be interfaces because we will have a real implementation and a mocked implementation for testing. These two types are internal to raft and are not exposed to the state machine.

`BatchedSnapshotReader` and `BatchedSnapshotWriter` can depend on `SnapshotWriter` and `SnapshotReader` to reuse some code, but this is not strictly required. These two types don't have to be interfaces if they delegate the IO to `SnapshotWriter` and `SnapshotReader`.

What do you think @hachikuji?
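The layering described in the notes above (a state-machine-facing writer that delegates its IO to a raft-internal raw writer, with a mocked raw writer for tests) might look roughly like the sketch below. All names here (`RawSnapshotWriter`, `InMemorySnapshotWriter`, `RecordSnapshotWriter`) and the length-prefixed encoding are assumptions made for illustration; they are not the types or the record format used in the pull request.

```java
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical raft-internal writer, shaped like the SnapshotWriter proposal
// (validate() omitted to keep the sketch short).
interface RawSnapshotWriter extends Closeable {
    void append(ByteBuffer buffer);
    void freeze();
    @Override
    void close(); // narrowed: this sketch throws no checked exception
}

// Hypothetical mock implementation that keeps the bytes in memory for tests.
final class InMemorySnapshotWriter implements RawSnapshotWriter {
    final List<byte[]> chunks = new ArrayList<>();
    boolean frozen = false;
    boolean closed = false;

    @Override
    public void append(ByteBuffer buffer) {
        if (frozen) {
            throw new IllegalStateException("Snapshot is already frozen");
        }
        byte[] copy = new byte[buffer.remaining()];
        buffer.get(copy);
        chunks.add(copy);
    }

    @Override
    public void freeze() {
        frozen = true;
    }

    @Override
    public void close() {
        closed = true;
        if (!frozen) {
            chunks.clear(); // abort: drop the partial snapshot
        }
    }
}

// Hypothetical state-machine-facing writer. It is a concrete class rather
// than an interface because all IO is delegated to the raw writer.
final class RecordSnapshotWriter<T> implements Closeable {
    private final RawSnapshotWriter raw;
    private final Function<T, byte[]> serializer; // assumed serialization hook

    RecordSnapshotWriter(RawSnapshotWriter raw, Function<T, byte[]> serializer) {
        this.raw = raw;
        this.serializer = serializer;
    }

    // Encode each record as a 4-byte length followed by its payload.
    void append(Iterable<T> records) {
        for (T record : records) {
            byte[] payload = serializer.apply(record);
            ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + payload.length);
            buffer.putInt(payload.length);
            buffer.put(payload);
            buffer.flip();
            raw.append(buffer);
        }
    }

    void freeze() {
        raw.freeze();
    }

    @Override
    public void close() {
        raw.close();
    }
}
```

With this split, tests for the state-machine-facing writer only need the in-memory mock, while a file-backed implementation of the raw interface can be tested on its own.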
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r517703953 ## File path: raft/src/main/java/org/apache/kafka/snapshot/BatchedSnapshotWriter.java ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.raft.RecordSerde; +import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; +import org.apache.kafka.raft.internals.BatchAccumulator; + +// TODO: Write documentation for this type and all of the methods +final public class BatchedSnapshotWriter implements Closeable { Review comment: I cover some of the motivation for this here: https://github.com/apache/kafka/pull/9512#discussion_r517703078. Let's move the conversation there if you agree.
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r517703625 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.raft.OffsetAndEpoch; + +// TODO: Write documentation for this type and all of the methods +public interface SnapshotReader extends Closeable, Iterable { + +public OffsetAndEpoch snapshotId(); + +public long sizeInBytes(); + +public Iterator iterator(); + +public int read(ByteBuffer buffer, long position) throws IOException; Review comment: I think I address this in this comment https://github.com/apache/kafka/pull/9512#discussion_r517703078. If you agree let's move the conversation there.
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r517672247 ## File path: core/src/main/scala/kafka/snapshot/KafkaSnapshotReader.scala ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.snapshot + +import java.nio.ByteBuffer +import java.nio.file.Path +import java.util.{Iterator => JIterator} +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.record.FileRecords +import org.apache.kafka.raft.OffsetAndEpoch +import org.apache.kafka.snapshot.SnapshotReader + +final class KafkaSnapshotReader private (fileRecords: FileRecords, snapshotId: OffsetAndEpoch) extends SnapshotReader { Review comment: Sounds good. I'll move this code over and implement it in Java.