This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6d1ae8b KAFKA-8326: Introduce List Serde (#6592)
6d1ae8b is described below
commit 6d1ae8bc007ffa1f3b3a9f949db6a99820dab75c
Author: Daniyar Yeralin <[email protected]>
AuthorDate: Thu May 13 18:54:00 2021 -0400
KAFKA-8326: Introduce List Serde (#6592)
Introduce List serde for primitive types or custom serdes with a serializer
and a deserializer according to KIP-466
Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Matthias J.
Sax <[email protected]>, John Roesler <[email protected]>, Michael Noll
<[email protected]>
---
checkstyle/import-control.xml | 1 +
.../apache/kafka/clients/CommonClientConfigs.java | 20 ++
.../common/serialization/ListDeserializer.java | 191 ++++++++++++++++
.../kafka/common/serialization/ListSerializer.java | 142 ++++++++++++
.../apache/kafka/common/serialization/Serdes.java | 30 +++
.../common/serialization/ListDeserializerTest.java | 251 +++++++++++++++++++++
.../common/serialization/ListSerializerTest.java | 153 +++++++++++++
.../common/serialization/SerializationTest.java | 188 +++++++++++++++
docs/streams/developer-guide/datatypes.html | 4 +-
docs/streams/upgrade-guide.html | 5 +
.../org/apache/kafka/streams/StreamsConfig.java | 20 ++
11 files changed, 1004 insertions(+), 1 deletion(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 318384f..c98cfab 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -189,6 +189,7 @@
</subpackage>
<subpackage name="serialization">
+ <allow pkg="org.apache.kafka.clients" />
<allow class="org.apache.kafka.common.errors.SerializationException" />
<allow class="org.apache.kafka.common.header.Headers" />
</subpackage>
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 150dac1..58075d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -118,6 +118,26 @@ public class CommonClientConfigs {
+ "elapses the client
will resend the request if necessary or fail the request if "
+ "retries are
exhausted.";
+    // Inner serde used for list keys when the default key serde is Serdes.ListSerde.
+    public static final String DEFAULT_LIST_KEY_SERDE_INNER_CLASS = "default.list.key.serde.inner";
+    public static final String DEFAULT_LIST_KEY_SERDE_INNER_CLASS_DOC = "Default inner class of list serde for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. "
+            + "This configuration will be read if and only if <code>default.key.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>";
+
+    // Inner serde used for list values when the default value serde is Serdes.ListSerde.
+    public static final String DEFAULT_LIST_VALUE_SERDE_INNER_CLASS = "default.list.value.serde.inner";
+    public static final String DEFAULT_LIST_VALUE_SERDE_INNER_CLASS_DOC = "Default inner class of list serde for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. "
+            + "This configuration will be read if and only if <code>default.value.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>";
+
+    // java.util.List implementation instantiated for deserialized list keys.
+    public static final String DEFAULT_LIST_KEY_SERDE_TYPE_CLASS = "default.list.key.serde.type";
+    public static final String DEFAULT_LIST_KEY_SERDE_TYPE_CLASS_DOC = "Default class for key that implements the <code>java.util.List</code> interface. "
+            + "This configuration will be read if and only if <code>default.key.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>. "
+            + "Note when list serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via '"
+            + DEFAULT_LIST_KEY_SERDE_INNER_CLASS + "'";
+
+    // java.util.List implementation instantiated for deserialized list values.
+    public static final String DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS = "default.list.value.serde.type";
+    public static final String DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS_DOC = "Default class for value that implements the <code>java.util.List</code> interface. "
+            + "This configuration will be read if and only if <code>default.value.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>. "
+            + "Note when list serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via '"
+            + DEFAULT_LIST_VALUE_SERDE_INNER_CLASS + "'";
+
public static final String GROUP_ID_CONFIG = "group.id";
public static final String GROUP_ID_DOC = "A unique string that identifies
the consumer group this consumer belongs to. This property is required if the
consumer uses either the group management functionality by using
<code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
new file mode 100644
index 0000000..272cbad
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import static
org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serdes.ListSerde;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * A {@link Deserializer} for {@link List} values that delegates the decoding of each entry to an
+ * inner deserializer.
+ *
+ * <p>The layout read by {@link #deserialize(String, byte[])} is:
+ * <ol>
+ *   <li>one byte holding the index of the {@code SerializationStrategy};</li>
+ *   <li>for {@code CONSTANT_SIZE} only: an int count followed by that many int indexes of null entries;</li>
+ *   <li>an int holding the number of entries;</li>
+ *   <li>the entries: for {@code VARIABLE_SIZE} each one is an int length (or
+ *       {@code ListSerde.NULL_ENTRY_VALUE} for a null entry) followed by the payload; for
+ *       {@code CONSTANT_SIZE} each non-null entry is a payload of the fixed size registered for the
+ *       inner deserializer, and null entries carry no payload.</li>
+ * </ol>
+ *
+ * <p>An instance is initialized either through the two-argument constructor or through
+ * {@link #configure(Map, boolean)}, never both.
+ *
+ * @param <Inner> type of the list entries
+ */
+public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
+
+    // Encoded entry length, in bytes, of every inner deserializer that is read without a per-entry length prefix.
+    // NOTE(review): 36 for UUIDDeserializer presumably is the length of the textual UUID form; confirm against UUIDSerializer.
+    private static final Map<Class<? extends Deserializer<?>>, Integer> FIXED_LENGTH_DESERIALIZERS = mkMap(
+        mkEntry(ShortDeserializer.class, Short.BYTES),
+        mkEntry(IntegerDeserializer.class, Integer.BYTES),
+        mkEntry(FloatDeserializer.class, Float.BYTES),
+        mkEntry(LongDeserializer.class, Long.BYTES),
+        mkEntry(DoubleDeserializer.class, Double.BYTES),
+        mkEntry(UUIDDeserializer.class, 36)
+    );
+
+    private Deserializer<Inner> inner;
+    private Class<?> listClass;
+    // Fixed entry length of the inner deserializer, or null when it is not listed in FIXED_LENGTH_DESERIALIZERS.
+    private Integer primitiveSize;
+
+    /**
+     * Creates an uninitialized instance; {@link #configure(Map, boolean)} has to supply the list
+     * class and the inner serde before the instance can deserialize anything.
+     */
+    public ListDeserializer() {}
+
+    /**
+     * Creates a fully initialized instance.
+     *
+     * @param listClass list implementation instantiated for every deserialized list
+     * @param inner     deserializer applied to each non-null entry
+     * @throws IllegalArgumentException if either argument is {@code null}
+     */
+    public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> inner) {
+        if (listClass == null || inner == null) {
+            throw new IllegalArgumentException("ListDeserializer requires both \"listClass\" and \"innerDeserializer\" parameters to be provided during initialization");
+        }
+        this.listClass = listClass;
+        this.inner = inner;
+        this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass());
+    }
+
+    /**
+     * @return the deserializer used for the list entries, or {@code null} if this instance has not been initialized yet
+     */
+    public Deserializer<Inner> innerDeserializer() {
+        return inner;
+    }
+
+    /**
+     * Initializes the list class and the inner deserializer from the configuration.
+     *
+     * @throws ConfigException if the instance was already initialized through the two-argument
+     *                         constructor, or if a required property is missing or names an unknown class
+     * @throws KafkaException  if a property value is neither a class name nor a {@link Class}
+     */
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        if (listClass != null || inner != null) {
+            throw new ConfigException("List deserializer was already initialized using a non-default constructor");
+        }
+        configureListClass(configs, isKey);
+        configureInnerSerde(configs, isKey);
+    }
+
+    // Resolves the list implementation from the key or value "type" property (class name or Class object).
+    private void configureListClass(Map<String, ?> configs, boolean isKey) {
+        String listTypePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS;
+        final Object listClassOrName = configs.get(listTypePropertyName);
+        if (listClassOrName == null) {
+            throw new ConfigException("Not able to determine the list class because it was neither passed via the constructor nor set in the config.");
+        }
+        try {
+            if (listClassOrName instanceof String) {
+                listClass = Utils.loadClass((String) listClassOrName, Object.class);
+            } else if (listClassOrName instanceof Class) {
+                listClass = (Class<?>) listClassOrName;
+            } else {
+                throw new KafkaException("Could not determine the list class instance using \"" + listTypePropertyName + "\" property.");
+            }
+        } catch (final ClassNotFoundException e) {
+            throw new ConfigException(listTypePropertyName, listClassOrName, "Deserializer's list class \"" + listClassOrName + "\" could not be found.");
+        }
+    }
+
+    // Instantiates the inner serde from the key or value "inner" property, configures its deserializer
+    // and looks up whether that deserializer has a fixed entry length.
+    @SuppressWarnings("unchecked")
+    private void configureInnerSerde(Map<String, ?> configs, boolean isKey) {
+        String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
+        final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName);
+        if (innerSerdeClassOrName == null) {
+            throw new ConfigException("Not able to determine the inner serde class because it was neither passed via the constructor nor set in the config.");
+        }
+        try {
+            if (innerSerdeClassOrName instanceof String) {
+                inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).deserializer();
+            } else if (innerSerdeClassOrName instanceof Class) {
+                inner = (Deserializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).deserializer();
+            } else {
+                throw new KafkaException("Could not determine the inner serde class instance using \"" + innerSerdePropertyName + "\" property.");
+            }
+            inner.configure(configs, isKey);
+            primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass());
+        } catch (final ClassNotFoundException e) {
+            throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Deserializer's inner serde class \"" + innerSerdeClassOrName + "\" could not be found.");
+        }
+    }
+
+    // Creates the target list, preferring a constructor that takes an int (called with the number of
+    // entries) and falling back to the no-arg constructor when the list class has none.
+    @SuppressWarnings("unchecked")
+    private List<Inner> createListInstance(int listSize) {
+        try {
+            Constructor<List<Inner>> listConstructor;
+            try {
+                listConstructor = (Constructor<List<Inner>>) listClass.getConstructor(Integer.TYPE);
+                return listConstructor.newInstance(listSize);
+            } catch (NoSuchMethodException e) {
+                listConstructor = (Constructor<List<Inner>>) listClass.getConstructor();
+                return listConstructor.newInstance();
+            }
+        } catch (InstantiationException | IllegalAccessException | NoSuchMethodException |
+                IllegalArgumentException | InvocationTargetException e) {
+            throw new KafkaException("Could not construct a list instance of \"" + listClass.getCanonicalName() + "\"", e);
+        }
+    }
+
+    // Maps the leading flag byte onto a SerializationStrategy, rejecting values outside the enum.
+    private SerializationStrategy parseSerializationStrategyFlag(final int serializationStrategyFlag) {
+        if (serializationStrategyFlag < 0 || serializationStrategyFlag >= SerializationStrategy.VALUES.length) {
+            throw new SerializationException("Invalid serialization strategy flag value");
+        }
+        return SerializationStrategy.VALUES[serializationStrategyFlag];
+    }
+
+    // Reads the int count and then that many int indexes of null entries.
+    private List<Integer> deserializeNullIndexList(final DataInputStream dis) throws IOException {
+        int nullIndexListSize = dis.readInt();
+        List<Integer> nullIndexList = new ArrayList<>(nullIndexListSize);
+        while (nullIndexListSize != 0) {
+            nullIndexList.add(dis.readInt());
+            nullIndexListSize--;
+        }
+        return nullIndexList;
+    }
+
+    // Turns the null index list into a per-position lookup so the entry loop does not have to scan
+    // the list for every position; indexes outside [0, size) can never match a position and are ignored.
+    private static boolean[] toNullFlags(final List<Integer> nullIndexList, final int size) {
+        final boolean[] nullFlags = new boolean[size];
+        for (final int nullIndex : nullIndexList) {
+            if (nullIndex >= 0 && nullIndex < size) {
+                nullFlags[nullIndex] = true;
+            }
+        }
+        return nullFlags;
+    }
+
+    // Reads exactly entrySize bytes. A zero-length entry reads nothing, so it is valid even as the very
+    // last entry of the stream; running out of data before entrySize bytes were read is an error.
+    private static byte[] readPayload(final DataInputStream dis, final int entrySize) throws IOException {
+        final byte[] payload = new byte[entrySize];
+        int offset = 0;
+        while (offset < entrySize) {
+            final int read = dis.read(payload, offset, entrySize - offset);
+            if (read == -1) {
+                throw new SerializationException("End of the stream was reached prematurely");
+            }
+            offset += read;
+        }
+        return payload;
+    }
+
+    /**
+     * Decodes a list from the layout described in the class documentation.
+     *
+     * @param topic topic passed through to the inner deserializer
+     * @param data  serialized list, may be {@code null}
+     * @return the decoded list, or {@code null} if {@code data} is {@code null}
+     * @throws SerializationException if the strategy flag or an entry size is invalid, if the data uses
+     *                                the constant-size layout but the inner deserializer has no fixed
+     *                                entry size, or if the data ends in the middle of an entry
+     * @throws KafkaException         if the list cannot be instantiated or the data cannot be read
+     */
+    @Override
+    public List<Inner> deserialize(String topic, byte[] data) {
+        if (data == null) {
+            return null;
+        }
+        try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
+            final SerializationStrategy serStrategy = parseSerializationStrategyFlag(dis.readByte());
+            final boolean constantSize = serStrategy == SerializationStrategy.CONSTANT_SIZE;
+            // In CONSTANT_SIZE strategy, indexes of null entries are decoded from a null index list
+            final List<Integer> nullIndexList = constantSize ? deserializeNullIndexList(dis) : null;
+            final int size = dis.readInt();
+            final List<Inner> deserializedList = createListInstance(size);
+            final boolean[] nullFlags = nullIndexList == null || size <= 0 ? null : toNullFlags(nullIndexList, size);
+            for (int i = 0; i < size; i++) {
+                final int entrySize;
+                if (constantSize) {
+                    if (primitiveSize == null) {
+                        throw new SerializationException("Data was serialized with a constant entry size, but the inner deserializer has no fixed entry size");
+                    }
+                    entrySize = primitiveSize;
+                } else {
+                    entrySize = dis.readInt();
+                }
+                if (entrySize == ListSerde.NULL_ENTRY_VALUE || (nullFlags != null && nullFlags[i])) {
+                    deserializedList.add(null);
+                    continue;
+                }
+                if (entrySize < 0) {
+                    throw new SerializationException("Invalid entry size: " + entrySize);
+                }
+                deserializedList.add(inner.deserialize(topic, readPayload(dis, entrySize)));
+            }
+            return deserializedList;
+        } catch (IOException e) {
+            throw new KafkaException("Unable to deserialize into a List", e);
+        }
+    }
+
+    /**
+     * Closes the inner deserializer, if one has been set.
+     */
+    @Override
+    public void close() {
+        if (inner != null) {
+            inner.close();
+        }
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
new file mode 100644
index 0000000..6274c9d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+
+public class ListSerializer<Inner> implements Serializer<List<Inner>> {
+
+ private static final List<Class<? extends Serializer<?>>>
FIXED_LENGTH_SERIALIZERS = Arrays.asList(
+ ShortSerializer.class,
+ IntegerSerializer.class,
+ FloatSerializer.class,
+ LongSerializer.class,
+ DoubleSerializer.class,
+ UUIDSerializer.class);
+
+ private Serializer<Inner> inner;
+ private SerializationStrategy serStrategy;
+
+ public ListSerializer() {}
+
+ public ListSerializer(Serializer<Inner> inner) {
+ if (inner == null) {
+ throw new IllegalArgumentException("ListSerializer requires
\"serializer\" parameter to be provided during initialization");
+ }
+ this.inner = inner;
+ this.serStrategy = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass())
? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE;
+ }
+
+ public Serializer<Inner> getInnerSerializer() {
+ return inner;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ if (inner != null) {
+ throw new ConfigException("List serializer was already initialized
using a non-default constructor");
+ }
+ final String innerSerdePropertyName = isKey ?
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS :
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
+ final Object innerSerdeClassOrName =
configs.get(innerSerdePropertyName);
+ if (innerSerdeClassOrName == null) {
+ throw new ConfigException("Not able to determine the serializer
class because it was neither passed via the constructor nor set in the
config.");
+ }
+ try {
+ if (innerSerdeClassOrName instanceof String) {
+ inner = Utils.newInstance((String) innerSerdeClassOrName,
Serde.class).serializer();
+ } else if (innerSerdeClassOrName instanceof Class) {
+ inner = (Serializer<Inner>) ((Serde) Utils.newInstance((Class)
innerSerdeClassOrName)).serializer();
+ } else {
+ throw new KafkaException("Could not create a serializer class
instance using \"" + innerSerdePropertyName + "\" property.");
+ }
+ inner.configure(configs, isKey);
+ serStrategy = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass())
? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE;
+ } catch (final ClassNotFoundException e) {
+ throw new ConfigException(innerSerdePropertyName,
innerSerdeClassOrName, "Serializer class " + innerSerdeClassOrName + " could
not be found.");
+ }
+ }
+
+ private void serializeNullIndexList(final DataOutputStream out,
List<Inner> data) throws IOException {
+ int i = 0;
+ List<Integer> nullIndexList = new ArrayList<>();
+ for (Iterator<Inner> it = data.listIterator(); it.hasNext(); i++) {
+ if (it.next() == null) {
+ nullIndexList.add(i);
+ }
+ }
+ out.writeInt(nullIndexList.size());
+ for (int nullIndex : nullIndexList) {
+ out.writeInt(nullIndex);
+ }
+ }
+
+ @Override
+ public byte[] serialize(String topic, List<Inner> data) {
+ if (data == null) {
+ return null;
+ }
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream out = new DataOutputStream(baos)) {
+ out.writeByte(serStrategy.ordinal()); // write serialization
strategy flag
+ if (serStrategy == SerializationStrategy.CONSTANT_SIZE) {
+ // In CONSTANT_SIZE strategy, indexes of null entries are
encoded in a null index list
+ serializeNullIndexList(out, data);
+ }
+ final int size = data.size();
+ out.writeInt(size);
+ for (Inner entry : data) {
+ if (entry == null) {
+ if (serStrategy == SerializationStrategy.VARIABLE_SIZE) {
+ out.writeInt(Serdes.ListSerde.NULL_ENTRY_VALUE);
+ }
+ } else {
+ final byte[] bytes = inner.serialize(topic, entry);
+ if (serStrategy == SerializationStrategy.VARIABLE_SIZE) {
+ out.writeInt(bytes.length);
+ }
+ out.write(bytes);
+ }
+ }
+ return baos.toByteArray();
+ } catch (IOException e) {
+ throw new KafkaException("Failed to serialize List", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (inner != null) {
+ inner.close();
+ }
+ }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index 347bf87..4a150e0 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.common.utils.Bytes;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -125,6 +126,27 @@ public class Serdes {
}
}
+ /**
+ * A serde for {@code List} values, pairing a {@link ListSerializer} with a {@link ListDeserializer}.
+ */
+ static public final class ListSerde<Inner> extends
WrapperSerde<List<Inner>> {
+
+ // Entry length that ListSerializer writes for a null entry in the VARIABLE_SIZE layout and that
+ // ListDeserializer treats as "entry is null".
+ final static int NULL_ENTRY_VALUE = -1;
+
+ // Layout of the serialized entries. ListSerializer writes the ordinal as the leading flag byte and
+ // ListDeserializer indexes VALUES with it, so the declaration order is part of the serialized format.
+ enum SerializationStrategy {
+ CONSTANT_SIZE,
+ VARIABLE_SIZE;
+
+ // Result of values() stored once in a constant.
+ public static final SerializationStrategy[] VALUES =
SerializationStrategy.values();
+ }
+
+ /**
+ * Creates a serde around a no-arg {@link ListSerializer} and a no-arg {@link ListDeserializer}.
+ */
+ public ListSerde() {
+ super(new ListSerializer<>(), new ListDeserializer<>());
+ }
+
+ /**
+ * Creates a serde from a list class and the serde of the list entries.
+ *
+ * @param listClass list implementation handed to the {@link ListDeserializer}
+ * @param serde serde whose serializer and deserializer handle the individual entries
+ */
+ public <L extends List<Inner>> ListSerde(Class<L> listClass,
Serde<Inner> serde) {
+ super(new ListSerializer<>(serde.serializer()), new
ListDeserializer<>(listClass, serde.deserializer()));
+ }
+
+ }
+
@SuppressWarnings("unchecked")
static public <T> Serde<T> serdeFrom(Class<T> type) {
if (String.class.isAssignableFrom(type)) {
@@ -265,4 +287,12 @@ public class Serdes {
static public Serde<Void> Void() {
return new VoidSerde();
}
+
+ /**
+ * A serde for {@code List} type
+ *
+ * @param listClass list implementation passed to the {@code ListSerde} constructor
+ * @param innerSerde serde for the entries of the list
+ * @return a new {@code ListSerde} created from {@code listClass} and {@code innerSerde}
+ */
+ static public <L extends List<Inner>, Inner> Serde<List<Inner>>
ListSerde(Class<L> listClass, Serde<Inner> innerSerde) {
+ return new ListSerde<>(listClass, innerSerde);
+ }
+
}
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
new file mode 100644
index 0000000..aff01e3
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("unchecked")
+public class ListDeserializerTest {
+ private final ListDeserializer<?> listDeserializer = new
ListDeserializer<>();
+ private final Map<String, Object> props = new HashMap<>();
+ private final String nonExistingClass = "non.existing.class";
+ private static class FakeObject {
+ }
+
+ @Test
+ public void testListKeyDeserializerNoArgConstructorsWithClassNames() {
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS,
ArrayList.class.getName());
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
Serdes.StringSerde.class.getName());
+ listDeserializer.configure(props, true);
+ final Deserializer<?> inner = listDeserializer.innerDeserializer();
+ assertNotNull(inner, "Inner deserializer should be not null");
+ assertTrue(inner instanceof StringDeserializer, "Inner deserializer
type should be StringDeserializer");
+ }
+
+ @Test
+ public void testListValueDeserializerNoArgConstructorsWithClassNames() {
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS,
ArrayList.class.getName());
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
Serdes.IntegerSerde.class.getName());
+ listDeserializer.configure(props, false);
+ final Deserializer<?> inner = listDeserializer.innerDeserializer();
+ assertNotNull(inner, "Inner deserializer should be not null");
+ assertTrue(inner instanceof IntegerDeserializer, "Inner deserializer
type should be IntegerDeserializer");
+ }
+
+ @Test
+ public void testListKeyDeserializerNoArgConstructorsWithClassObjects() {
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS,
ArrayList.class);
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ listDeserializer.configure(props, true);
+ final Deserializer<?> inner = listDeserializer.innerDeserializer();
+ assertNotNull(inner, "Inner deserializer should be not null");
+ assertTrue(inner instanceof StringDeserializer, "Inner deserializer
type should be StringDeserializer");
+ }
+
+ @Test
+ public void testListValueDeserializerNoArgConstructorsWithClassObjects() {
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS,
ArrayList.class);
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ listDeserializer.configure(props, false);
+ final Deserializer<?> inner = listDeserializer.innerDeserializer();
+ assertNotNull(inner, "Inner deserializer should be not null");
+ assertTrue(inner instanceof StringDeserializer, "Inner deserializer
type should be StringDeserializer");
+ }
+
+ @Test
+ public void
testListKeyDeserializerNoArgConstructorsShouldThrowConfigExceptionDueMissingInnerClassProp()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS,
ArrayList.class);
+ final ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> listDeserializer.configure(props, true)
+ );
+ assertEquals("Not able to determine the inner serde class because "
+ + "it was neither passed via the constructor nor set in the
config.", exception.getMessage());
+ }
+
+ @Test
+ public void
testListValueDeserializerNoArgConstructorsShouldThrowConfigExceptionDueMissingInnerClassProp()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS,
ArrayList.class);
+ final ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> listDeserializer.configure(props, false)
+ );
+ assertEquals("Not able to determine the inner serde class because "
+ + "it was neither passed via the constructor nor set in the
config.", exception.getMessage());
+ }
+
+ @Test
+ public void
testListKeyDeserializerNoArgConstructorsShouldThrowConfigExceptionDueMissingTypeClassProp()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ final ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> listDeserializer.configure(props, true)
+ );
+ assertEquals("Not able to determine the list class because "
+ + "it was neither passed via the constructor nor set in the
config.", exception.getMessage());
+ }
+
+ @Test
+ public void
testListValueDeserializerNoArgConstructorsShouldThrowConfigExceptionDueMissingTypeClassProp()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ final ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> listDeserializer.configure(props, false)
+ );
+ assertEquals("Not able to determine the list class because "
+ + "it was neither passed via the constructor nor set in the
config.", exception.getMessage());
+ }
+
+ @Test
+ public void
testListKeyDeserializerNoArgConstructorsShouldThrowKafkaExceptionDueInvalidTypeClass()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, new
FakeObject());
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ final KafkaException exception = assertThrows(
+ KafkaException.class,
+ () -> listDeserializer.configure(props, true)
+ );
+ assertEquals("Could not determine the list class instance using "
+ + "\"" + CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS +
"\" property.", exception.getMessage());
+ }
+
+ @Test
+ public void
testListValueDeserializerNoArgConstructorsShouldThrowKafkaExceptionDueInvalidTypeClass()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, new
FakeObject());
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ final KafkaException exception = assertThrows(
+ KafkaException.class,
+ () -> listDeserializer.configure(props, false)
+ );
+ assertEquals("Could not determine the list class instance using "
+ + "\"" + CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS +
"\" property.", exception.getMessage());
+ }
+
+ @Test
+ public void
testListKeyDeserializerNoArgConstructorsShouldThrowKafkaExceptionDueInvalidInnerClass()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS,
ArrayList.class);
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, new
FakeObject());
+ final KafkaException exception = assertThrows(
+ KafkaException.class,
+ () -> listDeserializer.configure(props, true)
+ );
+ assertEquals("Could not determine the inner serde class instance using
"
+ + "\"" + CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS +
"\" property.", exception.getMessage());
+ }
+
+ @Test
+ public void
testListValueDeserializerNoArgConstructorsShouldThrowKafkaExceptionDueInvalidInnerClass()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS,
ArrayList.class);
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
new FakeObject());
+ final KafkaException exception = assertThrows(
+ KafkaException.class,
+ () -> listDeserializer.configure(props, false)
+ );
+ assertEquals("Could not determine the inner serde class instance using
"
+ + "\"" + CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS
+ "\" property.", exception.getMessage());
+ }
+
+ @Test
+ public void
testListKeyDeserializerNoArgConstructorsShouldThrowConfigExceptionDueListClassNotFound()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS,
nonExistingClass);
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ final ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> listDeserializer.configure(props, true)
+ );
+ assertEquals("Invalid value " + nonExistingClass + " for configuration
"
+ + CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS + ":
Deserializer's list class "
+ + "\"" + nonExistingClass + "\" could not be found.",
exception.getMessage());
+ }
+
+ @Test
+ public void
testListValueDeserializerNoArgConstructorsShouldThrowConfigExceptionDueListClassNotFound()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS,
nonExistingClass);
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ final ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> listDeserializer.configure(props, false)
+ );
+ assertEquals("Invalid value " + nonExistingClass + " for configuration
"
+ + CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS + ":
Deserializer's list class "
+ + "\"" + nonExistingClass + "\" could not be found.",
exception.getMessage());
+ }
+
+ @Test
+ public void
testListKeyDeserializerNoArgConstructorsShouldThrowConfigExceptionDueInnerSerdeClassNotFound()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS,
ArrayList.class);
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
nonExistingClass);
+ final ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> listDeserializer.configure(props, true)
+ );
+ assertEquals("Invalid value " + nonExistingClass + " for configuration
"
+ + CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS + ":
Deserializer's inner serde class "
+ + "\"" + nonExistingClass + "\" could not be found.",
exception.getMessage());
+ }
+
+ @Test
+ public void
testListValueDeserializerNoArgConstructorsShouldThrowConfigExceptionDueInnerSerdeClassNotFound()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS,
ArrayList.class);
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
nonExistingClass);
+ final ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> listDeserializer.configure(props, false)
+ );
+ assertEquals("Invalid value " + nonExistingClass + " for configuration
"
+ + CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS + ":
Deserializer's inner serde class "
+ + "\"" + nonExistingClass + "\" could not be found.",
exception.getMessage());
+ }
+
+ @Test
+ public void
testListKeyDeserializerShouldThrowConfigExceptionDueAlreadyInitialized() {
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS,
ArrayList.class);
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ final ListDeserializer<Integer> initializedListDeserializer = new
ListDeserializer<>(ArrayList.class,
+ Serdes.Integer().deserializer());
+ final ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> initializedListDeserializer.configure(props, true)
+ );
+ assertEquals("List deserializer was already initialized using a
non-default constructor", exception.getMessage());
+ }
+
+ @Test
+ public void
testListValueDeserializerShouldThrowConfigExceptionDueAlreadyInitialized() {
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS,
ArrayList.class);
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ final ListDeserializer<Integer> initializedListDeserializer = new
ListDeserializer<>(ArrayList.class,
+ Serdes.Integer().deserializer());
+ final ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> initializedListDeserializer.configure(props, true)
+ );
+ assertEquals("List deserializer was already initialized using a
non-default constructor", exception.getMessage());
+ }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
new file mode 100644
index 0000000..a8ab191
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class ListSerializerTest {
+ private final ListSerializer<?> listSerializer = new ListSerializer<>();
+ private final Map<String, Object> props = new HashMap<>();
+ private final String nonExistingClass = "non.existing.class";
+ private static class FakeObject {
+ }
+
+ @Test
+ public void testListKeySerializerNoArgConstructorsWithClassName() {
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
Serdes.StringSerde.class.getName());
+ listSerializer.configure(props, true);
+ final Serializer<?> inner = listSerializer.getInnerSerializer();
+ assertNotNull(inner, "Inner serializer should be not null");
+ assertTrue(inner instanceof StringSerializer, "Inner serializer type
should be StringSerializer");
+ }
+
+ @Test
+ public void testListValueSerializerNoArgConstructorsWithClassName() {
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
Serdes.StringSerde.class.getName());
+ listSerializer.configure(props, false);
+ final Serializer<?> inner = listSerializer.getInnerSerializer();
+ assertNotNull(inner, "Inner serializer should be not null");
+ assertTrue(inner instanceof StringSerializer, "Inner serializer type
should be StringSerializer");
+ }
+
+ @Test
+ public void testListKeySerializerNoArgConstructorsWithClassObject() {
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ listSerializer.configure(props, true);
+ final Serializer<?> inner = listSerializer.getInnerSerializer();
+ assertNotNull(inner, "Inner serializer should be not null");
+ assertTrue(inner instanceof StringSerializer, "Inner serializer type
should be StringSerializer");
+ }
+
+ @Test
+ public void testListValueSerializerNoArgConstructorsWithClassObject() {
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ listSerializer.configure(props, false);
+ final Serializer<?> inner = listSerializer.getInnerSerializer();
+ assertNotNull(inner, "Inner serializer should be not null");
+ assertTrue(inner instanceof StringSerializer, "Inner serializer type
should be StringSerializer");
+ }
+
+ @Test
+ public void
testListSerializerNoArgConstructorsShouldThrowConfigExceptionDueMissingProp() {
+ ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> listSerializer.configure(props, true)
+ );
+ assertEquals("Not able to determine the serializer class because it
was neither passed via the constructor nor set in the config.",
exception.getMessage());
+
+ exception = assertThrows(
+ ConfigException.class,
+ () -> listSerializer.configure(props, false)
+ );
+ assertEquals("Not able to determine the serializer class because it
was neither passed via the constructor nor set in the config.",
exception.getMessage());
+ }
+
+ @Test
+ public void
testListKeySerializerNoArgConstructorsShouldThrowKafkaExceptionDueInvalidClass()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, new
FakeObject());
+ final KafkaException exception = assertThrows(
+ KafkaException.class,
+ () -> listSerializer.configure(props, true)
+ );
+ assertEquals("Could not create a serializer class instance using \"" +
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS + "\" property.",
exception.getMessage());
+ }
+
+ @Test
+ public void
testListValueSerializerNoArgConstructorsShouldThrowKafkaExceptionDueInvalidClass()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
new FakeObject());
+ final KafkaException exception = assertThrows(
+ KafkaException.class,
+ () -> listSerializer.configure(props, false)
+ );
+ assertEquals("Could not create a serializer class instance using \"" +
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS + "\" property.",
exception.getMessage());
+ }
+
+ @Test
+ public void
testListKeySerializerNoArgConstructorsShouldThrowKafkaExceptionDueClassNotFound()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
nonExistingClass);
+ final KafkaException exception = assertThrows(
+ KafkaException.class,
+ () -> listSerializer.configure(props, true)
+ );
+ assertEquals("Invalid value non.existing.class for configuration " +
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS + ": Serializer class "
+ nonExistingClass + " could not be found.", exception.getMessage());
+ }
+
+ @Test
+ public void
testListValueSerializerNoArgConstructorsShouldThrowKafkaExceptionDueClassNotFound()
{
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
nonExistingClass);
+ final KafkaException exception = assertThrows(
+ KafkaException.class,
+ () -> listSerializer.configure(props, false)
+ );
+ assertEquals("Invalid value non.existing.class for configuration " +
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS + ": Serializer class
" + nonExistingClass + " could not be found.", exception.getMessage());
+ }
+
+ @Test
+ public void
testListKeySerializerShouldThrowConfigExceptionDueAlreadyInitialized() {
+ props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ final ListSerializer<Integer> initializedListSerializer = new
ListSerializer<>(Serdes.Integer().serializer());
+ final ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> initializedListSerializer.configure(props, true)
+ );
+ assertEquals("List serializer was already initialized using a
non-default constructor", exception.getMessage());
+ }
+
+ @Test
+ public void
testListValueSerializerShouldThrowConfigExceptionDueAlreadyInitialized() {
+ props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
+ final ListSerializer<Integer> initializedListSerializer = new
ListSerializer<>(Serdes.Integer().serializer());
+ final ConfigException exception = assertThrows(
+ ConfigException.class,
+ () -> initializedListSerializer.configure(props, false)
+ );
+ assertEquals("List serializer was already initialized using a
non-default constructor", exception.getMessage());
+ }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 0446baf..85c09dd 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -27,7 +27,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.Stack;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -106,6 +110,190 @@ public class SerializationTest {
}
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldReturnEmptyCollection() {
+ List<Integer> testData = Arrays.asList();
+ Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Integer());
+ assertEquals(testData,
+ listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData)),
+ "Should get empty collection after serialization and
deserialization on an empty list");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldReturnNull() {
+ List<Integer> testData = null;
+ Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Integer());
+ assertEquals(testData,
+ listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData)),
+ "Should get null after serialization and deserialization on an
empty list");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldRoundtripIntPrimitiveInput() {
+ List<Integer> testData = Arrays.asList(1, 2, 3);
+ Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Integer());
+ assertEquals(testData,
+ listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData)),
+ "Should get the original collection of integer primitives after
serialization and deserialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForIntPrimitiveInput() {
+ List<Integer> testData = Arrays.asList(1, 2, 3);
+ Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Integer());
+ assertEquals(21, listSerde.serializer().serialize(topic,
testData).length,
+ "Should get length of 21 bytes after serialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldRoundtripShortPrimitiveInput() {
+ List<Short> testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+ Serde<List<Short>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Short());
+ assertEquals(testData,
+ listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData)),
+ "Should get the original collection of short primitives after
serialization and deserialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForShortPrimitiveInput() {
+ List<Short> testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+ Serde<List<Short>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Short());
+ assertEquals(15, listSerde.serializer().serialize(topic,
testData).length,
+ "Should get length of 15 bytes after serialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldRoundtripFloatPrimitiveInput() {
+ List<Float> testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+ Serde<List<Float>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Float());
+ assertEquals(testData,
+ listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData)),
+ "Should get the original collection of float primitives after
serialization and deserialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForFloatPrimitiveInput() {
+ List<Float> testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+ Serde<List<Float>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Float());
+ assertEquals(21, listSerde.serializer().serialize(topic,
testData).length,
+ "Should get length of 21 bytes after serialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldRoundtripLongPrimitiveInput() {
+ List<Long> testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+ Serde<List<Long>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Long());
+ assertEquals(testData,
+ listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData)),
+ "Should get the original collection of long primitives after
serialization and deserialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForLongPrimitiveInput() {
+ List<Long> testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+ Serde<List<Long>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Long());
+ assertEquals(33, listSerde.serializer().serialize(topic,
testData).length,
+ "Should get length of 33 bytes after serialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldRoundtripDoublePrimitiveInput() {
+ List<Double> testData = Arrays.asList((double) 1, (double) 2, (double)
3);
+ Serde<List<Double>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Double());
+ assertEquals(testData,
+ listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData)),
+ "Should get the original collection of double primitives after
serialization and deserialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForDoublePrimitiveInput() {
+ List<Double> testData = Arrays.asList((double) 1, (double) 2, (double)
3);
+ Serde<List<Double>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Double());
+ assertEquals(33, listSerde.serializer().serialize(topic,
testData).length,
+ "Should get length of 33 bytes after serialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldRoundtripUUIDInput() {
+ List<UUID> testData = Arrays.asList(UUID.randomUUID(),
UUID.randomUUID(), UUID.randomUUID());
+ Serde<List<UUID>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.UUID());
+ assertEquals(testData,
+ listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData)),
+ "Should get the original collection of UUID after serialization
and deserialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForUUIDInput() {
+ List<UUID> testData = Arrays.asList(UUID.randomUUID(),
UUID.randomUUID(), UUID.randomUUID());
+ Serde<List<UUID>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.UUID());
+ assertEquals(117, listSerde.serializer().serialize(topic,
testData).length,
+ "Should get length of 117 bytes after serialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldRoundtripNonPrimitiveInput() {
+ List<String> testData = Arrays.asList("A", "B", "C");
+ Serde<List<String>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.String());
+ assertEquals(testData,
+ listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData)),
+ "Should get the original collection of strings list after
serialization and deserialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldRoundtripPrimitiveInputWithNullEntries() {
+ List<Integer> testData = Arrays.asList(1, null, 3);
+ Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.Integer());
+ assertEquals(testData,
+ listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData)),
+ "Should get the original collection of integer primitives with
null entries "
+ + "after serialization and deserialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldRoundtripNonPrimitiveInputWithNullEntries() {
+ List<String> testData = Arrays.asList("A", null, "C");
+ Serde<List<String>> listSerde = Serdes.ListSerde(ArrayList.class,
Serdes.String());
+ assertEquals(testData,
+ listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData)),
+ "Should get the original collection of strings list with null
entries "
+ + "after serialization and deserialization");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldReturnLinkedList() {
+ List<Integer> testData = new LinkedList<>();
+ Serde<List<Integer>> listSerde = Serdes.ListSerde(LinkedList.class,
Serdes.Integer());
+ assertTrue(listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData))
+ instanceof LinkedList, "Should return List instance of type
LinkedList");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void listSerdeShouldReturnStack() {
+ List<Integer> testData = new Stack<>();
+ Serde<List<Integer>> listSerde = Serdes.ListSerde(Stack.class,
Serdes.Integer());
+ assertTrue(listSerde.deserializer().deserialize(topic,
listSerde.serializer().serialize(topic, testData))
+ instanceof Stack, "Should return List instance of type Stack");
+ }
+
@Test
public void
floatDeserializerShouldThrowSerializationExceptionOnZeroBytes() {
try (Serde<Float> serde = Serdes.Float()) {
diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index e6930bc..2201b5b 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -140,10 +140,12 @@
<tr class="row-even"><td>UUID</td>
<td><code class="docutils literal"><span
class="pre">Serdes.UUID()</span></code></td>
</tr>
- </tr>
<tr class="row-odd"><td>Void</td>
<td><code class="docutils literal"><span
class="pre">Serdes.Void()</span></code></td>
</tr>
+ <tr class="row-even"><td>List</td>
+ <td><code class="docutils literal"><span
class="pre">Serdes.ListSerde()</span></code></td>
+ </tr>
</tbody>
</table>
<div class="admonition tip">
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index de747bb..3ae4573 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -150,6 +150,11 @@
(meaning: use broker default replication factor).
The <code>replication.factor</code> value of <code>-1</code> requires
broker version 2.4 or newer.
</p>
+ <p> A new serde type, <code>ListSerde</code>, was introduced: </p>
+ <ul>
+ <li> Added class <code>ListSerde</code> to (de)serialize
<code>List</code>-based objects </li>
+ <li> Introduced <code>ListSerializer</code> and
<code>ListDeserializer</code> to power the new functionality </li>
+ </ul>
<h3><a id="streams_api_changes_280"
href="#streams_api_changes_280">Streams API changes in 2.8.0</a></h3>
<p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0e9e6a1..274322e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -643,6 +643,26 @@ public class StreamsConfig extends AbstractConfig {
Serdes.ByteArraySerde.class.getName(),
Importance.MEDIUM,
DEFAULT_KEY_SERDE_CLASS_DOC)
+ .define(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
+ Type.CLASS,
+ null,
+ Importance.MEDIUM,
+ CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS_DOC)
+ .define(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
+ Type.CLASS,
+ null,
+ Importance.MEDIUM,
+
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS_DOC)
+ .define(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS,
+ Type.CLASS,
+ null,
+ Importance.MEDIUM,
+ CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS_DOC)
+ .define(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS,
+ Type.CLASS,
+ null,
+ Importance.MEDIUM,
+
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS_DOC)
.define(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
DefaultProductionExceptionHandler.class.getName(),