ableegoldman commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r618826368



##########
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
##########
@@ -77,21 +87,39 @@ public void configure(Map<String, ?> configs, boolean 
isKey) {
         }
     }
 
+    private void serializeNullIndexList(final DataOutputStream out, 
List<Inner> data) throws IOException {
+        List<Integer> nullIndexList = IntStream.range(0, data.size())
+                .filter(i -> data.get(i) == null)
+                .boxed().collect(Collectors.toList());
+        out.writeInt(nullIndexList.size());
+        for (int i : nullIndexList) out.writeInt(i);
+    }
+
     @Override
     public byte[] serialize(String topic, List<Inner> data) {
         if (data == null) {
             return null;
         }
-        final int size = data.size();
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
              final DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeByte(serStrategy.ordinal()); // write serialization 
strategy flag
+            if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) {
+                serializeNullIndexList(out, data);
+            }
+            final int size = data.size();
             out.writeInt(size);
             for (Inner entry : data) {
-                final byte[] bytes = inner.serialize(topic, entry);
-                if (!isFixedLength) {
-                    out.writeInt(bytes.length);
+                if (entry == null) {
+                    if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+                        out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE);
+                    }
+                } else {
+                    final byte[] bytes = inner.serialize(topic, entry);
+                    if (!isFixedLength || serStrategy == 
SerializationStrategy.NEGATIVE_SIZE) {
+                        out.writeInt(bytes.length);

Review comment:
       Awesome. Maybe I misunderstood this comment:
   >Even if we are dealing with primitives, and a user chooses 
SerializationStrategy.NEGATIVE_SIZE, we would have to encode each primitive's 
size in our payload.
   
   or maybe you just wrote that a while ago and it's out of date. Anyways what 
we're doing now sounds good, no reason to encode extra data even if the user 
selects this strategy for some reason. But I do think we should at least log a 
warning telling them they made a bad choice and it will be ignored. Most likely 
they just didn't understand what the parameter meant, and it's a good 
opportunity to enlighten them




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to