niket-goel commented on a change in pull request #10899:
URL: https://github.com/apache/kafka/pull/10899#discussion_r655511645
##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -664,4 +666,55 @@ private static void writeLeaderChangeMessage(ByteBuffer buffer,
         builder.close();
     }
 
+    public static MemoryRecords withSnapshotHeaderRecord(
+        long initialOffset,
+        long timestamp,
+        int leaderEpoch,
+        ByteBuffer buffer,
+        MetadataSnapshotHeaderRecord snapshotHeaderRecord) {

Review comment:
       Thanks. I was trying to find some info around styling. Will follow this across the PR.

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -233,6 +235,68 @@ public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, l
         }
     }
 
+
+    public void appendSnapshotHeaderMessage(MetadataSnapshotHeaderRecord snapshotHeaderRecord) {
+        appendLock.lock();
+        try {
+            // Ideally there should be nothing in the batch here.
+            // TODO verify this

Review comment:
       I am concerned about improper usage in the future, which might lead to stamping the header after other records in the file. Would it be valid to assert here that the current batch has no records? It should be an error if it does. What do you think?

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -233,6 +235,68 @@ public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, l
         }
     }
 
+
+    public void appendSnapshotHeaderMessage(MetadataSnapshotHeaderRecord snapshotHeaderRecord) {
+        appendLock.lock();
+        try {
+            // Ideally there should be nothing in the batch here.
+            // TODO verify this
+            long currentTimeMs = time.milliseconds();
+            ByteBuffer buffer = memoryPool.tryAllocate(256);
+            if (buffer != null) {
+                MemoryRecords data = MemoryRecords.withSnapshotHeaderRecord(
+                    this.nextOffset,
+                    currentTimeMs,
+                    this.epoch,
+                    buffer,
+                    snapshotHeaderRecord
+                );
+                completed.add(new CompletedBatch<>(
+                    nextOffset,
+                    1,
+                    data,
+                    memoryPool,
+                    buffer
+                ));
+                nextOffset += 1;
+            } else {
+                throw new IllegalStateException("Could not allocate buffer for the metadata snapshot header record.");
+            }
+        } finally {
+            appendLock.unlock();
+        }
+    }
+
+    public void appendSnapshotFooterMessage(MetadataSnapshotFooterRecord snapshotFooterRecord) {
+        appendLock.lock();
+        try {
+            forceDrain();
+            long currentTimeMs = time.milliseconds();
+            ByteBuffer buffer = memoryPool.tryAllocate(256);
+            if (buffer != null) {
+                MemoryRecords data = MemoryRecords.withSnapshotFooterRecord(
+                    this.nextOffset,
+                    currentTimeMs,
+                    this.epoch,
+                    buffer,
+                    snapshotFooterRecord
+                );
+                completed.add(new CompletedBatch<>(
+                    nextOffset,
+                    1,
+                    data,
+                    memoryPool,
+                    buffer
+                ));
+                nextOffset += 1;
+            } else {
+                throw new IllegalStateException("Could not allocate buffer for the metadata snapshot footer record.");
+            }
+        } finally {
+            appendLock.unlock();
+        }
+    }

Review comment:
       I agree. Let me fix that.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -78,6 +83,76 @@ public SnapshotWriter(
         );
     }
 
+    /**
+     * Adds a {@link MetadataSnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    private void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            throw new IllegalStateException("Initializing new snapshot (ID: " +
+                snapshot.snapshotId().epoch + ", " + snapshot.snapshotId().offset +
+                ") with a non-empty file");

Review comment:
       Will do.
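To make the discussion around the `appendSnapshotHeaderMessage` / `appendSnapshotFooterMessage` hunk above concrete, here is a minimal, self-contained sketch of the two ordering invariants being debated: the header must be the very first record the accumulator sees (stamping it after other records fails fast at runtime), and appending the footer first drains any open batch so the footer lands last. This is not the Kafka `BatchAccumulator`; every name here (`SketchAccumulator`, `appendSnapshotHeader`, `completedRecords`, the string placeholders) is an invented illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative model only, not the Kafka BatchAccumulator API.
class SketchAccumulator {
    private final ReentrantLock appendLock = new ReentrantLock();
    private final List<String> completed = new ArrayList<>();    // drained records
    private final List<String> currentBatch = new ArrayList<>(); // open batch
    private long nextOffset = 0;

    // Flush any open batch into the completed list.
    void forceDrain() {
        appendLock.lock();
        try {
            completed.addAll(currentBatch);
            currentBatch.clear();
        } finally {
            appendLock.unlock();
        }
    }

    void appendSnapshotHeader() {
        appendLock.lock();
        try {
            // Runtime fail-safe: the header must be the first record ever appended.
            if (nextOffset != 0) {
                throw new IllegalStateException(
                    "Snapshot header must be the first appended record");
            }
            completed.add("header");
            nextOffset += 1;
        } finally {
            appendLock.unlock();
        }
    }

    void append(String record) {
        appendLock.lock();
        try {
            currentBatch.add(record);
            nextOffset += 1;
        } finally {
            appendLock.unlock();
        }
    }

    void appendSnapshotFooter() {
        appendLock.lock();
        try {
            forceDrain(); // flush any open batch before stamping the footer
            completed.add("footer");
            nextOffset += 1;
        } finally {
            appendLock.unlock();
        }
    }

    List<String> completedRecords() {
        return completed;
    }
}
```

In this sketch the emptiness check lives inside the accumulator, which is one side of the trade-off discussed above; the alternative is to keep the accumulator invariant-free and enforce ordering in the caller (the snapshot writer), accepting that nothing in the accumulator itself prevents misuse.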
##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -55,7 +60,7 @@
      * @param compressionType the compression algorithm to use
      * @param serde the record serialization and deserialization implementation
      */

Review comment:
       ack

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -78,6 +83,76 @@ public SnapshotWriter(
         );
     }
 
+    /**
+     * Adds a {@link MetadataSnapshotHeaderRecord} to snapshot
+     *
+     * @throws IllegalStateException if the snapshot is not empty
+     */
+    private void initializeSnapshotWithHeader() {
+        if (snapshot.sizeInBytes() != 0) {
+            throw new IllegalStateException("Initializing new snapshot (ID: " +
+                snapshot.snapshotId().epoch + ", " + snapshot.snapshotId().offset +
+                ") with a non-empty file");
+        }
+        MetadataSnapshotHeaderRecord headerRecord = new MetadataSnapshotHeaderRecord()
+            .setVersion(ControlRecordUtils.METADATA_SNAPSHOT_HEADER_VERSION);
+        accumulator.appendSnapshotHeaderMessage(headerRecord);
+        accumulator.forceDrain();
+    }
+
+    /**
+     * Add a {@link MetadataSnapshotFooterRecord} to the snapshot
+     *
+     * No more records should be appended to the snapshot after calling this method
+     *
+     * @throws IllegalStateException if the snapshot is empty (no header)
+     */
+    private void finalizeSnapshotWithFooter() {
+        if (snapshot.sizeInBytes() == 0) {

Review comment:
       I agree with the argument, and with adding a test, but do you think we should remove the code check? For me the check serves as a runtime fail-safe and also as a hint to the reader. The check is in no way complete. I do not have a strong opinion here; just thinking out loud.

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -135,9 +210,12 @@ public void append(List<T> records) {
 
     /**
      * Freezes the snapshot by flushing all pending writes and marking it as immutable.
+     *
+     * Also adds a {@link MetadataSnapshotFooterRecord} to the end of the snapshot
      */
     public void freeze() {
         appendBatches(accumulator.drain());
+        finalizeSnapshotWithFooter();

Review comment:
       Would we not want to drain the pipe (here, the accumulator) before finalizing it by stamping the footer? I guess your point is that the footer is just another record, which we can append and then do one final drain? P.S. Right now the finalize method also forces a drain on the accumulator; we could remove that too, then. Let me do it that way.

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -233,6 +235,68 @@ public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, l
         }
     }
 
+
+    public void appendSnapshotHeaderMessage(MetadataSnapshotHeaderRecord snapshotHeaderRecord) {
+        appendLock.lock();
+        try {
+            // Ideally there should be nothing in the batch here.
+            // TODO verify this

Review comment:
       Hmmm. I see your point about not encoding the logic of maintaining the snapshot invariants into the BatchAccumulator, but then there is nothing in the code preventing the usage pattern you listed above. Are we OK with that?

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -135,9 +210,12 @@ public void append(List<T> records) {
 
     /**
      * Freezes the snapshot by flushing all pending writes and marking it as immutable.
+     *
+     * Also adds a {@link MetadataSnapshotFooterRecord} to the end of the snapshot
      */
     public void freeze() {
         appendBatches(accumulator.drain());
+        finalizeSnapshotWithFooter();

Review comment:
       Ahh. I was missing point 3. Understood your point. Will make the change.

##########
File path: clients/src/main/resources/common/message/MetadataSnapshotFooterRecord.json
##########
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "MetadataSnapshotFooterRecord",

Review comment:
       Good point. I had initially made this a generic header and footer record, but then scoped it to metadata. I did not know we had the intention of expanding the snapshot concept to other topics. (IIUC, snapshots would only work on topics that are delta-logged, and no other topics work that way today.) But if we will do that, then I agree with you. I guess my only counter-question would be: given that the metadata topic is kind of special today, do you envision the metadata snapshot schema evolving differently (becoming richer in information) from other snapshot-able topics, such that it might need its own header/footer?

##########
File path: clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
##########
@@ -27,6 +29,8 @@ public class ControlRecordUtils {
 
     public static final short LEADER_CHANGE_SCHEMA_VERSION = new LeaderChangeMessage().highestSupportedVersion();
+    public static final short METADATA_SNAPSHOT_HEADER_VERSION = new MetadataSnapshotHeaderRecord().highestSupportedVersion();

Review comment:
       I honestly do not have an opinion here; I was following what was done for the LEADER_CHANGE message (right above it). I think saying "highest supported version" (or maybe even "current version") would make sense. I would then change the leader-change-related constants as well, to be consistent. Let me do that in the next version.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -664,4 +666,55 @@ private static void writeLeaderChangeMessage(ByteBuffer buffer,
         builder.close();
     }
 
+    public static MemoryRecords withSnapshotHeaderRecord(
+        long initialOffset,
+        long timestamp,
+        int leaderEpoch,
+        ByteBuffer buffer,
+        MetadataSnapshotHeaderRecord snapshotHeaderRecord) {
+        writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotHeaderRecord);
+        buffer.flip();
+        return MemoryRecords.readableRecords(buffer);
+    }
+
+    private static void writeSnapshotHeaderRecord(ByteBuffer buffer,
+        long initialOffset,
+        long timestamp,
+        int leaderEpoch,
+        MetadataSnapshotHeaderRecord snapshotHeaderRecord) {
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(

Review comment:
       You are right that one of the underlying calls throws a few exceptions, but again I was looking at the similar implementation for the leader change message. Given that that implementation (being canonical) does not use a try/catch, I assumed that the API structure handles exceptions somewhere higher up in the stack. I guess that understanding is not correct. I will make similar changes for the leader change call stack as well, then. Will ping you offline about the general exception handling mechanism here.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org