ableegoldman commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r620720140



##########
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
##########
@@ -77,21 +87,39 @@ public void configure(Map<String, ?> configs, boolean 
isKey) {
         }
     }
 
+    private void serializeNullIndexList(final DataOutputStream out, 
List<Inner> data) throws IOException {
+        List<Integer> nullIndexList = IntStream.range(0, data.size())
+                .filter(i -> data.get(i) == null)
+                .boxed().collect(Collectors.toList());
+        out.writeInt(nullIndexList.size());
+        for (int i : nullIndexList) out.writeInt(i);
+    }
+
     @Override
     public byte[] serialize(String topic, List<Inner> data) {
         if (data == null) {
             return null;
         }
-        final int size = data.size();
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
              final DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeByte(serStrategy.ordinal()); // write serialization 
strategy flag
+            if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) {
+                serializeNullIndexList(out, data);
+            }
+            final int size = data.size();
             out.writeInt(size);
             for (Inner entry : data) {
-                final byte[] bytes = inner.serialize(topic, entry);
-                if (!isFixedLength) {
-                    out.writeInt(bytes.length);
+                if (entry == null) {
+                    if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+                        out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE);
+                    }
+                } else {
+                    final byte[] bytes = inner.serialize(topic, entry);
+                    if (!isFixedLength || serStrategy == 
SerializationStrategy.NEGATIVE_SIZE) {
+                        out.writeInt(bytes.length);

Review comment:
       Ah, sorry if that wasn't clear. Yes I was proposing to ignore the choice 
if a user selects the `VARIABLE_SIZE` strategy with primitive type data. And to 
also log a warning in this case so at least we're not just silently ignoring it.
   
   But I think you may have a point that perhaps we don't need to expose this 
flag at all. There seems to be no reason for a user to explicitly opt-in to the 
`VARIABLE_SIZE` strategy. Perhaps a better way of looking at this is to say 
that this strategy is the default, where the default will be overridden in two 
cases: data is a primitive/known type, or the data is a custom type that the 
user knows to be constant size and thus chooses to opt-in to the 
`CONSTANT_SIZE` strategy.
   
   WDYT? We could simplify the API by making this a boolean parameter instead 
of having them choose a `SerializationStrategy` directly, something like 
`isConstantSize`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to