This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1bfa4cd17be KAFKA-10864 Convert end txn marker schema to use
auto-generated protocol (#9766)
1bfa4cd17be is described below
commit 1bfa4cd17be3ad3dd6e8b97dd0a2c9f2d43c89aa
Author: dengziming <[email protected]>
AuthorDate: Wed Mar 5 15:47:02 2025 +0800
KAFKA-10864 Convert end txn marker schema to use auto-generated protocol
(#9766)
1. Convert end txn marker schema to use auto-generated
protocol`EndTxnMarker`
2. substitute `CURRENT_END_TXN_MARKER_VALUE_SIZE` with
an`endTnxMarkerValueSize` method since the size is accumulated from
`EndTxnMarker`.
3. add buffer to `EndTransactionMarker` to avoid twice compute from
`serializeValue` and `endTnxMarkerValueSize`.
4. flexibleVersions is set to none.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/common/record/EndTransactionMarker.java | 58 +++++---------
.../apache/kafka/common/record/MemoryRecords.java | 3 +-
.../resources/common/message/EndTxnMarker.json | 26 +++++++
.../common/record/EndTransactionMarkerTest.java | 89 ++++++++++++++++++++--
4 files changed, 130 insertions(+), 46 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
index 9e7225c2110..92d53dc2e4d 100644
---
a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
+++
b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java
@@ -17,10 +17,9 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.InvalidRecordException;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.message.EndTxnMarker;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,23 +34,16 @@ import java.nio.ByteBuffer;
public class EndTransactionMarker {
private static final Logger log =
LoggerFactory.getLogger(EndTransactionMarker.class);
- private static final short CURRENT_END_TXN_MARKER_VERSION = 0;
- private static final Schema END_TXN_MARKER_SCHEMA_VERSION_V0 = new Schema(
- new Field("version", Type.INT16),
- new Field("coordinator_epoch", Type.INT32));
- static final int CURRENT_END_TXN_MARKER_VALUE_SIZE = 6;
- static final int CURRENT_END_TXN_SCHEMA_RECORD_SIZE =
DefaultRecord.sizeInBytes(0, 0L,
- ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE,
- EndTransactionMarker.CURRENT_END_TXN_MARKER_VALUE_SIZE,
- Record.EMPTY_HEADERS);
-
private final ControlRecordType type;
private final int coordinatorEpoch;
+ private final ByteBuffer buffer;
public EndTransactionMarker(ControlRecordType type, int coordinatorEpoch) {
ensureTransactionMarkerControlType(type);
this.type = type;
this.coordinatorEpoch = coordinatorEpoch;
+ EndTxnMarker marker = new
EndTxnMarker().setCoordinatorEpoch(coordinatorEpoch);
+ this.buffer =
MessageUtil.toVersionPrefixedByteBuffer(EndTxnMarker.HIGHEST_SUPPORTED_VERSION,
marker);
}
public int coordinatorEpoch() {
@@ -62,19 +54,8 @@ public class EndTransactionMarker {
return type;
}
- private Struct buildRecordValue() {
- Struct struct = new Struct(END_TXN_MARKER_SCHEMA_VERSION_V0);
- struct.set("version", CURRENT_END_TXN_MARKER_VERSION);
- struct.set("coordinator_epoch", coordinatorEpoch);
- return struct;
- }
-
public ByteBuffer serializeValue() {
- Struct valueStruct = buildRecordValue();
- ByteBuffer value = ByteBuffer.allocate(valueStruct.sizeOf());
- valueStruct.writeTo(value);
- value.flip();
- return value;
+ return buffer.duplicate();
}
@Override
@@ -95,7 +76,7 @@ public class EndTransactionMarker {
private static void ensureTransactionMarkerControlType(ControlRecordType
type) {
if (type != ControlRecordType.COMMIT && type !=
ControlRecordType.ABORT)
- throw new IllegalArgumentException("Invalid control record type
for end transaction marker" + type);
+ throw new IllegalArgumentException("Invalid control record type
for end transaction marker " + type);
}
public static EndTransactionMarker deserialize(Record record) {
@@ -103,24 +84,27 @@ public class EndTransactionMarker {
return deserializeValue(type, record.value());
}
+ // Visible for testing
static EndTransactionMarker deserializeValue(ControlRecordType type,
ByteBuffer value) {
ensureTransactionMarkerControlType(type);
- if (value.remaining() < CURRENT_END_TXN_MARKER_VALUE_SIZE)
- throw new InvalidRecordException("Invalid value size found for end
transaction marker. Must have " +
- "at least " + CURRENT_END_TXN_MARKER_VALUE_SIZE + " bytes,
but found only " + value.remaining());
-
- short version = value.getShort(0);
- if (version < 0)
+ short version = value.getShort();
+ if (version < EndTxnMarker.LOWEST_SUPPORTED_VERSION)
throw new InvalidRecordException("Invalid version found for end
transaction marker: " + version +
". May indicate data corruption");
- if (version > CURRENT_END_TXN_MARKER_VERSION)
+ if (version > EndTxnMarker.HIGHEST_SUPPORTED_VERSION)
log.debug("Received end transaction marker value version {}.
Parsing as version {}", version,
- CURRENT_END_TXN_MARKER_VERSION);
+ EndTxnMarker.HIGHEST_SUPPORTED_VERSION);
+ EndTxnMarker marker = new EndTxnMarker(new ByteBufferAccessor(value),
EndTxnMarker.HIGHEST_SUPPORTED_VERSION);
+ return new EndTransactionMarker(type, marker.coordinatorEpoch());
+ }
- int coordinatorEpoch = value.getInt(2);
- return new EndTransactionMarker(type, coordinatorEpoch);
+ public int endTxnMarkerValueSize() {
+ return DefaultRecord.sizeInBytes(0, 0L,
+ ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE,
+ buffer.remaining(),
+ Record.EMPTY_HEADERS);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index c06188edf22..c2fd231e4b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -678,8 +678,7 @@ public class MemoryRecords extends AbstractRecords {
public static MemoryRecords withEndTransactionMarker(long initialOffset,
long timestamp, int partitionLeaderEpoch,
long producerId,
short producerEpoch,
EndTransactionMarker
marker) {
- int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD +
- EndTransactionMarker.CURRENT_END_TXN_SCHEMA_RECORD_SIZE;
+ int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD +
marker.endTxnMarkerValueSize();
ByteBuffer buffer = ByteBuffer.allocate(endTxnMarkerBatchSize);
writeEndTransactionalMarker(buffer, initialOffset, timestamp,
partitionLeaderEpoch, producerId,
producerEpoch, marker);
diff --git a/clients/src/main/resources/common/message/EndTxnMarker.json
b/clients/src/main/resources/common/message/EndTxnMarker.json
new file mode 100644
index 00000000000..f6e1e209ae9
--- /dev/null
+++ b/clients/src/main/resources/common/message/EndTxnMarker.json
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "type": "data",
+ "name": "EndTxnMarker",
+ "validVersions": "0",
+ "flexibleVersions": "none",
+ "fields": [
+ { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+",
+ "about": "The coordinator epoch when appending the record"
+ }
+ ]
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
index 64224e00328..c9a3b3b10b7 100644
---
a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
@@ -17,16 +17,31 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.message.EndTxnMarker;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.utils.ByteUtils;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class EndTransactionMarkerTest {
+ // Old hard-coded schema, used to validate old hard-coded schema format is
exactly the same as new auto generated protocol format
+ private final Schema v0Schema = new Schema(
+ new Field("version", Type.INT16),
+ new Field("coordinator_epoch", Type.INT32));
+
+ private static final List<ControlRecordType> VALID_CONTROLLER_RECORD_TYPE
= Arrays.asList(ControlRecordType.COMMIT, ControlRecordType.ABORT);
+
@Test
public void testUnknownControlTypeNotAllowed() {
assertThrows(IllegalArgumentException.class,
@@ -40,19 +55,13 @@ public class EndTransactionMarkerTest {
}
@Test
- public void testIllegalNegativeVersion() {
+ public void testIllegalVersion() {
ByteBuffer buffer = ByteBuffer.allocate(2);
buffer.putShort((short) -1);
buffer.flip();
assertThrows(InvalidRecordException.class, () ->
EndTransactionMarker.deserializeValue(ControlRecordType.ABORT, buffer));
}
- @Test
- public void testNotEnoughBytes() {
- assertThrows(InvalidRecordException.class,
- () ->
EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT,
ByteBuffer.wrap(new byte[0])));
- }
-
@Test
public void testSerde() {
int coordinatorEpoch = 79;
@@ -73,4 +82,70 @@ public class EndTransactionMarkerTest {
EndTransactionMarker deserialized =
EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, buffer);
assertEquals(coordinatorEpoch, deserialized.coordinatorEpoch());
}
+
+ @Test
+ public void testSerializeAndDeserialize() {
+ for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) {
+ for (short version = EndTxnMarker.LOWEST_SUPPORTED_VERSION;
+ version <= EndTxnMarker.HIGHEST_SUPPORTED_VERSION; version++)
{
+ EndTransactionMarker marker = new EndTransactionMarker(type,
1);
+
+ ByteBuffer buffer = marker.serializeValue();
+ EndTransactionMarker deserializedMarker =
EndTransactionMarker.deserializeValue(type, buffer);
+ assertEquals(marker, deserializedMarker);
+ }
+ }
+ }
+
+ @Test
+ public void testEndTxnMarkerValueSize() {
+ for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) {
+ EndTransactionMarker marker = new EndTransactionMarker(type, 1);
+ int offsetSize = ByteUtils.sizeOfVarint(0);
+ int timestampSize = ByteUtils.sizeOfVarlong(0);
+ int keySize = ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE;
+ int valueSize = marker.serializeValue().remaining();
+ int headerSize =
ByteUtils.sizeOfVarint(Record.EMPTY_HEADERS.length);
+ int totalSize = 1 + offsetSize + timestampSize +
ByteUtils.sizeOfVarint(keySize) + keySize + ByteUtils.sizeOfVarint(valueSize) +
valueSize + headerSize;
+ assertEquals(ByteUtils.sizeOfVarint(totalSize) + totalSize,
marker.endTxnMarkerValueSize());
+ }
+ }
+
+ @Test
+ public void testBackwardDeserializeCompatibility() {
+ int coordinatorEpoch = 10;
+ for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) {
+ for (short version = EndTxnMarker.LOWEST_SUPPORTED_VERSION;
+ version <= EndTxnMarker.HIGHEST_SUPPORTED_VERSION; version++)
{
+
+ Struct struct = new Struct(v0Schema);
+ struct.set("version", version);
+ struct.set("coordinator_epoch", coordinatorEpoch);
+
+ ByteBuffer oldVersionBuffer =
ByteBuffer.allocate(struct.sizeOf());
+ struct.writeTo(oldVersionBuffer);
+ oldVersionBuffer.flip();
+
+ EndTransactionMarker deserializedMarker =
EndTransactionMarker.deserializeValue(type, oldVersionBuffer);
+ assertEquals(coordinatorEpoch,
deserializedMarker.coordinatorEpoch());
+ assertEquals(type, deserializedMarker.controlType());
+ }
+ }
+ }
+
+ @Test
+ public void testForwardDeserializeCompatibility() {
+ int coordinatorEpoch = 10;
+ for (ControlRecordType type: VALID_CONTROLLER_RECORD_TYPE) {
+ for (short version = EndTxnMarker.LOWEST_SUPPORTED_VERSION;
+ version <= EndTxnMarker.HIGHEST_SUPPORTED_VERSION; version++)
{
+ EndTransactionMarker marker = new EndTransactionMarker(type,
coordinatorEpoch);
+ ByteBuffer newVersionBuffer = marker.serializeValue();
+
+ Struct struct = v0Schema.read(newVersionBuffer);
+ EndTransactionMarker deserializedMarker = new
EndTransactionMarker(type, struct.getInt("coordinator_epoch"));
+ assertEquals(marker, deserializedMarker);
+ }
+ }
+ }
}