junrao commented on code in PR #20614:
URL: https://github.com/apache/kafka/pull/20614#discussion_r2422175741
##########
clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java:
##########
@@ -916,40 +916,71 @@ public String documentation() {
}
};
- public static final DocumentedType COMPACT_RECORDS = new DocumentedType() {
- @Override
- public boolean isNullable() {
- return true;
- }
+ public static final DocumentedType RECORDS = new DocumentedType() {
@Override
public void write(ByteBuffer buffer, Object o) {
- if (o == null) {
- COMPACT_NULLABLE_BYTES.write(buffer, null);
- } else if (o instanceof MemoryRecords) {
+ if (o instanceof MemoryRecords) {
MemoryRecords records = (MemoryRecords) o;
- COMPACT_NULLABLE_BYTES.write(buffer,
records.buffer().duplicate());
+ BYTES.write(buffer, records.buffer().duplicate());
} else {
throw new IllegalArgumentException("Unexpected record type: "
+ o.getClass());
}
}
@Override
public MemoryRecords read(ByteBuffer buffer) {
- ByteBuffer recordsBuffer = (ByteBuffer)
COMPACT_NULLABLE_BYTES.read(buffer);
- if (recordsBuffer == null) {
- return null;
- } else {
- return MemoryRecords.readableRecords(recordsBuffer);
- }
+ ByteBuffer recordsBuffer = (ByteBuffer) BYTES.read(buffer);
+ return MemoryRecords.readableRecords(recordsBuffer);
}
@Override
public int sizeOf(Object o) {
- if (o == null) {
- return 1;
+ BaseRecords records = (BaseRecords) o;
+ return 4 + records.sizeInBytes();
+ }
+
+ @Override
+ public String typeName() {
+ return "RECORDS";
+ }
+
+ @Override
+ public BaseRecords validate(Object item) {
+ if (item instanceof BaseRecords)
Review Comment:
This is an existing issue. Since RECORDS currently only supports
MemoryRecords, we probably should validate that too to be consistent.
##########
clients/src/main/resources/common/message/README.md:
##########
@@ -102,7 +104,7 @@ Guide](https://kafka.apache.org/protocol.html).
Nullable Fields
---------------
Booleans, ints, and floats can never be null. However, fields that are
strings,
-bytes, uuid, records, or arrays may optionally be "nullable". When a field is
+bytes, records, struct, or arrays may optionally be "nullable". When a field
is
Review Comment:
Could we add uuid to the first sentence?
##########
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java:
##########
@@ -66,6 +66,8 @@ private static void schemaToBnfHtml(Schema schema,
StringBuilder b, int indentSi
b.append(indentStr);
b.append(entry.getKey());
b.append(" => ");
+ b.append(((Schema) entry.getValue()).typeName());
Review Comment:
How does NullableSchema fit here?
##########
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java:
##########
@@ -66,6 +66,8 @@ private static void schemaToBnfHtml(Schema schema,
StringBuilder b, int indentSi
b.append(indentStr);
b.append(entry.getKey());
b.append(" => ");
+ b.append(((Schema) entry.getValue()).typeName());
+ b.append(" ");
Review Comment:
Could you post a few request/response to see how the generated html look
like?
##########
clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java:
##########
@@ -342,7 +342,7 @@ private static boolean retainsBufferReference(Schema
schema) {
Schema.Visitor detector = new Schema.Visitor() {
@Override
public void visit(Type field) {
- if (field == BYTES || field == NULLABLE_BYTES || field ==
RECORDS ||
+ if (field == BYTES || field == NULLABLE_BYTES || field ==
NULLABLE_RECORDS ||
Review Comment:
This should include all 4 variants of RECORDS.
##########
generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java:
##########
@@ -317,8 +317,11 @@ private String fieldTypeToSchemaType(FieldType type,
fieldTypeToSchemaType(arrayType.elementType(), false,
version, fieldFlexibleVersions, false));
}
} else if (type.isStruct()) {
- return String.format("%s.SCHEMA_%d", type,
+ if (nullable)
+
headerGenerator.addImport(MessageGenerator.NULLABLE_SCHEMA_CLASS);
+ String schemaType = String.format("%s.SCHEMA_%d", type,
floorVersion(type.toString(), version));
+ return nullable ? String.format("new NullableSchema(%s)",
schemaType) : schemaType;
Review Comment:
This generates the following code. Should we just change Assignment.SCHEMA_0
to be NullableSchema?
` new Field("assignment", new
NullableSchema(Assignment.SCHEMA_0), "null if not provided; the assignment
otherwise."),`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]