This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1bfa4cd17be KAFKA-10864 Convert end txn marker schema to use 
auto-generated protocol (#9766)
1bfa4cd17be is described below

commit 1bfa4cd17be3ad3dd6e8b97dd0a2c9f2d43c89aa
Author: dengziming <[email protected]>
AuthorDate: Wed Mar 5 15:47:02 2025 +0800

    KAFKA-10864 Convert end txn marker schema to use auto-generated protocol 
(#9766)
    
    1. Convert end txn marker schema to use the auto-generated 
protocol `EndTxnMarker`.
    2. Substitute `CURRENT_END_TXN_MARKER_VALUE_SIZE` with 
an `endTxnMarkerValueSize` method, since the size is now computed from 
`EndTxnMarker`.
    3. Add a buffer to `EndTransactionMarker` to avoid computing the 
serialized value twice, in `serializeValue` and `endTxnMarkerValueSize`.
    4. flexibleVersions is set to none.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/common/record/EndTransactionMarker.java  | 58 +++++---------
 .../apache/kafka/common/record/MemoryRecords.java  |  3 +-
 .../resources/common/message/EndTxnMarker.json     | 26 +++++++
 .../common/record/EndTransactionMarkerTest.java    | 89 ++++++++++++++++++++--
 4 files changed, 130 insertions(+), 46 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
 
b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
index 9e7225c2110..92d53dc2e4d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
@@ -17,10 +17,9 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.InvalidRecordException;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.message.EndTxnMarker;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,23 +34,16 @@ import java.nio.ByteBuffer;
 public class EndTransactionMarker {
     private static final Logger log = 
LoggerFactory.getLogger(EndTransactionMarker.class);
 
-    private static final short CURRENT_END_TXN_MARKER_VERSION = 0;
-    private static final Schema END_TXN_MARKER_SCHEMA_VERSION_V0 = new Schema(
-            new Field("version", Type.INT16),
-            new Field("coordinator_epoch", Type.INT32));
-    static final int CURRENT_END_TXN_MARKER_VALUE_SIZE = 6;
-    static final int CURRENT_END_TXN_SCHEMA_RECORD_SIZE = 
DefaultRecord.sizeInBytes(0, 0L,
-            ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE,
-            EndTransactionMarker.CURRENT_END_TXN_MARKER_VALUE_SIZE,
-            Record.EMPTY_HEADERS);
-
     private final ControlRecordType type;
     private final int coordinatorEpoch;
+    private final ByteBuffer buffer;
 
     public EndTransactionMarker(ControlRecordType type, int coordinatorEpoch) {
         ensureTransactionMarkerControlType(type);
         this.type = type;
         this.coordinatorEpoch = coordinatorEpoch;
+        EndTxnMarker marker = new 
EndTxnMarker().setCoordinatorEpoch(coordinatorEpoch);
+        this.buffer = 
MessageUtil.toVersionPrefixedByteBuffer(EndTxnMarker.HIGHEST_SUPPORTED_VERSION, 
marker);
     }
 
     public int coordinatorEpoch() {
@@ -62,19 +54,8 @@ public class EndTransactionMarker {
         return type;
     }
 
-    private Struct buildRecordValue() {
-        Struct struct = new Struct(END_TXN_MARKER_SCHEMA_VERSION_V0);
-        struct.set("version", CURRENT_END_TXN_MARKER_VERSION);
-        struct.set("coordinator_epoch", coordinatorEpoch);
-        return struct;
-    }
-
     public ByteBuffer serializeValue() {
-        Struct valueStruct = buildRecordValue();
-        ByteBuffer value = ByteBuffer.allocate(valueStruct.sizeOf());
-        valueStruct.writeTo(value);
-        value.flip();
-        return value;
+        return buffer.duplicate();
     }
 
     @Override
@@ -95,7 +76,7 @@ public class EndTransactionMarker {
 
     private static void ensureTransactionMarkerControlType(ControlRecordType 
type) {
         if (type != ControlRecordType.COMMIT && type != 
ControlRecordType.ABORT)
-            throw new IllegalArgumentException("Invalid control record type 
for end transaction marker" + type);
+            throw new IllegalArgumentException("Invalid control record type 
for end transaction marker " + type);
     }
 
     public static EndTransactionMarker deserialize(Record record) {
@@ -103,24 +84,27 @@ public class EndTransactionMarker {
         return deserializeValue(type, record.value());
     }
 
+    // Visible for testing
     static EndTransactionMarker deserializeValue(ControlRecordType type, 
ByteBuffer value) {
         ensureTransactionMarkerControlType(type);
 
-        if (value.remaining() < CURRENT_END_TXN_MARKER_VALUE_SIZE)
-            throw new InvalidRecordException("Invalid value size found for end 
transaction marker. Must have " +
-                    "at least " + CURRENT_END_TXN_MARKER_VALUE_SIZE + " bytes, 
but found only " + value.remaining());
-
-        short version = value.getShort(0);
-        if (version < 0)
+        short version = value.getShort();
+        if (version < EndTxnMarker.LOWEST_SUPPORTED_VERSION)
             throw new InvalidRecordException("Invalid version found for end 
transaction marker: " + version +
                     ". May indicate data corruption");
 
-        if (version > CURRENT_END_TXN_MARKER_VERSION)
+        if (version > EndTxnMarker.HIGHEST_SUPPORTED_VERSION)
             log.debug("Received end transaction marker value version {}. 
Parsing as version {}", version,
-                    CURRENT_END_TXN_MARKER_VERSION);
+                    EndTxnMarker.HIGHEST_SUPPORTED_VERSION);
+        EndTxnMarker marker = new EndTxnMarker(new ByteBufferAccessor(value), 
EndTxnMarker.HIGHEST_SUPPORTED_VERSION);
+        return new EndTransactionMarker(type, marker.coordinatorEpoch());
+    }
 
-        int coordinatorEpoch = value.getInt(2);
-        return new EndTransactionMarker(type, coordinatorEpoch);
+    public int endTxnMarkerValueSize() {
+        return DefaultRecord.sizeInBytes(0, 0L,
+                ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE,
+                buffer.remaining(),
+                Record.EMPTY_HEADERS);
     }
 
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index c06188edf22..c2fd231e4b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -678,8 +678,7 @@ public class MemoryRecords extends AbstractRecords {
     public static MemoryRecords withEndTransactionMarker(long initialOffset, 
long timestamp, int partitionLeaderEpoch,
                                                          long producerId, 
short producerEpoch,
                                                          EndTransactionMarker 
marker) {
-        int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD +
-                EndTransactionMarker.CURRENT_END_TXN_SCHEMA_RECORD_SIZE;
+        int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD + 
marker.endTxnMarkerValueSize();
         ByteBuffer buffer = ByteBuffer.allocate(endTxnMarkerBatchSize);
         writeEndTransactionalMarker(buffer, initialOffset, timestamp, 
partitionLeaderEpoch, producerId,
                 producerEpoch, marker);
diff --git a/clients/src/main/resources/common/message/EndTxnMarker.json 
b/clients/src/main/resources/common/message/EndTxnMarker.json
new file mode 100644
index 00000000000..f6e1e209ae9
--- /dev/null
+++ b/clients/src/main/resources/common/message/EndTxnMarker.json
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "EndTxnMarker",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+    { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+",
+      "about": "The coordinator epoch when appending the record"
+    }
+  ]
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
index 64224e00328..c9a3b3b10b7 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
@@ -17,16 +17,31 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.message.EndTxnMarker;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.utils.ByteUtils;
 
 import org.junit.jupiter.api.Test;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class EndTransactionMarkerTest {
 
+    // Old hard-coded schema, used to validate old hard-coded schema format is 
exactly the same as new auto generated protocol format
+    private final Schema v0Schema = new Schema(
+            new Field("version", Type.INT16),
+            new Field("coordinator_epoch", Type.INT32));
+
+    private static final List<ControlRecordType> VALID_CONTROLLER_RECORD_TYPE 
= Arrays.asList(ControlRecordType.COMMIT, ControlRecordType.ABORT);
+
     @Test
     public void testUnknownControlTypeNotAllowed() {
         assertThrows(IllegalArgumentException.class,
@@ -40,19 +55,13 @@ public class EndTransactionMarkerTest {
     }
 
     @Test
-    public void testIllegalNegativeVersion() {
+    public void testIllegalVersion() {
         ByteBuffer buffer = ByteBuffer.allocate(2);
         buffer.putShort((short) -1);
         buffer.flip();
         assertThrows(InvalidRecordException.class, () -> 
EndTransactionMarker.deserializeValue(ControlRecordType.ABORT, buffer));
     }
 
-    @Test
-    public void testNotEnoughBytes() {
-        assertThrows(InvalidRecordException.class,
-            () -> 
EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, 
ByteBuffer.wrap(new byte[0])));
-    }
-
     @Test
     public void testSerde() {
         int coordinatorEpoch = 79;
@@ -73,4 +82,70 @@ public class EndTransactionMarkerTest {
         EndTransactionMarker deserialized = 
EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, buffer);
         assertEquals(coordinatorEpoch, deserialized.coordinatorEpoch());
     }
+
+    @Test
+    public void testSerializeAndDeserialize() {
+        for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) {
+            for (short version = EndTxnMarker.LOWEST_SUPPORTED_VERSION;
+                 version <= EndTxnMarker.HIGHEST_SUPPORTED_VERSION; version++) 
{
+                EndTransactionMarker marker = new EndTransactionMarker(type, 
1);
+
+                ByteBuffer buffer = marker.serializeValue();
+                EndTransactionMarker deserializedMarker = 
EndTransactionMarker.deserializeValue(type, buffer);
+                assertEquals(marker, deserializedMarker);
+            }
+        }
+    }
+
+    @Test
+    public void testEndTxnMarkerValueSize() {
+        for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) {
+            EndTransactionMarker marker = new EndTransactionMarker(type, 1);
+            int offsetSize = ByteUtils.sizeOfVarint(0);
+            int timestampSize = ByteUtils.sizeOfVarlong(0);
+            int keySize = ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE;
+            int valueSize = marker.serializeValue().remaining();
+            int headerSize = 
ByteUtils.sizeOfVarint(Record.EMPTY_HEADERS.length);
+            int totalSize = 1 + offsetSize + timestampSize + 
ByteUtils.sizeOfVarint(keySize) + keySize + ByteUtils.sizeOfVarint(valueSize) + 
valueSize + headerSize;
+            assertEquals(ByteUtils.sizeOfVarint(totalSize) +  totalSize, 
marker.endTxnMarkerValueSize());
+        }
+    }
+
+    @Test
+    public void testBackwardDeserializeCompatibility() {
+        int coordinatorEpoch = 10;
+        for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) {
+            for (short version = EndTxnMarker.LOWEST_SUPPORTED_VERSION;
+                 version <= EndTxnMarker.HIGHEST_SUPPORTED_VERSION; version++) 
{
+
+                Struct struct = new Struct(v0Schema);
+                struct.set("version", version);
+                struct.set("coordinator_epoch", coordinatorEpoch);
+
+                ByteBuffer oldVersionBuffer = 
ByteBuffer.allocate(struct.sizeOf());
+                struct.writeTo(oldVersionBuffer);
+                oldVersionBuffer.flip();
+
+                EndTransactionMarker deserializedMarker = 
EndTransactionMarker.deserializeValue(type, oldVersionBuffer);
+                assertEquals(coordinatorEpoch, 
deserializedMarker.coordinatorEpoch());
+                assertEquals(type, deserializedMarker.controlType());
+            }
+        }
+    }
+
+    @Test
+    public void testForwardDeserializeCompatibility() {
+        int coordinatorEpoch = 10;
+        for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) {
+            for (short version = EndTxnMarker.LOWEST_SUPPORTED_VERSION;
+                 version <= EndTxnMarker.HIGHEST_SUPPORTED_VERSION; version++) 
{
+                EndTransactionMarker marker = new EndTransactionMarker(type, 
coordinatorEpoch);
+                ByteBuffer newVersionBuffer = marker.serializeValue();
+
+                Struct struct = v0Schema.read(newVersionBuffer);
+                EndTransactionMarker deserializedMarker = new 
EndTransactionMarker(type, struct.getInt("coordinator_epoch"));
+                assertEquals(marker, deserializedMarker);
+            }
+        }
+    }
 }

Reply via email to