jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r600813565
########## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ########## @@ -418,48 +414,49 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) { public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {} @Override - public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) { - if (logStartOffset() > logStartSnapshotId.offset || - highWatermark.offset < logStartSnapshotId.offset) { + public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) { Review comment: Note that the changes to this method are to relax the log start offset and high-watermark invariant so that we can create more interesting snapshot and log states in the `RaftClientTestContext.Builder`. ########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -1300,18 +1330,18 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( int maxSnapshotSize; try { - maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes()); + maxSnapshotSize = Math.toIntExact(snapshotSize); } catch (ArithmeticException e) { maxSnapshotSize = Integer.MAX_VALUE; } if (partitionSnapshot.position() > Integer.MAX_VALUE) { + // TODO: This should return an error response instead of throwing an exception throw new IllegalStateException(String.format("Trying to fetch a snapshot with position: %d lager than Int.MaxValue", partitionSnapshot.position())); } - UnalignedRecords records = snapshot.read(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize)); - - long snapshotSize = snapshot.sizeInBytes(); + // TODO: I think this slice of records is closed when the snapshot is close in the try (...) above. + UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize)); Review comment: When the implementation is a `FileRawSnapshotReader`, the created slice will be `close` before the network client has had a chance to send the bytes. Created https://issues.apache.org/jira/browse/KAFKA-12543 and I will work on this after this PR. ########## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ########## @@ -355,33 +354,30 @@ public LogFetchInfo read(long startOffset, Isolation isolation) { } ByteBuffer buffer = ByteBuffer.allocate(512); - LogEntry firstEntry = null; + LogOffsetMetadata batchStartOffset = null; Review comment: Note that the changes to this method are so that `read` doesn't return all of the batches from from `startOffset` to `highWatermark`. This was needed for more interested test cases around snapshot loading. ########## File path: raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java ########## @@ -124,6 +126,15 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) { } } + @Override + public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) { + // TODO: Create Jira: Handle loading commit in ListenerShim Review comment: Okay. I'll remove the TODO. Do you have a Jira for this? If not let me know and I can create one. ########## File path: shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java ########## @@ -105,6 +106,22 @@ public void handleCommits(long lastOffset, List<ApiMessage> messages) { }, null); } + @Override + public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) { + // TODO: Create Jira: Need to cover the case where handle snapshot invalidates previous commits + // Need to handle that reader.snapshotId() means that every record up to that offset is committed Review comment: Created this Jira: https://issues.apache.org/jira/browse/KAFKA-12545 ########## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java ########## @@ -87,6 +96,25 @@ public synchronized void handleCommit(BatchReader<Integer> reader) { } } + @Override + public synchronized void handleSnapshot(SnapshotReader<Integer> reader) { + try { + try (SnapshotReader<Integer> snapshot = reader) { + log.debug("Loading snapshot {}", snapshot.snapshotId()); + for (List<Integer> batch : snapshot) { + for (Integer value : batch) { + log.debug("Setting value: {}", value); + this.committed = value; + this.uncommitted = value; + } + } + log.debug("Finished loading snapshot. Set value: {}", this.committed); + } + } catch (IOException e) { Review comment: This is because `SnapshotReader::close` was declare as throwing an `IOException`. This made the API for `SnapshotReader` confusing a `hasNext` and `next` don't throw an `IOException` even though they read from disk in some cases. I fixed this by changing `SnapshotReader` to implement `AutoCloseable` instead of `Closeable`. ########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -2154,8 +2182,14 @@ private boolean maybeCompleteShutdown(long currentTimeMs) { return false; } - private void maybeUpdateOldestSnapshotId() { - log.latestSnapshotId().ifPresent(log::deleteBeforeSnapshot); + private void maybeUpdateEarliestSnapshotId() { Review comment: My thought process was that what the Raft client cares about is the log start offset and the snapshot at that offset. The fact that snapshots get deleted and how they get deleted is an implementation detail in `ReplicatedLog`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org