rreddy-22 commented on code in PR #20868:
URL: https://github.com/apache/kafka/pull/20868#discussion_r2525258135
##########
clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java:
##########
@@ -79,4 +110,55 @@ public void testGetErrorResponse() {
assertEquals(0, errorResponse.throttleTimeMs());
}
}
+
+ @Test
+ public void testTransactionVersion() {
+ // Test that TransactionVersion is set correctly and serialization
handles it properly.
+ List<WriteTxnMarkersRequest.TxnMarkerEntry> markersWithVersion =
Collections.singletonList(
+ new WriteTxnMarkersRequest.TxnMarkerEntry(
+ PRODUCER_ID, PRODUCER_EPOCH, COORDINATOR_EPOCH,
+ RESULT, Collections.singletonList(TOPIC_PARTITION), (short) 2)
+ );
+ WriteTxnMarkersRequest.Builder builder = new
WriteTxnMarkersRequest.Builder(markersWithVersion);
+
+ // Test version 2 - TransactionVersion should be included.
+ WriteTxnMarkersRequest requestV2 = builder.build((short) 2);
+ assertNotNull(requestV2);
+ assertEquals(1, requestV2.markers().size());
+ // Verify TransactionVersion is set to 2 in the request data.
+ assertEquals((byte) 2,
requestV2.data().markers().get(0).transactionVersion());
+ // Verify the request can be serialized for version 2
(TransactionVersion field included).
+ // This should not throw an exception.
+ requestV2.serialize();
+ int sizeV2 = requestV2.sizeInBytes();
+ // Verify TransactionVersion is set to 2 in the marker entry
(markers() reads the field for version >= 2).
+ // This is what the broker sees when receiving the request - markers()
is called on the partition leader side.
+ assertEquals((short) 2,
requestV2.markers().get(0).transactionVersion());
+
+ // Test version 1 - TransactionVersion should be omitted (ignorable
field).
+ WriteTxnMarkersRequest requestV1 = builder.build((short) 1);
+ assertNotNull(requestV1);
+ assertEquals(1, requestV1.markers().size());
+ // Verify TransactionVersion is still set to 2 in the request data
(even for version 1).
+ // This is what the coordinator has when building the request - data()
is used before serialization.
+ // The field value is preserved in the data, but will be omitted
during serialization.
+ assertEquals((byte) 2,
requestV1.data().markers().get(0).transactionVersion());
+ // Verify the request can be serialized for version 1
(TransactionVersion field omitted).
+ // This should not throw an exception even though TransactionVersion
is set to 2
+ // because the field is marked as ignorable.
+ requestV1.serialize();
+ int sizeV1 = requestV1.sizeInBytes();
+ // After serialization, verify TransactionVersion is 0 in the marker
entry for version 1 (field not read for version < 2).
+ // This is what the broker sees when receiving the request - markers()
is called on the partition leader side.
+ assertEquals((short) 0,
requestV1.markers().get(0).transactionVersion());
+
+ // Verify that version 2 is larger than version 1 because it includes
TransactionVersion field.
+ // TransactionVersion is int8 (1 byte), so version 2 should be at
least 1 byte larger.
+ // This check ensures that the serialization logic correctly
includes/excludes the field.
+ assertTrue(sizeV2 > sizeV1,
Review Comment:
done!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]