chia7712 commented on code in PR #19110:
URL: https://github.com/apache/kafka/pull/19110#discussion_r2637114847
##########
clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java:
##########
@@ -59,46 +58,45 @@ public enum ControlRecordType {
private static final Logger log =
LoggerFactory.getLogger(ControlRecordType.class);
- static final short CURRENT_CONTROL_RECORD_KEY_VERSION = 0;
- static final int CURRENT_CONTROL_RECORD_KEY_SIZE = 4;
- private static final Schema CONTROL_RECORD_KEY_SCHEMA_VERSION_V0 = new
Schema(
- new Field("version", Type.INT16),
- new Field("type", Type.INT16));
-
private final short type;
+ private final ByteBuffer buffer;
ControlRecordType(short type) {
this.type = type;
+ ControlRecordTypeSchema schema = new
ControlRecordTypeSchema().setType(type);
+ buffer =
MessageUtil.toVersionPrefixedByteBuffer(ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION,
schema);
}
public short type() {
return type;
}
- public Struct recordKey() {
+ public ByteBuffer recordKey() {
if (this == UNKNOWN)
throw new IllegalArgumentException("Cannot serialize UNKNOWN
control record type");
+ return buffer.duplicate();
+ }
- Struct struct = new Struct(CONTROL_RECORD_KEY_SCHEMA_VERSION_V0);
- struct.set("version", CURRENT_CONTROL_RECORD_KEY_VERSION);
- struct.set("type", type);
- return struct;
+ public int controlRecordKeySize() {
+ return buffer.remaining();
}
public static short parseTypeId(ByteBuffer key) {
Review Comment:
Could you make this private? Callers could use `ControlRecordType
parse(ByteBuffer key)` instead
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1699,8 +1699,8 @@ class KafkaApis(val requestChannel: RequestChannel,
skippedMarkers += 1
} else {
val controlRecordType = marker.transactionResult match {
- case TransactionResult.COMMIT => ControlRecordType.COMMIT
- case TransactionResult.ABORT => ControlRecordType.ABORT
+ case TransactionResult.COMMIT =>
org.apache.kafka.common.record.ControlRecordType.COMMIT
Review Comment:
Those changes are no longer necessary
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]