niket-goel commented on a change in pull request #10899:
URL: https://github.com/apache/kafka/pull/10899#discussion_r655511645



##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -664,4 +666,55 @@ private static void writeLeaderChangeMessage(ByteBuffer buffer,
         builder.close();
     }
 
+    public static MemoryRecords withSnapshotHeaderRecord(
+            long initialOffset,
+            long timestamp,
+            int leaderEpoch,
+            ByteBuffer buffer,
+            MetadataSnapshotHeaderRecord snapshotHeaderRecord) {

Review comment:
       Thanks. I was trying to find some guidance on styling. I will follow this convention across the PR.

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -233,6 +235,68 @@ public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, l
         }
     }
 
+
+    public void appendSnapshotHeaderMessage(MetadataSnapshotHeaderRecord snapshotHeaderRecord) {
+        appendLock.lock();
+        try {
+            // Ideally there should be nothing in the batch here.
+            // TODO verify this

Review comment:
       I am concerned about improper usage in the future, which might lead to stamping the header after other records in the file. Is it valid to assert that the current batch has no records at this time? It should be an error if it isn't.
   
   What do you think?

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -233,6 +235,68 @@ public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, l
         }
     }
 
+
+    public void appendSnapshotHeaderMessage(MetadataSnapshotHeaderRecord snapshotHeaderRecord) {
+        appendLock.lock();
+        try {
+            // Ideally there should be nothing in the batch here.
+            // TODO verify this
+            long currentTimeMs = time.milliseconds();
+            ByteBuffer buffer = memoryPool.tryAllocate(256);
+            if (buffer != null) {
+                MemoryRecords data = MemoryRecords.withSnapshotHeaderRecord(
+                    this.nextOffset,
+                    currentTimeMs,
+                    this.epoch,
+                    buffer,
+                    snapshotHeaderRecord
+                );
+                completed.add(new CompletedBatch<>(
+                    nextOffset,
+                    1,
+                    data,
+                    memoryPool,
+                    buffer
+                ));
+                nextOffset += 1;
+            } else {
+                throw new IllegalStateException("Could not allocate buffer for the metadata snapshot header record.");
+            }
+        } finally {
+            appendLock.unlock();
+        }
+    }
+
+    public void appendSnapshotFooterMessage(MetadataSnapshotFooterRecord snapshotFooterRecord) {
+        appendLock.lock();
+        try {
+            forceDrain();
+            long currentTimeMs = time.milliseconds();
+            ByteBuffer buffer = memoryPool.tryAllocate(256);
+            if (buffer != null) {
+                MemoryRecords data = MemoryRecords.withSnapshotFooterRecord(
+                    this.nextOffset,
+                    currentTimeMs,
+                    this.epoch,
+                    buffer,
+                    snapshotFooterRecord
+                );
+                completed.add(new CompletedBatch<>(
+                    nextOffset,
+                    1,
+                    data,
+                    memoryPool,
+                    buffer
+                ));
+                nextOffset += 1;
+            } else {
+                throw new IllegalStateException("Could not allocate buffer for the metadata snapshot footer record.");
+            }
+        } finally {
+            appendLock.unlock();
+        }
+    }

Review comment:
       I agree. Let me fix that.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -78,6 +83,76 @@ public SnapshotWriter(
         );
     }
 
+    /**
+     * Adds a {@link MetadataSnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    private void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            throw new IllegalStateException("Initializing new snapshot (ID: "
+                    + snapshot.snapshotId().epoch + ", " + snapshot.snapshotId().offset
+                    + ") with a non-empty file");

Review comment:
       Will do.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -55,7 +60,7 @@
      * @param compressionType the compression algorithm to use
      * @param serde the record serialization and deserialization implementation
      */

Review comment:
       ack

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -78,6 +83,76 @@ public SnapshotWriter(
         );
     }
 
+    /**
+     * Adds a {@link MetadataSnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    private void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            throw new IllegalStateException("Initializing new snapshot (ID: "
+                    + snapshot.snapshotId().epoch + ", " + snapshot.snapshotId().offset
+                    + ") with a non-empty file");
+        }
+        MetadataSnapshotHeaderRecord headerRecord = new MetadataSnapshotHeaderRecord()
+            .setVersion(ControlRecordUtils.METADATA_SNAPSHOT_HEADER_VERSION);
+        accumulator.appendSnapshotHeaderMessage(headerRecord);
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Add a {@link MetadataSnapshotFooterRecord} to the snapshot
+     *
+     * No more records should be appended to the snapshot after calling this method
+     *
+     * @throws IllegalStateException if the snapshot is empty (no header)
+     */
+    private void finalizeSnapshotWithFooter() {
+        if (snapshot.sizeInBytes() == 0) {

Review comment:
       I agree with the argument, and adding a test, but do you think we should 
remove the code check? For me the code check serves as a runtime fail safe and 
also a hint to the reader. The check is in no way complete. I do not have a 
strong opinion here. Just thinking out loud.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -135,9 +210,12 @@ public void append(List<T> records) {
 
     /**
     * Freezes the snapshot by flushing all pending writes and marking it as immutable.
+     *
+     * Also adds a {@link MetadataSnapshotFooterRecord} to the end of the snapshot
      */
     public void freeze() {
         appendBatches(accumulator.drain());
+        finalizeSnapshotWithFooter();

Review comment:
       Would we not want to drain the pipe (here, the accumulator) before finalizing it by stamping the footer? I guess your point is that the footer is just another record which we can append and then do one final drain?
   PS: right now the finalize method also forces a drain on the accumulator, so we could remove that too.
   
   Let me do it that way.

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -233,6 +235,68 @@ public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, l
         }
     }
 
+
+    public void appendSnapshotHeaderMessage(MetadataSnapshotHeaderRecord snapshotHeaderRecord) {
+        appendLock.lock();
+        try {
+            // Ideally there should be nothing in the batch here.
+            // TODO verify this

Review comment:
       Hmmm. I see your point about not encoding the logic for maintaining the snapshot invariants into the BatchAccumulator, but then there is nothing in the code preventing the usage pattern you listed above. Are we ok with that?

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -135,9 +210,12 @@ public void append(List<T> records) {
 
     /**
     * Freezes the snapshot by flushing all pending writes and marking it as immutable.
+     *
+     * Also adds a {@link MetadataSnapshotFooterRecord} to the end of the snapshot
      */
     public void freeze() {
         appendBatches(accumulator.drain());
+        finalizeSnapshotWithFooter();

Review comment:
       Ahh. I was missing point 3. Understood your point. Will make the change.

##########
File path: clients/src/main/resources/common/message/MetadataSnapshotFooterRecord.json
##########
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "MetadataSnapshotFooterRecord",

Review comment:
       Good point. I had initially made this a generic header and footer record, but then scoped it to metadata. I did not know we had the intention of expanding the snapshot concept to other topics. (IIUC snapshots would only work on topics that are delta-logged, and no other topics work that way today.) But if we do plan to do that, then I agree with you.
   
   I guess my only counter-question would be: given that today the metadata topic is kind of special, do you envision the metadata snapshot schema evolving differently (richer in information) from other snapshot-able topics, such that it might need its own header/footer?

##########
File path: clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
##########
@@ -27,6 +29,8 @@
 public class ControlRecordUtils {
 
     public static final short LEADER_CHANGE_SCHEMA_VERSION = new LeaderChangeMessage().highestSupportedVersion();
+    public static final short METADATA_SNAPSHOT_HEADER_VERSION = new MetadataSnapshotHeaderRecord().highestSupportedVersion();

Review comment:
       I honestly do not have an opinion here. I was following what was done for the LEADER_CHANGE message (right above it). I think saying "highest supported version" (or maybe even "current version") would make sense. I would then change the leader change message constants as well, to be consistent. Let me do that in the next version.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -664,4 +666,55 @@ private static void writeLeaderChangeMessage(ByteBuffer buffer,
         builder.close();
     }
 
+    public static MemoryRecords withSnapshotHeaderRecord(
+            long initialOffset,
+            long timestamp,
+            int leaderEpoch,
+            ByteBuffer buffer,
+            MetadataSnapshotHeaderRecord snapshotHeaderRecord) {
+        writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotHeaderRecord);
+        buffer.flip();
+        return MemoryRecords.readableRecords(buffer);
+    }
+
+    private static void writeSnapshotHeaderRecord(ByteBuffer buffer,
+            long initialOffset,
+            long timestamp,
+            int leaderEpoch,
+            MetadataSnapshotHeaderRecord snapshotHeaderRecord) {
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(

Review comment:
       You are right that one of the underlying calls throws a few exceptions, but I was again looking at the similar implementation for the leader change message. Given that that implementation (being canonical) does not use a try/catch, I assumed that the API structure handles exceptions somewhere higher up the stack. I guess that understanding is not correct. I will make similar changes for the leader change call stack as well, then. I will ping you offline about what the general exception-handling mechanism is here.
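One defensive pattern for the concern above is to release the pooled buffer back whenever building throws, so an exception cannot leak pooled memory. The sketch below uses stand-in types (`Pool`, `buildOrRelease`), not the actual Kafka memory pool API:

```java
import java.nio.ByteBuffer;

class BufferSafety {
    /** Stand-in for a memory pool; a real pool tracks and reuses buffers. */
    interface Pool {
        ByteBuffer tryAllocate(int size);
        void release(ByteBuffer buffer);
    }

    /** Builds into a pooled buffer, releasing it back if building throws. */
    static ByteBuffer buildOrRelease(Pool pool, int size, Runnable build) {
        ByteBuffer buffer = pool.tryAllocate(size);
        if (buffer == null) {
            throw new IllegalStateException(
                "Could not allocate buffer for the control record.");
        }
        try {
            build.run(); // may throw, e.g. on a serialization failure
            return buffer;
        } catch (RuntimeException e) {
            pool.release(buffer); // return the buffer instead of leaking it
            throw e;
        }
    }
}
```

The exception still propagates to the caller; the try/catch only guarantees the buffer is returned to the pool on the failure path.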




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

