junrao commented on code in PR #20614:
URL: https://github.com/apache/kafka/pull/20614#discussion_r2422175741


##########
clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java:
##########
@@ -916,40 +916,71 @@ public String documentation() {
         }
     };
 
-    public static final DocumentedType COMPACT_RECORDS = new DocumentedType() {
-        @Override
-        public boolean isNullable() {
-            return true;
-        }
+    public static final DocumentedType RECORDS = new DocumentedType() {
 
         @Override
         public void write(ByteBuffer buffer, Object o) {
-            if (o == null) {
-                COMPACT_NULLABLE_BYTES.write(buffer, null);
-            } else if (o instanceof MemoryRecords) {
+            if (o instanceof MemoryRecords) {
                 MemoryRecords records = (MemoryRecords) o;
-                COMPACT_NULLABLE_BYTES.write(buffer, records.buffer().duplicate());
+                BYTES.write(buffer, records.buffer().duplicate());
             } else {
                 throw new IllegalArgumentException("Unexpected record type: " + o.getClass());
             }
         }
 
         @Override
         public MemoryRecords read(ByteBuffer buffer) {
-            ByteBuffer recordsBuffer = (ByteBuffer) COMPACT_NULLABLE_BYTES.read(buffer);
-            if (recordsBuffer == null) {
-                return null;
-            } else {
-                return MemoryRecords.readableRecords(recordsBuffer);
-            }
+            ByteBuffer recordsBuffer = (ByteBuffer) BYTES.read(buffer);
+            return MemoryRecords.readableRecords(recordsBuffer);
         }
 
         @Override
         public int sizeOf(Object o) {
-            if (o == null) {
-                return 1;
+            BaseRecords records = (BaseRecords) o;
+            return 4 + records.sizeInBytes();
+        }
+
+        @Override
+        public String typeName() {
+            return "RECORDS";
+        }
+
+        @Override
+        public BaseRecords validate(Object item) {
+            if (item instanceof BaseRecords)

Review Comment:
   This is a pre-existing issue. Since RECORDS currently supports only
MemoryRecords, validate() should probably check for MemoryRecords as well, to
be consistent with write().
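
   A minimal sketch of the stricter check, not the actual Kafka code. The
types `BaseRecordsSketch`, `MemoryRecordsSketch`, and `RecordsTypeSketch` are
hypothetical stand-ins so the example compiles on its own, and
`IllegalArgumentException` stands in for whatever exception the real
validate() throws.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the general records interface.
interface BaseRecordsSketch {
    int sizeInBytes();
}

// Hypothetical stand-in for the only concrete type that write() can serialize.
final class MemoryRecordsSketch implements BaseRecordsSketch {
    private final ByteBuffer buffer;

    MemoryRecordsSketch(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public int sizeInBytes() {
        return buffer.remaining();
    }
}

final class RecordsTypeSketch {
    // Accept only the concrete type that write() supports, so that an
    // unsupported records implementation is rejected at validation time
    // rather than later, when write() is called.
    static MemoryRecordsSketch validate(Object item) {
        if (item instanceof MemoryRecordsSketch) {
            return (MemoryRecordsSketch) item;
        }
        throw new IllegalArgumentException(item + " is not an instance of MemoryRecordsSketch");
    }
}
```

   With this shape, validate() and write() agree on which inputs are
acceptable.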



##########
clients/src/main/resources/common/message/README.md:
##########
@@ -102,7 +104,7 @@ Guide](https://kafka.apache.org/protocol.html).
 Nullable Fields
 ---------------
 Booleans, ints, and floats can never be null.  However, fields that are strings,
-bytes, uuid, records, or arrays may optionally be "nullable".  When a field is 
+bytes, records, struct, or arrays may optionally be "nullable".  When a field is 

Review Comment:
   Could we add uuid to the first sentence?



##########
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java:
##########
@@ -66,6 +66,8 @@ private static void schemaToBnfHtml(Schema schema, StringBuilder b, int indentSi
                 b.append(indentStr);
                 b.append(entry.getKey());
                 b.append(" => ");
+                b.append(((Schema) entry.getValue()).typeName());

Review Comment:
   How does NullableSchema fit here?



##########
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java:
##########
@@ -66,6 +66,8 @@ private static void schemaToBnfHtml(Schema schema, StringBuilder b, int indentSi
                 b.append(indentStr);
                 b.append(entry.getKey());
                 b.append(" => ");
+                b.append(((Schema) entry.getValue()).typeName());
+                b.append(" ");

Review Comment:
   Could you post a few generated requests/responses so we can see what the
generated HTML looks like?



##########
clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java:
##########
@@ -342,7 +342,7 @@ private static boolean retainsBufferReference(Schema schema) {
         Schema.Visitor detector = new Schema.Visitor() {
             @Override
             public void visit(Type field) {
-                if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS ||
+                if (field == BYTES || field == NULLABLE_BYTES || field == NULLABLE_RECORDS ||

Review Comment:
   This should include all 4 variants of RECORDS.
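
   A rough sketch of what covering every variant could look like, using a set
lookup instead of a chain of `==` comparisons. Everything here is a
hypothetical stand-in: `FieldType` replaces the real `Type` constants, and the
four RECORDS names (`RECORDS`, `NULLABLE_RECORDS`, `COMPACT_RECORDS`,
`COMPACT_NULLABLE_RECORDS`) are assumed from the naming used elsewhere in this
PR.

```java
import java.util.EnumSet;
import java.util.Set;

final class BufferRetentionSketch {
    // Hypothetical stand-in for the Type constants; names are assumptions.
    enum FieldType {
        BYTES, NULLABLE_BYTES,
        RECORDS, NULLABLE_RECORDS, COMPACT_RECORDS, COMPACT_NULLABLE_RECORDS,
        INT32
    }

    // Every type whose deserialized value may keep a reference to the
    // request buffer, including all four RECORDS variants.
    private static final Set<FieldType> BUFFER_BACKED = EnumSet.of(
        FieldType.BYTES, FieldType.NULLABLE_BYTES,
        FieldType.RECORDS, FieldType.NULLABLE_RECORDS,
        FieldType.COMPACT_RECORDS, FieldType.COMPACT_NULLABLE_RECORDS);

    static boolean retainsBufferReference(FieldType field) {
        return BUFFER_BACKED.contains(field);
    }
}
```

   Keeping the list in one set makes it harder to drop a variant when a new
one is added.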



##########
generator/src/main/java/org/apache/kafka/message/SchemaGenerator.java:
##########
@@ -317,8 +317,11 @@ private String fieldTypeToSchemaType(FieldType type,
                         fieldTypeToSchemaType(arrayType.elementType(), false, version, fieldFlexibleVersions, false));
             }
         } else if (type.isStruct()) {
-            return String.format("%s.SCHEMA_%d", type,
+            if (nullable)
+                headerGenerator.addImport(MessageGenerator.NULLABLE_SCHEMA_CLASS);
+            String schemaType = String.format("%s.SCHEMA_%d", type,
                 floorVersion(type.toString(), version));
+            return nullable ? String.format("new NullableSchema(%s)", schemaType) : schemaType;

Review Comment:
   This generates the following code. Should we just change Assignment.SCHEMA_0
to be a NullableSchema instead?
   
   `new Field("assignment", new NullableSchema(Assignment.SCHEMA_0), "null if not provided; the assignment otherwise."),`


