jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r600813565



##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -418,48 +414,49 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch 
snapshotId) {
     public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {}
 
     @Override
-    public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) {
-        if (logStartOffset() > logStartSnapshotId.offset ||
-            highWatermark.offset < logStartSnapshotId.offset) {
+    public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {

Review comment:
      Note that the changes to this method relax the log start offset and 
high-watermark invariant so that we can create more interesting snapshot and 
log states in the `RaftClientTestContext.Builder`.
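For context, the invariant being discussed can be sketched as a standalone check (the class name, method name, and numeric offsets below are illustrative assumptions, not the actual `MockLog` code): deletion before a snapshot is only valid when the snapshot id lies between the current log start offset and the high-watermark.

```java
public class SnapshotInvariantDemo {
    // Hypothetical check mirroring the old guard: a snapshot id is a valid new
    // log start only if it does not precede the current log start and is not
    // past the high-watermark.
    static boolean canDeleteBefore(long logStartOffset, long highWatermark, long snapshotOffset) {
        return logStartOffset <= snapshotOffset && snapshotOffset <= highWatermark;
    }

    public static void main(String[] args) {
        System.out.println(canDeleteBefore(0, 10, 5));  // within bounds
        System.out.println(canDeleteBefore(0, 10, 15)); // beyond the high-watermark
    }
}
```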

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1300,18 +1330,18 @@ private FetchSnapshotResponseData 
handleFetchSnapshotRequest(
 
             int maxSnapshotSize;
             try {
-                maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes());
+                maxSnapshotSize = Math.toIntExact(snapshotSize);
             } catch (ArithmeticException e) {
                 maxSnapshotSize = Integer.MAX_VALUE;
             }
 
             if (partitionSnapshot.position() > Integer.MAX_VALUE) {
+                // TODO: This should return an error response instead of 
throwing an exception
                throw new IllegalStateException(String.format("Trying to fetch 
a snapshot with position: %d larger than Int.MaxValue", 
partitionSnapshot.position()));
             }
 
-            UnalignedRecords records = 
snapshot.read(partitionSnapshot.position(), Math.min(data.maxBytes(), 
maxSnapshotSize));
-
-            long snapshotSize = snapshot.sizeInBytes();
+            // TODO: I think this slice of records is closed when the snapshot 
is closed in the try (...) above.
+            UnalignedRecords records = 
snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), 
maxSnapshotSize));

Review comment:
      When the implementation is a `FileRawSnapshotReader`, the created slice 
will be closed before the network client has had a chance to send the bytes. 
Created https://issues.apache.org/jira/browse/KAFKA-12543 and I will work on 
this after this PR.
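To make the lifetime issue concrete, here is a minimal sketch (all names are hypothetical stand-ins, not the real `FileRawSnapshotReader` API): a slice created inside try-with-resources becomes unusable the moment its owning reader closes, which happens before the response bytes would ever reach the network layer.

```java
public class SliceLifetimeDemo {
    // Stand-in for a file-backed snapshot reader that owns the underlying resource.
    static class RawSnapshotReader implements AutoCloseable {
        boolean closed = false;
        Slice slice(long position) { return new Slice(this, position); }
        @Override public void close() { closed = true; }
    }

    // A slice is only a view; it is invalid once its owner is closed.
    static class Slice {
        final RawSnapshotReader owner;
        final long position;
        Slice(RawSnapshotReader owner, long position) { this.owner = owner; this.position = position; }
        boolean isReadable() { return !owner.closed; }
    }

    static Slice buildResponse() {
        try (RawSnapshotReader snapshot = new RawSnapshotReader()) {
            return snapshot.slice(0);
        } // snapshot.close() runs here, before any send could happen
    }

    public static void main(String[] args) {
        System.out.println(buildResponse().isReadable()); // false: closed too early
    }
}
```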

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -355,33 +354,30 @@ public LogFetchInfo read(long startOffset, Isolation 
isolation) {
         }
 
         ByteBuffer buffer = ByteBuffer.allocate(512);
-        LogEntry firstEntry = null;
+        LogOffsetMetadata batchStartOffset = null;

Review comment:
      Note that the changes to this method are so that `read` doesn't return 
all of the batches from `startOffset` to `highWatermark`. This was needed for 
more interesting test cases around snapshot loading.
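A rough sketch of the bounded-read idea (hypothetical names and simplified byte accounting, not the actual `MockLog.read` code): return at least one batch, then stop once a byte budget is exhausted, rather than returning everything up to the high-watermark in one call.

```java
import java.util.ArrayList;
import java.util.List;

public class BoundedReadDemo {
    // Returns batches starting at startIndex, stopping once the byte budget is
    // spent; always returns at least one batch so readers can make progress.
    static List<int[]> read(List<int[]> batches, int startIndex, int maxBytes) {
        List<int[]> result = new ArrayList<>();
        int bytes = 0;
        for (int i = startIndex; i < batches.size(); i++) {
            int batchBytes = batches.get(i).length * Integer.BYTES;
            if (!result.isEmpty() && bytes + batchBytes > maxBytes) {
                break; // budget exhausted; later batches need another read call
            }
            result.add(batches.get(i));
            bytes += batchBytes;
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> log = List.of(new int[]{1, 2}, new int[]{3, 4}, new int[]{5});
        System.out.println(read(log, 0, 8).size()); // only the first batch fits
    }
}
```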

##########
File path: 
raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
##########
@@ -124,6 +126,15 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> 
reader) {
             }
         }
 
+        @Override
+        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> 
reader) {
+            // TODO: Create Jira: Handle loading commit in ListenerShim

Review comment:
      Okay, I'll remove the TODO. Do you have a Jira for this? If not, let me 
know and I can create one.

##########
File path: shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
##########
@@ -105,6 +106,22 @@ public void handleCommits(long lastOffset, 
List<ApiMessage> messages) {
             }, null);
         }
 
+        @Override
+        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> 
reader) {
+            // TODO: Create Jira: Need to cover the case where handle snapshot 
invalidates previous commits
+            //                    Need to handle that reader.snapshotId() 
means that every record up to that offset is committed

Review comment:
       Created this Jira: https://issues.apache.org/jira/browse/KAFKA-12545

##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##########
@@ -87,6 +96,25 @@ public synchronized void handleCommit(BatchReader<Integer> 
reader) {
         }
     }
 
+    @Override
+    public synchronized void handleSnapshot(SnapshotReader<Integer> reader) {
+        try {
+            try (SnapshotReader<Integer> snapshot = reader) {
+                log.debug("Loading snapshot {}", snapshot.snapshotId());
+                for (List<Integer> batch : snapshot) {
+                    for (Integer value : batch) {
+                        log.debug("Setting value: {}", value);
+                        this.committed = value;
+                        this.uncommitted = value;
+                    }
+                }
+                log.debug("Finished loading snapshot. Set value: {}", 
this.committed);
+            }
+        } catch (IOException e) {

Review comment:
      This is because `SnapshotReader::close` was declared as throwing an 
`IOException`.
  
  This made the API for `SnapshotReader` confusing, as `hasNext` and `next` 
don't throw an `IOException` even though they read from disk in some cases. I 
fixed this by changing `SnapshotReader` to implement `AutoCloseable` instead of 
`Closeable`.
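A minimal sketch of the fix's effect (hypothetical names, not the actual `SnapshotReader` interface): overriding `close()` from `AutoCloseable` so that it declares no checked exception lets callers use try-with-resources with no catch block, matching the fact that iteration itself is unchecked.

```java
import java.util.Iterator;
import java.util.List;

public class SnapshotCloseDemo {
    // Narrows AutoCloseable.close() to drop `throws Exception`; unlike
    // Closeable, callers no longer need to handle a checked IOException.
    interface IntSnapshotReader extends AutoCloseable, Iterable<List<Integer>> {
        @Override
        void close();
    }

    static IntSnapshotReader readerOf(List<List<Integer>> batches) {
        return new IntSnapshotReader() {
            private final Iterator<List<Integer>> it = batches.iterator();
            @Override public Iterator<List<Integer>> iterator() { return it; }
            @Override public void close() { /* release buffers; nothing throws */ }
        };
    }

    static int loadSnapshot(IntSnapshotReader reader) {
        int last = -1;
        try (IntSnapshotReader snapshot = reader) { // no catch clause required
            for (List<Integer> batch : snapshot) {
                for (int value : batch) {
                    last = value; // apply each record in order
                }
            }
        }
        return last;
    }

    public static void main(String[] args) {
        System.out.println(loadSnapshot(readerOf(List.of(List.of(1, 2), List.of(3)))));
    }
}
```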

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2154,8 +2182,14 @@ private boolean maybeCompleteShutdown(long 
currentTimeMs) {
         return false;
     }
 
-    private void maybeUpdateOldestSnapshotId() {
-        log.latestSnapshotId().ifPresent(log::deleteBeforeSnapshot);
+    private void maybeUpdateEarliestSnapshotId() {

Review comment:
      My thought process was that what the Raft client cares about is the log 
start offset and the snapshot at that offset. The fact that snapshots get 
deleted, and how they get deleted, is an implementation detail of 
`ReplicatedLog`.
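The separation of concerns described here can be sketched roughly as follows (interface and class names are assumptions for illustration, not the actual Kafka code): the client only asks the log to advance past the latest snapshot, while deletion mechanics stay behind `deleteBeforeSnapshot`.

```java
import java.util.Optional;

public class EarliestSnapshotDemo {
    // The client sees only this surface; deletion mechanics live behind it.
    interface ReplicatedLog {
        Optional<Long> latestSnapshotOffset();
        boolean deleteBeforeSnapshot(long snapshotOffset);
    }

    // Toy in-memory log: "deleting" just advances the log start offset.
    static class MockLog implements ReplicatedLog {
        long logStartOffset = 0;
        final Optional<Long> latestSnapshot;
        MockLog(Optional<Long> latestSnapshot) { this.latestSnapshot = latestSnapshot; }
        @Override public Optional<Long> latestSnapshotOffset() { return latestSnapshot; }
        @Override public boolean deleteBeforeSnapshot(long off) {
            if (off <= logStartOffset) return false;
            logStartOffset = off;
            return true;
        }
    }

    // Client-side logic: advance to the latest snapshot if one exists.
    static boolean maybeUpdateEarliestSnapshotId(ReplicatedLog log) {
        return log.latestSnapshotOffset().map(log::deleteBeforeSnapshot).orElse(false);
    }

    public static void main(String[] args) {
        MockLog log = new MockLog(Optional.of(5L));
        System.out.println(maybeUpdateEarliestSnapshotId(log)); // deletion happened
        System.out.println(log.logStartOffset);                 // log start advanced
    }
}
```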




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

