jsancio commented on a change in pull request #10899:
URL: https://github.com/apache/kafka/pull/10899#discussion_r660032373



##########
File path: clients/src/main/resources/common/message/SnapshotHeaderRecord.json
##########
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "SnapshotHeaderRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    {"name": "Version", "type": "int16", "versions": "0+",
+      "about": "The version of the snapshot header record"},
+    {"name": "LastContainedLogTime", "type": "int64", "versions": "0+",
+      "about": "The append time of the highest record contained in this 
snapshot"}

Review comment:
       How about "The append time of the last record from the log contained in 
the snapshot"?

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##########
@@ -151,6 +194,57 @@ public void testAppendToFrozenSnapshot() throws Exception {
         );
     }
 
+    private int validateDelimiters(
+        RawSnapshotReader snapshot,
+        long lastContainedLogTime
+    ) {
+        assertNotEquals(0, snapshot.sizeInBytes());
+
+        int countRecords = 0;
+
+        Iterator<RecordBatch> recordBatches = Utils.covariantCast(snapshot.records().batchIterator());
+
+        assertEquals(Boolean.TRUE, recordBatches.hasNext());

Review comment:
       You can use `assertTrue` instead. This comment applies to a few places.
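
       For illustration, a self-contained sketch of the suggested style (plain JUnit 5; the class and values here are hypothetical):

```java
import java.util.Arrays;
import java.util.Iterator;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class AssertionStyleExample {
    void example() {
        Iterator<String> it = Arrays.asList("a").iterator();
        // Instead of assertEquals(Boolean.TRUE, it.hasNext()):
        assertTrue(it.hasNext());   // clearer intent and a clearer failure message
        it.next();
        // Instead of assertEquals(Boolean.FALSE, it.hasNext()):
        assertFalse(it.hasNext());
    }
}
```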

##########
File path: clients/src/main/resources/common/message/SnapshotHeaderRecord.json
##########
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "SnapshotHeaderRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    {"name": "Version", "type": "int16", "versions": "0+",
+      "about": "The version of the snapshot header record"},
+    {"name": "LastContainedLogTime", "type": "int64", "versions": "0+",

Review comment:
       Let's call this `LastContainedLogTimestamp` to match Kafka's existing use of "Timestamp".

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -204,35 +207,103 @@ private void completeCurrentBatch() {
         currentBatch = null;
     }
 
-    public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) {
+    private void appendControlMessage(
+        Supplier<MemoryRecords> supplier,
+        ByteBuffer buffer
+    ) {
         appendLock.lock();
         try {
             forceDrain();
-            ByteBuffer buffer = memoryPool.tryAllocate(256);
-            if (buffer != null) {
-                MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
-                    this.nextOffset, 
-                    currentTimeMs, 
-                    this.epoch, 
-                    buffer, 
-                    leaderChangeMessage
-                );
-                completed.add(new CompletedBatch<>(
-                    nextOffset,
-                    1,
-                    data,
-                    memoryPool,
-                    buffer
-                ));
-                nextOffset += 1;
-            } else {
-                throw new IllegalStateException("Could not allocate buffer for 
the leader change record.");
-            }
+            completed.add(new CompletedBatch<>(
+                nextOffset,
+                1,
+                supplier.get(),
+                memoryPool,
+                buffer
+            ));
+            nextOffset += 1;
         } finally {
             appendLock.unlock();
         }
     }
 
+    /**
+     * Append a {@link LeaderChangeMessage} record to the batch
+     *
+     * @param @LeaderChangeMessage The message to append
+     * @param @currentTimeMs The timestamp of message generation
+     * @throws IllegalStateException on failure to allocate a buffer for the record
+     */
+    public void appendLeaderChangeMessage(
+        LeaderChangeMessage leaderChangeMessage,
+        long currentTimeMs
+    ) {
+        ByteBuffer buffer = memoryPool.tryAllocate(256);

Review comment:
       How about moving the buffer allocation to `appendControlMessage`? Not sure we need different messages in the exception since we will have different stack traces.
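
       A rough sketch of what that shape could look like, reusing the names from the diff above (one possible design, not the final code; `Function` is `java.util.function.Function`):

```java
// Sketch: appendControlMessage owns the allocation, so callers stop
// repeating the tryAllocate/null-check, and one generic exception message
// suffices because the stack trace already identifies the caller.
private void appendControlMessage(Function<ByteBuffer, MemoryRecords> creator) {
    ByteBuffer buffer = memoryPool.tryAllocate(256);
    if (buffer == null) {
        throw new IllegalStateException("Could not allocate buffer for the control record");
    }
    appendLock.lock();
    try {
        forceDrain();
        completed.add(new CompletedBatch<>(nextOffset, 1, creator.apply(buffer), memoryPool, buffer));
        nextOffset += 1;
    } finally {
        appendLock.unlock();
    }
}
```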

##########
File path: 
raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##########
@@ -22,25 +22,65 @@
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Random;
+import java.util.Iterator;
 import java.util.Set;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClientTestContext;
 import org.apache.kafka.raft.internals.StringSerde;
+import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.Utils;
+
 import org.junit.jupiter.api.Test;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 final public class SnapshotWriterReaderTest {
     private final int localId = 0;
     private final Set<Integer> voters = Collections.singleton(localId);
 
+    @Test
+    public void testSnapshotDelimiters() throws Exception {
+        int recordsPerBatch = 1;
+        int batches = 0;
+        int delimiterCount = 2;
+        long lastContainedLogTime = new MockTime().milliseconds();

Review comment:
       I suggest using some known value; `MockTime` could be zero, for example. This is okay because this time is an append time, which may be unrelated to the mock time.
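
       For example (the constant is arbitrary and purely illustrative):

```java
// Any fixed, recognizable value works: lastContainedLogTime is an append
// time carried in the snapshot header, independent of the mock clock.
long lastContainedLogTime = 10L;
```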

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##########
@@ -151,6 +194,57 @@ public void testAppendToFrozenSnapshot() throws Exception {
         );
     }
 
+    private int validateDelimiters(
+        RawSnapshotReader snapshot,
+        long lastContainedLogTime
+    ) {
+        assertNotEquals(0, snapshot.sizeInBytes());
+
+        int countRecords = 0;
+
+        Iterator<RecordBatch> recordBatches = Utils.covariantCast(snapshot.records().batchIterator());
+
+        assertEquals(Boolean.TRUE, recordBatches.hasNext());
+        RecordBatch batch = recordBatches.next();
+
+        Iterator<Record> records = batch.streamingIterator(new GrowableBufferSupplier());
+
+        // Verify existence of the header record
+        assertEquals(Boolean.TRUE, batch.isControlBatch());
+        assertEquals(Boolean.TRUE, records.hasNext());
+        Record record = records.next();
+        countRecords += 1;
+
+        SnapshotHeaderRecord headerRecord = ControlRecordUtils.deserializedSnapshotHeaderRecord(record);
+        assertEquals(headerRecord.version(), ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION);
+        assertEquals(headerRecord.lastContainedLogTime(), lastContainedLogTime);
+
+        assertEquals(Boolean.FALSE, records.hasNext());

Review comment:
       You can use `assertFalse` instead. This comment applies to a few places.

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -204,35 +207,103 @@ private void completeCurrentBatch() {
         currentBatch = null;
     }
 
-    public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) {
+    private void appendControlMessage(
+        Supplier<MemoryRecords> supplier,
+        ByteBuffer buffer
+    ) {
         appendLock.lock();
         try {
             forceDrain();
-            ByteBuffer buffer = memoryPool.tryAllocate(256);
-            if (buffer != null) {
-                MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
-                    this.nextOffset, 
-                    currentTimeMs, 
-                    this.epoch, 
-                    buffer, 
-                    leaderChangeMessage
-                );
-                completed.add(new CompletedBatch<>(
-                    nextOffset,
-                    1,
-                    data,
-                    memoryPool,
-                    buffer
-                ));
-                nextOffset += 1;
-            } else {
-                throw new IllegalStateException("Could not allocate buffer for 
the leader change record.");
-            }
+            completed.add(new CompletedBatch<>(
+                nextOffset,
+                1,
+                supplier.get(),
+                memoryPool,
+                buffer
+            ));
+            nextOffset += 1;
         } finally {
             appendLock.unlock();
         }
     }
 
+    /**
+     * Append a {@link LeaderChangeMessage} record to the batch
+     *
+     * @param @LeaderChangeMessage The message to append
+     * @param @currentTimeMs The timestamp of message generation
+     * @throws IllegalStateException on failure to allocate a buffer for the record
+     */
+    public void appendLeaderChangeMessage(
+        LeaderChangeMessage leaderChangeMessage,
+        long currentTimeMs
+    ) {
+        ByteBuffer buffer = memoryPool.tryAllocate(256);
+        if (buffer != null) {
+            appendControlMessage(
+                () -> MemoryRecords.withLeaderChangeMessage(
+                    this.nextOffset,
+                    currentTimeMs,
+                    this.epoch,
+                    buffer,
+                    leaderChangeMessage),
+                buffer);
+        } else {
+            throw new IllegalStateException("Could not allocate buffer for the 
leader change record.");
+        }
+    }
+
+
+    /**
+     * Append a {@link SnapshotHeaderRecord} record to the batch
+     *
+     * @param @SnapshotHeaderRecord The message to append
+     * @throws IllegalStateException on failure to allocate a buffer for the record
+     */
+    public void appendSnapshotHeaderMessage(
+        SnapshotHeaderRecord snapshotHeaderRecord,
+        long currentTimeMs
+    ) {
+        ByteBuffer buffer = memoryPool.tryAllocate(256);
+        if (buffer != null) {
+            appendControlMessage(
+                () -> MemoryRecords.withSnapshotHeaderRecord(
+                    this.nextOffset,
+                    currentTimeMs,
+                    this.epoch,
+                    buffer,
+                    snapshotHeaderRecord),
+                buffer);
+        } else {
+            throw new IllegalStateException("Could not allocate buffer for the 
metadata snapshot header record.");
+        }
+    }
+
+    /**
+     * Append a {@link SnapshotFooterRecord} record to the batch
+     *
+     * @param @SnapshotFooterRecord The message to append
+     * @throws IllegalStateException on failure to allocate a buffer for the record
+     */
+    public void appendSnapshotFooterMessage(
+        SnapshotFooterRecord snapshotFooterRecord

Review comment:
       In two of these control messages we pass `currentTimeMs`; in this one we don't. It would be nice to be consistent. I think `appendLeaderChangeMessage` takes `currentTimeMs` because it wants to use the same time for the entire `poll` call.
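
       A sketch of the consistent signature, mirroring `appendSnapshotHeaderMessage` above (buffer size and error-message wording copied from the diff; not the final code):

```java
public void appendSnapshotFooterMessage(
    SnapshotFooterRecord snapshotFooterRecord,
    long currentTimeMs  // passed in so all control records in one poll() share a timestamp
) {
    ByteBuffer buffer = memoryPool.tryAllocate(256);
    if (buffer != null) {
        appendControlMessage(
            () -> MemoryRecords.withSnapshotFooterRecord(
                this.nextOffset,
                currentTimeMs,
                this.epoch,
                buffer,
                snapshotFooterRecord),
            buffer);
    } else {
        throw new IllegalStateException("Could not allocate buffer for the metadata snapshot footer record.");
    }
}
```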

##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
##########
@@ -106,7 +106,7 @@ public synchronized void handleCommit(BatchReader<Integer> reader) {
                     lastCommittedEpoch,
                     lastOffsetSnapshotted
                 );
-                Optional<SnapshotWriter<Integer>> snapshot = client.createSnapshot(lastCommittedOffset, lastCommittedEpoch);
+                Optional<SnapshotWriter<Integer>> snapshot = client.createSnapshot(lastCommittedOffset, lastCommittedEpoch, 0/*KAFKA-12997*/);

Review comment:
       Same here. Let's remove the comment referencing a Jira.

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##########
@@ -55,13 +95,16 @@ public void testWritingSnapshot() throws Exception {
 
         context.advanceLocalLeaderHighWatermarkToLogEndOffset();
 
-        try (SnapshotWriter<String> snapshot = context.client.createSnapshot(id.offset - 1, id.epoch).get()) {
+        try (SnapshotWriter<String> snapshot = context.client.createSnapshot(id.offset - 1, id.epoch, 0).get()) {

Review comment:
       How about using a non-zero value for all of these cases?
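
       For instance (a hypothetical, distinctive constant):

```java
// A non-zero timestamp means an accidental default of 0 somewhere in the
// write path cannot slip through the assertions unnoticed.
long lastContainedLogTime = 12345L;
try (SnapshotWriter<String> snapshot =
        context.client.createSnapshot(id.offset - 1, id.epoch, lastContainedLogTime).get()) {
    // append and freeze as in the existing test body
}
```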

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -204,35 +207,103 @@ private void completeCurrentBatch() {
         currentBatch = null;
     }
 
-    public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) {
+    private void appendControlMessage(
+        Supplier<MemoryRecords> supplier,
+        ByteBuffer buffer
+    ) {
         appendLock.lock();
         try {
             forceDrain();
-            ByteBuffer buffer = memoryPool.tryAllocate(256);
-            if (buffer != null) {
-                MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
-                    this.nextOffset, 
-                    currentTimeMs, 
-                    this.epoch, 
-                    buffer, 
-                    leaderChangeMessage
-                );
-                completed.add(new CompletedBatch<>(
-                    nextOffset,
-                    1,
-                    data,
-                    memoryPool,
-                    buffer
-                ));
-                nextOffset += 1;
-            } else {
-                throw new IllegalStateException("Could not allocate buffer for 
the leader change record.");
-            }
+            completed.add(new CompletedBatch<>(
+                nextOffset,
+                1,
+                supplier.get(),
+                memoryPool,
+                buffer
+            ));
+            nextOffset += 1;
         } finally {
             appendLock.unlock();
         }
     }
 
+    /**
+     * Append a {@link LeaderChangeMessage} record to the batch
+     *
+     * @param @LeaderChangeMessage The message to append
+     * @param @currentTimeMs The timestamp of message generation
+     * @throws IllegalStateException on failure to allocate a buffer for the record
+     */
+    public void appendLeaderChangeMessage(
+        LeaderChangeMessage leaderChangeMessage,
+        long currentTimeMs
+    ) {
+        ByteBuffer buffer = memoryPool.tryAllocate(256);

Review comment:
       This is an existing bug but it looks like we never release (`MemoryPool::release`) if there is an exception before adding to `completed`.
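
       A minimal sketch of a fix, assuming the exception can only occur before the `CompletedBatch` takes ownership of the buffer:

```java
ByteBuffer buffer = memoryPool.tryAllocate(256);
if (buffer == null) {
    throw new IllegalStateException("Could not allocate buffer for the leader change record.");
}
try {
    appendControlMessage(
        () -> MemoryRecords.withLeaderChangeMessage(
            this.nextOffset, currentTimeMs, this.epoch, buffer, leaderChangeMessage),
        buffer);
} catch (Exception e) {
    // Return the buffer to the pool on failure; once the batch reaches
    // `completed`, the CompletedBatch is responsible for releasing it.
    memoryPool.release(buffer);
    throw e;
}
```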

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##########
@@ -151,6 +194,57 @@ public void testAppendToFrozenSnapshot() throws Exception {
         );
     }
 
+    private int validateDelimiters(
+        RawSnapshotReader snapshot,
+        long lastContainedLogTime
+    ) {
+        assertNotEquals(0, snapshot.sizeInBytes());
+
+        int countRecords = 0;
+
+        Iterator<RecordBatch> recordBatches = Utils.covariantCast(snapshot.records().batchIterator());
+
+        assertEquals(Boolean.TRUE, recordBatches.hasNext());
+        RecordBatch batch = recordBatches.next();
+
+        Iterator<Record> records = batch.streamingIterator(new GrowableBufferSupplier());
+
+        // Verify existence of the header record
+        assertEquals(Boolean.TRUE, batch.isControlBatch());
+        assertEquals(Boolean.TRUE, records.hasNext());
+        Record record = records.next();
+        countRecords += 1;
+
+        SnapshotHeaderRecord headerRecord = ControlRecordUtils.deserializedSnapshotHeaderRecord(record);
+        assertEquals(headerRecord.version(), ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION);
+        assertEquals(headerRecord.lastContainedLogTime(), lastContainedLogTime);
+
+        assertEquals(Boolean.FALSE, records.hasNext());
+
+        // Loop over remaining records
+        while (recordBatches.hasNext()) {
+            batch = recordBatches.next();
+            records = batch.streamingIterator(new GrowableBufferSupplier());
+
+            while (records.hasNext()) {
+                countRecords += 1;
+                record = records.next();
+            }
+        }
+
+        // Verify existence of the footer record
+        assertEquals(Boolean.TRUE, batch.isControlBatch());
+
+        SnapshotFooterRecord footerRecord = ControlRecordUtils.deserializedSnapshotFooterRecord(record);
+        assertEquals(footerRecord.version(), ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION);
+
+        // Verify there is nothing past the footer
+        assertEquals(Boolean.FALSE, records.hasNext());
+        assertEquals(Boolean.FALSE, recordBatches.hasNext());

Review comment:
       These two checks will always pass because of the `while` loops above.
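
       One way to make the tail checks meaningful is to assert on the last batch and record the loops captured (sketch only; `SNAPSHOT_FOOTER_HIGHEST_VERSION` is the constant added in this PR):

```java
// The loops above exit exactly when hasNext() is false, so re-checking
// hasNext() cannot fail. Asserting properties of the final batch/record
// exercises the real invariant instead:
assertTrue(batch.isControlBatch());  // the footer must live in a control batch
SnapshotFooterRecord footerRecord = ControlRecordUtils.deserializedSnapshotFooterRecord(record);
assertEquals(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION, footerRecord.version());
```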

##########
File path: clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
##########
@@ -26,19 +28,49 @@
  */
 public class ControlRecordUtils {
 
-    public static final short LEADER_CHANGE_SCHEMA_VERSION = new LeaderChangeMessage().highestSupportedVersion();
+    public static final short LEADER_CHANGE_SCHEMA_HIGHEST_VERSION = new LeaderChangeMessage().highestSupportedVersion();
+    public static final short SNAPSHOT_HEADER_HIGHEST_VERSION = new SnapshotHeaderRecord().highestSupportedVersion();
+    public static final short SNAPSHOT_FOOTER_HIGHEST_VERSION = new SnapshotFooterRecord().highestSupportedVersion();
 
     public static LeaderChangeMessage deserializeLeaderChangeMessage(Record record) {
         ControlRecordType recordType = ControlRecordType.parse(record.key());
         if (recordType != ControlRecordType.LEADER_CHANGE) {
             throw new IllegalArgumentException(
-                "Expected LEADER_CHANGE control record type(3), but found " + 
recordType.toString());
+                "Expected LEADER_CHANGE control record type(2), but found " + 
recordType.toString());
         }
         return deserializeLeaderChangeMessage(record.value().duplicate());
     }
 
     public static LeaderChangeMessage deserializeLeaderChangeMessage(ByteBuffer data) {

Review comment:
       Unrelated to this PR, but should we file a Jira to change this name to `LeaderChangeRecord` to make it consistent with the rest of the values encoded in the metadata log and snapshot? I think changing the name of this type is backwards compatible. cc @hachikuji

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -345,7 +345,8 @@ void createSnapshotGenerator(long committedOffset, int committedEpoch) {
             }
             Optional<SnapshotWriter<ApiMessageAndVersion>> writer = raftClient.createSnapshot(
                 committedOffset,
-                committedEpoch
+                committedEpoch,
+                0/*KAFKA-12997*/

Review comment:
       We normally don't mention Jiras in the code. If I want the reviewer to be aware of a Jira, I link to it through a PR comment.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -654,14 +657,69 @@ private static void writeLeaderChangeMessage(ByteBuffer buffer,
                                                  long timestamp,
                                                  int leaderEpoch,
                                                  LeaderChangeMessage leaderChangeMessage) {
+        try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+            buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+            TimestampType.CREATE_TIME, initialOffset, timestamp,
+            RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+            false, true, leaderEpoch, buffer.capacity())
+        ) {
+            builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+        }
+    }
+
+    public static MemoryRecords withSnapshotHeaderRecord(
+        long initialOffset,
+        long timestamp,
+        int leaderEpoch,
+        ByteBuffer buffer,
+        SnapshotHeaderRecord snapshotHeaderRecord
+    ) {
+        writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotHeaderRecord);
+        buffer.flip();
+        return MemoryRecords.readableRecords(buffer);
+    }
+
+    private static void writeSnapshotHeaderRecord(ByteBuffer buffer,
+        long initialOffset,
+        long timestamp,
+        int leaderEpoch,
+        SnapshotHeaderRecord snapshotHeaderRecord
+    ) {
+        try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+            buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+            TimestampType.CREATE_TIME, initialOffset, timestamp,
+            RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+            false, true, leaderEpoch, buffer.capacity())
+        ) {
+            builder.appendSnapshotHeaderMessage(timestamp, snapshotHeaderRecord);
+        }
+    }
+
+    public static MemoryRecords withSnapshotFooterRecord(
+        long initialOffset,
+        long timestamp,
+        int leaderEpoch,
+        ByteBuffer buffer,
+        SnapshotFooterRecord snapshotFooterRecord
+    ) {
+        writeSnapshotFooterRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotFooterRecord);
+        buffer.flip();
+        return MemoryRecords.readableRecords(buffer);
+    }
+
+    private static void writeSnapshotFooterRecord(ByteBuffer buffer,
+        long initialOffset,
+        long timestamp,
+        int leaderEpoch,
+        SnapshotFooterRecord snapshotFooterRecord
+    ) {
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
             buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
             TimestampType.CREATE_TIME, initialOffset, timestamp,
             RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
             false, true, leaderEpoch, buffer.capacity()
         );
-        builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+        builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord);
         builder.close();

Review comment:
       We should use Java's try-with-resources here.
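
       i.e. the same pattern already used for the header record above:

```java
try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
    buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
    TimestampType.CREATE_TIME, initialOffset, timestamp,
    RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
    false, true, leaderEpoch, buffer.capacity())
) {
    // builder.close() runs even if the append throws
    builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord);
}
```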

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##########
@@ -100,4 +116,21 @@ public void close() {
             new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize)
         );
     }
+
+    /**
+     * Returns the next non-control Batch
+     */
+    private Optional<Batch<T>> nextBatch() {
+        while (iterator.hasNext()) {
+            Batch<T> batch = iterator.next();
+
+            if (batch.records().isEmpty()) {
+                continue;
+            } else {
+                return Optional.of(batch);
+            }

Review comment:
       ```suggestion
               if (!batch.records().isEmpty()) {
                   return Optional.of(batch);
               }
       ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
