This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7bd979bb4ff KAFKA-20173: Propagate headers into serde 3/N (#21681)
7bd979bb4ff is described below
commit 7bd979bb4ff842b90ecde4ae8eb6580544f303a9
Author: Uladzislau Blok <[email protected]>
AuthorDate: Mon Mar 9 04:35:04 2026 +0100
KAFKA-20173: Propagate headers into serde 3/N (#21681)
Follow-up for:
- https://github.com/apache/kafka/pull/21490
- https://github.com/apache/kafka/pull/21536
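This PR fixes header propagation in `ListSerde`: `ListSerializer` and `ListDeserializer` now pass the record headers through to the inner serializer/deserializer.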
Reviewers: Matthias J. Sax <[email protected]>
---
checkstyle/import-control.xml | 1 +
.../common/serialization/ListDeserializer.java | 9 ++++++-
.../kafka/common/serialization/ListSerializer.java | 9 ++++++-
.../common/serialization/ListDeserializerTest.java | 28 ++++++++++++++++++++++
.../common/serialization/ListSerializerTest.java | 27 +++++++++++++++++++++
5 files changed, 72 insertions(+), 2 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 86659a1856d..a160d34c04a 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -196,6 +196,7 @@
<subpackage name="serialization">
<allow pkg="org.apache.kafka.clients" />
<allow class="org.apache.kafka.common.header.Headers" />
+ <allow class="org.apache.kafka.common.header.internals.RecordHeaders" />
</subpackage>
<subpackage name="utils">
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
index 4fe1313bda7..77ff3e68419 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes.ListSerde;
import org.apache.kafka.common.utils.Utils;
@@ -160,6 +162,11 @@ public class ListDeserializer<Inner> implements
Deserializer<List<Inner>> {
@Override
public List<Inner> deserialize(String topic, byte[] data) {
+ return deserialize(topic, new RecordHeaders(), data);
+ }
+
+ @Override
+ public List<Inner> deserialize(String topic, Headers headers, byte[] data)
{
if (data == null) {
return null;
}
@@ -184,7 +191,7 @@ public class ListDeserializer<Inner> implements
Deserializer<List<Inner>> {
log.trace("Deserialized list so far: {}",
deserializedList); // avoid logging actual data above TRACE level since it may
contain sensitive information
throw new SerializationException("End of the stream was
reached prematurely");
}
- deserializedList.add(inner.deserialize(topic, payload));
+ deserializedList.add(inner.deserialize(topic, headers,
payload));
}
return deserializedList;
} catch (IOException e) {
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
index 99c6ef5ce85..d56d9414d72 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -107,6 +109,11 @@ public class ListSerializer<Inner> implements
Serializer<List<Inner>> {
@Override
public byte[] serialize(String topic, List<Inner> data) {
+ return serialize(topic, new RecordHeaders(), data);
+ }
+
+ @Override
+ public byte[] serialize(String topic, Headers headers, List<Inner> data) {
if (data == null) {
return null;
}
@@ -125,7 +132,7 @@ public class ListSerializer<Inner> implements
Serializer<List<Inner>> {
out.writeInt(Serdes.ListSerde.NULL_ENTRY_VALUE);
}
} else {
- final byte[] bytes = inner.serialize(topic, entry);
+ final byte[] bytes = inner.serialize(topic, headers,
entry);
if (serStrategy == SerializationStrategy.VARIABLE_SIZE) {
out.writeInt(bytes.length);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
index 2bfb7a86334..a3f4bec36e3 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
@@ -19,17 +19,27 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@SuppressWarnings("unchecked")
public class ListDeserializerTest {
@@ -249,4 +259,22 @@ public class ListDeserializerTest {
assertEquals("List deserializer was already initialized using a
non-default constructor", exception.getMessage());
}
+ @Test
+ public void shouldPassHeadersToUnderlyingDeserializer() {
+ final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
+ when(mockDeserializer.deserialize(anyString(), any(Headers.class),
any(byte[].class))).thenReturn("test-value");
+
+ final String topic = "topic";
+ final List<String> data = List.of("test-value");
+ final byte[] serializedData = new
ListSerializer<>(Serdes.String().serializer()).serialize(topic, data);
+ final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
+
+ final ListDeserializer<String> testDeserializer = new
ListDeserializer<>(ArrayList.class, mockDeserializer);
+
+ testDeserializer.deserialize(topic, headers, serializedData);
+
+ verify(mockDeserializer).deserialize(eq(topic), eq(headers),
any(byte[].class));
+ verify(mockDeserializer, never()).deserialize(anyString(),
any(byte[].class));
+ }
+
}
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
index 4ebaafbecea..e9984a8a7a8 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java
@@ -19,16 +19,26 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class ListSerializerTest {
@@ -151,4 +161,21 @@ public class ListSerializerTest {
assertEquals("List serializer was already initialized using a
non-default constructor", exception.getMessage());
}
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializer() {
+ final Serializer<String> mockSerializer = mock(StringSerializer.class);
+ when(mockSerializer.serialize(anyString(), any(Headers.class),
anyString())).thenReturn("test-value".getBytes());
+
+ final String topic = "topic";
+ final List<String> data = List.of("test-key");
+ final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
+
+ final ListSerializer<String> testSerializer = new
ListSerializer<>(mockSerializer);
+
+ testSerializer.serialize(topic, headers, data);
+
+ verify(mockSerializer).serialize(eq(topic), eq(headers),
eq("test-key"));
+ verify(mockSerializer, never()).serialize(anyString(), anyString());
+ }
+
}