niket-goel commented on a change in pull request #10899:
URL: https://github.com/apache/kafka/pull/10899#discussion_r660134385



##########
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -204,35 +207,103 @@ private void completeCurrentBatch() {
         currentBatch = null;
     }
 
-    public void appendLeaderChangeMessage(LeaderChangeMessage 
leaderChangeMessage, long currentTimeMs) {
+    private void appendControlMessage(
+        Supplier<MemoryRecords> supplier,
+        ByteBuffer buffer
+    ) {
         appendLock.lock();
         try {
             forceDrain();
-            ByteBuffer buffer = memoryPool.tryAllocate(256);
-            if (buffer != null) {
-                MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
-                    this.nextOffset, 
-                    currentTimeMs, 
-                    this.epoch, 
-                    buffer, 
-                    leaderChangeMessage
-                );
-                completed.add(new CompletedBatch<>(
-                    nextOffset,
-                    1,
-                    data,
-                    memoryPool,
-                    buffer
-                ));
-                nextOffset += 1;
-            } else {
-                throw new IllegalStateException("Could not allocate buffer for 
the leader change record.");
-            }
+            completed.add(new CompletedBatch<>(
+                nextOffset,
+                1,
+                supplier.get(),
+                memoryPool,
+                buffer
+            ));
+            nextOffset += 1;
         } finally {
             appendLock.unlock();
         }
     }
 
+    /**
+     * Append a {@link LeaderChangeMessage} record to the batch
+     *
+     * @param @LeaderChangeMessage The message to append
+     * @param @currentTimeMs The timestamp of message generation
+     * @throws IllegalStateException on failure to allocate a buffer for the 
record
+     */
+    public void appendLeaderChangeMessage(
+        LeaderChangeMessage leaderChangeMessage,
+        long currentTimeMs
+    ) {
+        ByteBuffer buffer = memoryPool.tryAllocate(256);
+        if (buffer != null) {
+            appendControlMessage(
+                () -> MemoryRecords.withLeaderChangeMessage(
+                    this.nextOffset,
+                    currentTimeMs,
+                    this.epoch,
+                    buffer,
+                    leaderChangeMessage),
+                buffer);
+        } else {
+            throw new IllegalStateException("Could not allocate buffer for the 
leader change record.");
+        }
+    }
+
+
+    /**
+     * Append a {@link SnapshotHeaderRecord} record to the batch
+     *
+     * @param @SnapshotHeaderRecord The message to append
+     * @throws IllegalStateException on failure to allocate a buffer for the 
record
+     */
+    public void appendSnapshotHeaderMessage(
+        SnapshotHeaderRecord snapshotHeaderRecord,
+        long currentTimeMs
+    ) {
+        ByteBuffer buffer = memoryPool.tryAllocate(256);
+        if (buffer != null) {
+            appendControlMessage(
+                () -> MemoryRecords.withSnapshotHeaderRecord(
+                    this.nextOffset,
+                    currentTimeMs,
+                    this.epoch,
+                    buffer,
+                    snapshotHeaderRecord),
+                buffer);
+        } else {
+            throw new IllegalStateException("Could not allocate buffer for the 
metadata snapshot header record.");
+        }
+    }
+
+    /**
+     * Append a {@link SnapshotFooterRecord} record to the batch
+     *
+     * @param @SnapshotFooterRecord The message to append
+     * @throws IllegalStateException on failure to allocate a buffer for the 
record
+     */
+    public void appendSnapshotFooterMessage(
+        SnapshotFooterRecord snapshotFooterRecord

Review comment:
       Will do

##########
File path: 
raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##########
@@ -151,6 +194,57 @@ public void testAppendToFrozenSnapshot() throws Exception {
         );
     }
 
+    private int validateDelimiters(
+        RawSnapshotReader snapshot,
+        long lastContainedLogTime
+    ) {
+        assertNotEquals(0, snapshot.sizeInBytes());
+
+        int countRecords = 0;
+
+        Iterator<RecordBatch> recordBatches = 
Utils.covariantCast(snapshot.records().batchIterator());
+
+        assertEquals(Boolean.TRUE, recordBatches.hasNext());

Review comment:
       thanks

##########
File path: 
raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##########
@@ -22,25 +22,65 @@
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Random;
+import java.util.Iterator;
 import java.util.Set;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClientTestContext;
 import org.apache.kafka.raft.internals.StringSerde;
+import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.Utils;
+
 import org.junit.jupiter.api.Test;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 final public class SnapshotWriterReaderTest {
     private final int localId = 0;
     private final Set<Integer> voters = Collections.singleton(localId);
 
+    @Test
+    public void testSnapshotDelimiters() throws Exception {
+        int recordsPerBatch = 1;
+        int batches = 0;
+        int delimiterCount = 2;
+        long lastContainedLogTime = new MockTime().milliseconds();

Review comment:
       Will use a default

##########
File path: 
raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##########
@@ -151,6 +194,57 @@ public void testAppendToFrozenSnapshot() throws Exception {
         );
     }
 
+    private int validateDelimiters(
+        RawSnapshotReader snapshot,
+        long lastContainedLogTime
+    ) {
+        assertNotEquals(0, snapshot.sizeInBytes());
+
+        int countRecords = 0;
+
+        Iterator<RecordBatch> recordBatches = 
Utils.covariantCast(snapshot.records().batchIterator());
+
+        assertEquals(Boolean.TRUE, recordBatches.hasNext());
+        RecordBatch batch = recordBatches.next();
+
+        Iterator<Record> records = batch.streamingIterator(new 
GrowableBufferSupplier());
+
+        // Verify existence of the header record
+        assertEquals(Boolean.TRUE, batch.isControlBatch());
+        assertEquals(Boolean.TRUE, records.hasNext());
+        Record record = records.next();
+        countRecords += 1;
+
+        SnapshotHeaderRecord headerRecord = 
ControlRecordUtils.deserializedSnapshotHeaderRecord(record);
+        assertEquals(headerRecord.version(), 
ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION);
+        assertEquals(headerRecord.lastContainedLogTime(), 
lastContainedLogTime);
+
+        assertEquals(Boolean.FALSE, records.hasNext());
+
+        // Loop over remaining records
+        while (recordBatches.hasNext()) {
+            batch = recordBatches.next();
+            records = batch.streamingIterator(new GrowableBufferSupplier());
+
+            while (records.hasNext()) {
+                countRecords += 1;
+                record = records.next();
+            }
+        }
+
+        // Verify existence of the footer record
+        assertEquals(Boolean.TRUE, batch.isControlBatch());
+
+        SnapshotFooterRecord footerRecord = 
ControlRecordUtils.deserializedSnapshotFooterRecord(record);
+        assertEquals(footerRecord.version(), 
ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION);
+
+        // Verify there is nothing past the footer
+        assertEquals(Boolean.FALSE, records.hasNext());
+        assertEquals(Boolean.FALSE, recordBatches.hasNext());

Review comment:
       Yep. Just an old habit of asserting things to aid readability. Will 
remove it here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to