yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r620802248



##########
File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
##########
@@ -77,21 +87,39 @@ public void configure(Map<String, ?> configs, boolean isKey) {
         }
     }
 
+    private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException {
+        List<Integer> nullIndexList = IntStream.range(0, data.size())
+                .filter(i -> data.get(i) == null)
+                .boxed().collect(Collectors.toList());
+        out.writeInt(nullIndexList.size());
+        for (int i : nullIndexList) out.writeInt(i);
+    }
+
     @Override
     public byte[] serialize(String topic, List<Inner> data) {
         if (data == null) {
             return null;
         }
-        final int size = data.size();
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
              final DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeByte(serStrategy.ordinal()); // write serialization strategy flag
+            if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) {
+                serializeNullIndexList(out, data);
+            }
+            final int size = data.size();
             out.writeInt(size);
             for (Inner entry : data) {
-                final byte[] bytes = inner.serialize(topic, entry);
-                if (!isFixedLength) {
-                    out.writeInt(bytes.length);
+                if (entry == null) {
+                    if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+                        out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE);
+                    }
+                } else {
+                    final byte[] bytes = inner.serialize(topic, entry);
+                    if (!isFixedLength || serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+                        out.writeInt(bytes.length);

Review comment:
       Hmm, @mjsax's reasoning was that we could introduce **more** serialization 
strategies in the future 
(https://github.com/apache/kafka/pull/6592#discussion_r370513438):
   > However, to allow us to support different serialization formats in the 
future, we should add one more magic byte in the very beginning that encodes 
the chosen serialization format
   
   As for ignoring the choice, also from @mjsax 
(https://github.com/apache/kafka/pull/6592#issuecomment-606277356):
   > I guess it's called freedom of choice :) If we feel strongly about it, we 
could of course disallow the "negative size" strategy for primitive types. 
However, it would have the disadvantage that we have a config that, depending 
on the data type you are using, would either be ignored or even throw an error 
if set incorrectly. From a usability point of view, this would be a 
disadvantage. It's always a mental burden to users if they have to think about 
if-then-else cases.
   ...
   Personally, I have a slight preference to allow both strategies for all 
types as I think ease of use is more important, but I am also fine otherwise.
   
   Here is my thought process: if a user chooses a serialization strategy, 
she probably knows what she is doing. Of course, the payload will be larger, 
and we will certainly notify her that the strategy she chose is not optimal 
for the current type of data, but I don't think we should strictly forbid the 
user from "shooting herself in the foot".
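
   To make the payload difference concrete, here is a minimal standalone sketch of the two layouts for a fixed-length inner type (4-byte ints). It mirrors the branching in the diff above but is not the PR's code: the class `StrategySketch`, the enum ordering, and the sentinel value `-1` are assumptions made for illustration only.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class StrategySketch {
    // Hypothetical stand-ins for the PR's enum and sentinel; ordering and value are assumed.
    enum Strategy { NULL_INDEX_LIST, NEGATIVE_SIZE }
    static final int NEGATIVE_SIZE_VALUE = -1;

    // Encodes a list of nullable Integers, i.e. a fixed-length (4-byte) inner type.
    static byte[] encode(List<Integer> data, Strategy strategy) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeByte(strategy.ordinal()); // leading byte identifies the strategy
            if (strategy == Strategy.NULL_INDEX_LIST) {
                // Prefix: number of null entries, then the index of each null entry.
                int nullCount = 0;
                for (Integer entry : data) {
                    if (entry == null) nullCount++;
                }
                out.writeInt(nullCount);
                for (int i = 0; i < data.size(); i++) {
                    if (data.get(i) == null) out.writeInt(i);
                }
            }
            out.writeInt(data.size());
            for (Integer entry : data) {
                if (entry == null) {
                    // NEGATIVE_SIZE marks a null inline; NULL_INDEX_LIST writes nothing here.
                    if (strategy == Strategy.NEGATIVE_SIZE) out.writeInt(NEGATIVE_SIZE_VALUE);
                } else {
                    // NEGATIVE_SIZE needs a size prefix on every entry, even fixed-length ones.
                    if (strategy == Strategy.NEGATIVE_SIZE) out.writeInt(Integer.BYTES);
                    out.writeInt(entry);
                }
            }
        }
        return baos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        List<Integer> data = Arrays.asList(1, null, 3, 4);
        System.out.println("NULL_INDEX_LIST: " + encode(data, Strategy.NULL_INDEX_LIST).length + " bytes");
        System.out.println("NEGATIVE_SIZE:   " + encode(data, Strategy.NEGATIVE_SIZE).length + " bytes");
    }
}
```

   For this four-element list with one null, the sketch produces 25 bytes with the null index list and 33 bytes with negative size, because the latter adds a 4-byte size prefix to every entry. That overhead is the "larger payload" a user accepts by picking negative size for a primitive type.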





