yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r620802248



##########
File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
##########
@@ -77,21 +87,39 @@ public void configure(Map<String, ?> configs, boolean isKey) {
         }
     }
 
+    private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException {
+        List<Integer> nullIndexList = IntStream.range(0, data.size())
+                .filter(i -> data.get(i) == null)
+                .boxed().collect(Collectors.toList());
+        out.writeInt(nullIndexList.size());
+        for (int i : nullIndexList) out.writeInt(i);
+    }
+
     @Override
     public byte[] serialize(String topic, List<Inner> data) {
         if (data == null) {
             return null;
         }
-        final int size = data.size();
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
              final DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeByte(serStrategy.ordinal()); // write serialization strategy flag
+            if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) {
+                serializeNullIndexList(out, data);
+            }
+            final int size = data.size();
             out.writeInt(size);
             for (Inner entry : data) {
-                final byte[] bytes = inner.serialize(topic, entry);
-                if (!isFixedLength) {
-                    out.writeInt(bytes.length);
+                if (entry == null) {
+                    if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+                        out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE);
+                    }
+                } else {
+                    final byte[] bytes = inner.serialize(topic, entry);
+                    if (!isFixedLength || serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+                        out.writeInt(bytes.length);

Review comment:
       Hmmm, @mjsax's reasoning was that we could introduce **more**
serialization strategies in the future
(https://github.com/apache/kafka/pull/6592#discussion_r370513438):
   > However, to allow us to support different serialization formats in the
future, we should add one more magic byte at the very beginning that encodes
the chosen serialization format
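
To make the magic-byte idea concrete, here is a minimal, self-contained sketch of how a leading strategy flag could let a reader pick the right decoding path, and reject flags it does not know. This is not the code from this PR: the class `StrategyFlagSketch`, the explicit `id` values, `fromId`, and `NULL_MARKER` are all hypothetical names for illustration, and the inner type is fixed to `String` instead of going through an inner serializer. One deliberate difference: the sketch writes an explicit id rather than `ordinal()`, on the assumption that the byte should stay stable if enum constants are later reordered.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not the ListSerializer/ListDeserializer from the PR.
final class StrategyFlagSketch {

    // Assumed ids: fixed by hand so reordering the constants cannot change the wire format.
    enum Strategy {
        NULL_INDEX_LIST((byte) 0),
        NEGATIVE_SIZE((byte) 1);

        final byte id;

        Strategy(byte id) {
            this.id = id;
        }

        static Strategy fromId(byte id) {
            for (Strategy s : values()) {
                if (s.id == id) return s;
            }
            throw new IllegalArgumentException("Unknown serialization strategy flag: " + id);
        }
    }

    // Assumed marker for a null entry under NEGATIVE_SIZE.
    static final int NULL_MARKER = -1;

    static byte[] serialize(Strategy strategy, List<String> data) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(baos)) {
            out.writeByte(strategy.id); // the magic byte comes first
            if (strategy == Strategy.NULL_INDEX_LIST) {
                List<Integer> nullIndexes = new ArrayList<>();
                for (int i = 0; i < data.size(); i++) {
                    if (data.get(i) == null) nullIndexes.add(i);
                }
                out.writeInt(nullIndexes.size());
                for (int i : nullIndexes) out.writeInt(i);
            }
            out.writeInt(data.size());
            for (String entry : data) {
                if (entry == null) {
                    // NULL_INDEX_LIST already recorded this position, so it writes nothing here.
                    if (strategy == Strategy.NEGATIVE_SIZE) out.writeInt(NULL_MARKER);
                    continue;
                }
                byte[] bytes = entry.getBytes(StandardCharsets.UTF_8);
                out.writeInt(bytes.length);
                out.write(bytes);
            }
            out.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<String> deserialize(byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            Strategy strategy = Strategy.fromId(in.readByte()); // dispatch on the magic byte
            Set<Integer> nullIndexes = new HashSet<>();
            if (strategy == Strategy.NULL_INDEX_LIST) {
                int nullCount = in.readInt();
                for (int i = 0; i < nullCount; i++) nullIndexes.add(in.readInt());
            }
            int size = in.readInt();
            List<String> result = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                if (nullIndexes.contains(i)) {
                    result.add(null);
                    continue;
                }
                int length = in.readInt();
                if (strategy == Strategy.NEGATIVE_SIZE && length == NULL_MARKER) {
                    result.add(null);
                    continue;
                }
                byte[] bytes = new byte[length];
                in.readFully(bytes);
                result.add(new String(bytes, StandardCharsets.UTF_8));
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", null, "bc");
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + deserialize(serialize(s, data)));
        }
    }
}
```

With a flag like this, adding a third strategy later would only need a new id and a new branch on each side; payloads written with the existing ids would still decode, and an older reader would fail loudly on an id it does not recognize.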
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

