This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7b46af69aa4 KAFKA-20158: Add AggregationWithHeaders, serialization
support and tests (1/N) (#21511)
7b46af69aa4 is described below
commit 7b46af69aa495ea612e1b771e8f5b1b5b89862f8
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Feb 24 09:36:36 2026 -0500
KAFKA-20158: Add AggregationWithHeaders, serialization support and tests
(1/N) (#21511)
This PR introduces `AggregationWithHeaders` and serialization support
introduced in KIP-1271 for storing session aggregations with headers.
---
.../streams/state/AggregationWithHeaders.java | 114 ++++++++++++++++++
.../AggregationWithHeadersDeserializer.java | 128 ++++++++++++++++++++
.../AggregationWithHeadersSerializer.java | 106 +++++++++++++++++
.../streams/state/AggregationWithHeadersTest.java | 130 +++++++++++++++++++++
.../AggregationWithHeadersDeserializerTest.java | 119 +++++++++++++++++++
.../AggregationWithHeadersSerializerTest.java | 93 +++++++++++++++
6 files changed, 690 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/AggregationWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/AggregationWithHeaders.java
new file mode 100644
index 00000000000..0e1173c2ed2
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/AggregationWithHeaders.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.header.Headers;
+
+import java.util.Objects;
+
+/**
+ * Combines an aggregated value with its associated record headers.
+ * This is used by SessionStoreWithHeaders to store session aggregations along
with headers.
+ *
+ * @param <AGG> the aggregation type
+ */
+public final class AggregationWithHeaders<AGG> {
+
+ private final AGG aggregation;
+ private final Headers headers;
+
+ private AggregationWithHeaders(final AGG aggregation, final Headers
headers) {
+ Objects.requireNonNull(headers, "headers must not be null");
+ this.aggregation = aggregation;
+ this.headers = headers;
+ }
+
+ /**
+ * Create a new {@link AggregationWithHeaders} instance if the provided
{@code aggregation} is not {@code null}.
+ *
+ * @param aggregation the aggregation
+ * @param headers the headers (may be {@code null}, treated as empty)
+ * @param <AGG> the type of the aggregation
+ * @return a new {@link AggregationWithHeaders} instance if the provided
{@code aggregation} is not {@code null};
+ * otherwise {@code null} is returned
+ */
+ public static <AGG> AggregationWithHeaders<AGG> make(final AGG
aggregation, final Headers headers) {
+ if (aggregation == null) {
+ return null;
+ }
+ return new AggregationWithHeaders<>(aggregation, headers);
+ }
+
+ /**
+ * Create a new {@link AggregationWithHeaders} instance.
+ * The provided {@code aggregation} may be {@code null}.
+ *
+ * @param aggregation the aggregation (may be {@code null})
+ * @param headers the headers (may be {@code null}, treated as empty)
+ * @param <AGG> the type of the aggregation
+ * @return a new {@link AggregationWithHeaders} instance
+ */
+ public static <AGG> AggregationWithHeaders<AGG> makeAllowNullable(final
AGG aggregation, final Headers headers) {
+ return new AggregationWithHeaders<>(aggregation, headers);
+ }
+
+ /**
+ * Return the wrapped {@code aggregation} of the given {@code
aggregationWithHeaders} parameter
+ * if the parameter is not {@code null}.
+ *
+ * @param aggregationWithHeaders an {@link AggregationWithHeaders}
instance; can be {@code null}
+ * @param <AGG> the type of the aggregation
+ * @return the wrapped {@code aggregation} of {@code
aggregationWithHeaders} if not {@code null}; otherwise {@code null}
+ */
+ public static <AGG> AGG getAggregationOrNull(final
AggregationWithHeaders<AGG> aggregationWithHeaders) {
+ return aggregationWithHeaders == null ? null :
aggregationWithHeaders.aggregation;
+ }
+
+ public AGG aggregation() {
+ return aggregation;
+ }
+
+ public Headers headers() {
+ return headers;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof AggregationWithHeaders)) {
+ return false;
+ }
+ final AggregationWithHeaders<?> that = (AggregationWithHeaders<?>) o;
+ return Objects.equals(aggregation, that.aggregation)
+ && Objects.equals(this.headers, that.headers);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(aggregation, headers);
+ }
+
+ @Override
+ public String toString() {
+ return "AggregationWithHeaders{" +
+ "aggregation=" + aggregation +
+ ", headers=" + headers +
+ '}';
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
new file mode 100644
index 00000000000..f28078947ac
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+/**
+ * Deserializer for AggregationWithHeaders.
+ * Deserialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][aggregation]
+ * <p>
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes:
+ * - For null/empty headers: headersSize = 0, headersBytes is omitted (0
bytes)
+ * - For non-empty headers: headersSize > 0, serialized headers in the
format [count(varint)][header1][header2]... to be processed by
HeadersDeserializer.
+ * - aggregation: Serialized aggregation to be deserialized with the provided
aggregation deserializer
+ * <p>
+ * This is used by KIP-1271 to deserialize aggregations with headers from
session state stores.
+ */
+class AggregationWithHeadersDeserializer<AGG> implements
WrappingNullableDeserializer<AggregationWithHeaders<AGG>, Void, AGG> {
+
+ public final Deserializer<AGG> aggregationDeserializer;
+
+ AggregationWithHeadersDeserializer(final Deserializer<AGG>
aggregationDeserializer) {
+ Objects.requireNonNull(aggregationDeserializer);
+ this.aggregationDeserializer = aggregationDeserializer;
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ aggregationDeserializer.configure(configs, isKey);
+ }
+
+ @Override
+ public AggregationWithHeaders<AGG> deserialize(final String topic, final
byte[] aggregationWithHeaders) {
+ if (aggregationWithHeaders == null) {
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
+ final Headers headers = readHeaders(buffer);
+ final byte[] rawAggregation = readBytes(buffer, buffer.remaining());
+ final AGG aggregation = aggregationDeserializer.deserialize(topic,
headers, rawAggregation);
+
+ return AggregationWithHeaders.makeAllowNullable(aggregation, headers);
+ }
+
+ @Override
+ public void close() {
+ aggregationDeserializer.close();
+ }
+
+ @Override
+ public void setIfUnset(final SerdeGetter getter) {
+ initNullableDeserializer(aggregationDeserializer, getter);
+ }
+
+
+ /**
+ * Reads the specified number of bytes from the buffer with validation.
+ *
+ * @param buffer the ByteBuffer to read from
+ * @param length the number of bytes to read
+ * @return the byte array containing the read bytes
+ * @throws SerializationException if buffer doesn't have enough bytes or
length is negative
+ */
+ private static byte[] readBytes(final ByteBuffer buffer, final int length)
{
+ if (length < 0) {
+ throw new SerializationException(
+ "Invalid AggregationWithHeaders format: negative length " +
length
+ );
+ }
+ if (buffer.remaining() < length) {
+ throw new SerializationException(
+ "Invalid AggregationWithHeaders format: expected " + length +
+ " bytes but only " + buffer.remaining() + " bytes
remaining"
+ );
+ }
+ final byte[] bytes = new byte[length];
+ buffer.get(bytes);
+ return bytes;
+ }
+
+ /**
+ * Extract headers from serialized AggregationWithHeaders.
+ */
+ static Headers headers(final byte[] rawAggregationWithHeaders) {
+ if (rawAggregationWithHeaders == null) {
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(rawAggregationWithHeaders);
+ return readHeaders(buffer);
+ }
+
+ private static Headers readHeaders(final ByteBuffer buffer) {
+ final int headersSize = ByteUtils.readVarint(buffer);
+ final byte[] rawHeaders = readBytes(buffer, headersSize);
+ return HeadersDeserializer.deserialize(rawHeaders);
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
new file mode 100644
index 00000000000..1781d1fa970
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+/**
+ * Serializer for AggregationWithHeaders.
+ <p>
+ * Serialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][aggregation]
+ * <p>
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes:
+ * - For null/empty headers: headersSize = 0, headersBytes is omitted (0
bytes)
+ * - For non-empty headers: headersSize > 0, serialized headers
([count(varint)][header1][header2]...) from HeadersSerializer
+ * - aggregation: Serialized aggregation using the provided aggregation
serializer
+ * <p>
+ * This is used by KIP-1271 to serialize aggregations with headers for session
state stores.
+ */
+class AggregationWithHeadersSerializer<AGG> implements
WrappingNullableSerializer<AggregationWithHeaders<AGG>, Void, AGG> {
+ public final Serializer<AGG> aggregationSerializer;
+
+ AggregationWithHeadersSerializer(final Serializer<AGG>
aggregationSerializer) {
+ Objects.requireNonNull(aggregationSerializer);
+ this.aggregationSerializer = aggregationSerializer;
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ aggregationSerializer.configure(configs, isKey);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final
AggregationWithHeaders<AGG> aggregationWithHeaders) {
+ if (aggregationWithHeaders == null) {
+ return null;
+ }
+ return serialize(topic, aggregationWithHeaders.aggregation(),
aggregationWithHeaders.headers());
+ }
+
+ private byte[] serialize(final String topic, final AGG plainAggregation,
final Headers headers) {
+ if (plainAggregation == null) {
+ return null;
+ }
+
+ final byte[] rawAggregation = aggregationSerializer.serialize(topic,
headers, plainAggregation);
+
+ if (rawAggregation == null) {
+ return null;
+ }
+
+ final byte[] rawHeaders = HeadersSerializer.serialize(headers);
+
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream out = new DataOutputStream(baos)) {
+
+ ByteUtils.writeVarint(rawHeaders.length, out);
+ out.write(rawHeaders);
+ out.write(rawAggregation);
+
+ return baos.toByteArray();
+ } catch (final IOException e) {
+ throw new SerializationException("Failed to serialize
AggregationWithHeaders on topic: " + topic, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ aggregationSerializer.close();
+ }
+
+ @Override
+ public void setIfUnset(final SerdeGetter getter) {
+ initNullableSerializer(aggregationSerializer, getter);
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/AggregationWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/AggregationWithHeadersTest.java
new file mode 100644
index 00000000000..c7a6d025180
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/AggregationWithHeadersTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AggregationWithHeadersTest {
+
+ @Test
+ public void shouldCreateAggregationWithHeaders() {
+ final Long aggregation = 100L;
+ final Headers headers = new RecordHeaders();
+ headers.add("key1", "value1".getBytes());
+
+ final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.make(aggregation, headers);
+
+ assertNotNull(aggregationWithHeaders);
+ assertEquals(aggregation, aggregationWithHeaders.aggregation());
+ assertEquals(headers, aggregationWithHeaders.headers());
+ }
+
+ @Test
+ public void shouldReturnNullForNullAggregation() {
+ final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.make(null, new RecordHeaders());
+ assertNull(aggregationWithHeaders);
+ }
+
+ @Test
+ public void shouldNotCreateWithNullHeaders() {
+ final Long aggregation = 100L;
+ assertThrows(NullPointerException.class, () ->
AggregationWithHeaders.make(aggregation, null));
+ }
+
+ @Test
+ public void shouldAllowNullableAggregation() {
+ final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.makeAllowNullable(null, new RecordHeaders());
+
+ assertNotNull(aggregationWithHeaders);
+ assertNull(aggregationWithHeaders.aggregation());
+ }
+
+ @Test
+ public void shouldGetAggregationOrNull() {
+ final Long aggregation = 100L;
+ final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.make(aggregation, new RecordHeaders());
+
+ assertEquals(aggregation,
AggregationWithHeaders.getAggregationOrNull(aggregationWithHeaders));
+ assertNull(AggregationWithHeaders.getAggregationOrNull(null));
+ }
+
+ @Test
+ public void shouldImplementEquals() {
+ final Long aggregation = 100L;
+ final Headers headers1 = new RecordHeaders();
+ headers1.add("key1", "value1".getBytes());
+
+ final Headers headers2 = new RecordHeaders();
+ headers2.add("key1", "value1".getBytes());
+
+ final AggregationWithHeaders<Long> aggregationWithHeaders1 =
AggregationWithHeaders.make(aggregation, headers1);
+ final AggregationWithHeaders<Long> aggregationWithHeaders2 =
AggregationWithHeaders.make(aggregation, headers2);
+
+ assertEquals(aggregationWithHeaders1, aggregationWithHeaders2);
+ assertEquals(aggregationWithHeaders1.hashCode(),
aggregationWithHeaders2.hashCode());
+ }
+
+ @Test
+ public void shouldNotBeEqualWithDifferentAggregations() {
+ final Headers headers = new RecordHeaders();
+
+ final AggregationWithHeaders<Long> aggregationWithHeaders1 =
AggregationWithHeaders.make(100L, headers);
+ final AggregationWithHeaders<Long> aggregationWithHeaders2 =
AggregationWithHeaders.make(200L, headers);
+
+ assertNotEquals(aggregationWithHeaders1, aggregationWithHeaders2);
+ }
+
+ @Test
+ public void shouldNotBeEqualWithDifferentHeaders() {
+ final Long aggregation = 100L;
+
+ final Headers headers1 = new RecordHeaders();
+ headers1.add("key1", "value1".getBytes());
+
+ final Headers headers2 = new RecordHeaders();
+ headers2.add("key2", "value2".getBytes());
+
+ final AggregationWithHeaders<Long> aggregationWithHeaders1 =
AggregationWithHeaders.make(aggregation, headers1);
+ final AggregationWithHeaders<Long> aggregationWithHeaders2 =
AggregationWithHeaders.make(aggregation, headers2);
+
+ assertNotEquals(aggregationWithHeaders1, aggregationWithHeaders2);
+ }
+
+ @Test
+ public void shouldImplementToString() {
+ final Long aggregation = 100L;
+ final Headers headers = new RecordHeaders();
+ headers.add("key1", "value1".getBytes());
+
+ final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.make(aggregation, headers);
+ final String toString = aggregationWithHeaders.toString();
+
+ assertNotNull(toString);
+ assertTrue(toString.contains("aggregation=100"));
+ assertTrue(toString.contains("headers="));
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
new file mode 100644
index 00000000000..d517f7387a2
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class AggregationWithHeadersDeserializerTest {
+
+ private final Deserializer<Long> longDeserializer =
Serdes.Long().deserializer();
+ private final AggregationWithHeadersDeserializer<Long> deserializer = new
AggregationWithHeadersDeserializer<>(longDeserializer);
+
+ @Test
+ public void shouldDeserializeNullAsNull() {
+ final AggregationWithHeaders<Long> result =
deserializer.deserialize("topic", null);
+ assertNull(result);
+ }
+
+ @Test
+ public void shouldDeserializeAggregationWithEmptyHeaders() {
+ final Long aggregation = 100L;
+ final Headers headers = new RecordHeaders();
+ final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.make(aggregation, headers);
+
+ final AggregationWithHeadersSerializer<Long> serializer = new
AggregationWithHeadersSerializer<>(Serdes.Long().serializer());
+ final byte[] serialized = serializer.serialize("topic",
aggregationWithHeaders);
+
+ final AggregationWithHeaders<Long> result =
deserializer.deserialize("topic", serialized);
+
+ assertNotNull(result);
+ assertEquals(aggregation, result.aggregation());
+ assertNotNull(result.headers());
+ }
+
+ @Test
+ public void shouldDeserializeAggregationWithHeaders() {
+ final Long aggregation = 100L;
+ final Headers headers = new RecordHeaders();
+ headers.add("key1", "value1".getBytes());
+ headers.add("key2", "value2".getBytes());
+ final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.make(aggregation, headers);
+
+ final AggregationWithHeadersSerializer<Long> serializer = new
AggregationWithHeadersSerializer<>(Serdes.Long().serializer());
+ final byte[] serialized = serializer.serialize("topic",
aggregationWithHeaders);
+
+ final AggregationWithHeaders<Long> result =
deserializer.deserialize("topic", serialized);
+
+ assertNotNull(result);
+ assertEquals(aggregation, result.aggregation());
+ assertNotNull(result.headers());
+
+ final Iterator<Header> iterator = result.headers().iterator();
+ final Header header1 = iterator.next();
+ assertEquals("key1", header1.key());
+ assertArrayEquals("value1".getBytes(), header1.value());
+
+ final Header header2 = iterator.next();
+ assertEquals("key2", header2.key());
+ assertArrayEquals("value2".getBytes(), header2.value());
+ }
+
+ @Test
+ public void shouldThrowOnInvalidFormat() {
+ final byte[] invalidData = new byte[]{0x01, 0x02};
+ assertThrows(SerializationException.class, () ->
deserializer.deserialize("topic", invalidData));
+ }
+
+ @Test
+ public void shouldExtractHeaders() {
+ final Long aggregation = 100L;
+ final Headers headers = new RecordHeaders();
+ headers.add("key1", "value1".getBytes());
+ final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.make(aggregation, headers);
+
+ final AggregationWithHeadersSerializer<Long> serializer = new
AggregationWithHeadersSerializer<>(Serdes.Long().serializer());
+ final byte[] serialized = serializer.serialize("topic",
aggregationWithHeaders);
+
+ final Headers extractedHeaders =
AggregationWithHeadersDeserializer.headers(serialized);
+ assertNotNull(extractedHeaders);
+
+ final Header header = extractedHeaders.iterator().next();
+ assertEquals("key1", header.key());
+ assertArrayEquals("value1".getBytes(), header.value());
+ }
+
+ @Test
+ public void shouldReturnNullForNullInput() {
+ assertNull(AggregationWithHeadersDeserializer.headers(null));
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializerTest.java
new file mode 100644
index 00000000000..d7a61d0378d
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AggregationWithHeadersSerializerTest {
+
+ private final Serializer<Long> longSerializer = Serdes.Long().serializer();
+ private final AggregationWithHeadersSerializer<Long> serializer = new
AggregationWithHeadersSerializer<>(longSerializer);
+
+ @Test
+ public void shouldSerializeNullAsNull() {
+ final byte[] result = serializer.serialize("topic", null);
+ assertNull(result);
+ }
+
+ @Test
+ public void shouldSerializeAggregationWithEmptyHeaders() {
+ final Long aggregation = 100L;
+ final Headers headers = new RecordHeaders();
+ final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.make(aggregation, headers);
+
+ final byte[] result = serializer.serialize("topic",
aggregationWithHeaders);
+
+ assertNotNull(result);
+ assertTrue(result.length > 0);
+ }
+
+ @Test
+ public void shouldSerializeAggregationWithHeaders() {
+ final Long aggregation = 100L;
+ final Headers headers = new RecordHeaders();
+ headers.add("key1", "value1".getBytes());
+ headers.add("key2", "value2".getBytes());
+ final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.make(aggregation, headers);
+
+ final byte[] result = serializer.serialize("topic",
aggregationWithHeaders);
+
+ assertNotNull(result);
+ assertTrue(result.length > 0);
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeConsistently() {
+ final Long aggregation = 100L;
+ final Headers headers = new RecordHeaders();
+ headers.add("key1", "value1".getBytes());
+ headers.add("key2", null);
+ final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.make(aggregation, headers);
+
+ final byte[] serialized = serializer.serialize("topic",
aggregationWithHeaders);
+ final AggregationWithHeadersDeserializer<Long> deserializer = new
AggregationWithHeadersDeserializer<>(Serdes.Long().deserializer());
+ final AggregationWithHeaders<Long> deserialized =
deserializer.deserialize("topic", serialized);
+
+ assertNotNull(deserialized);
+ assertEquals(aggregation, deserialized.aggregation());
+ assertNotNull(deserialized.headers());
+ }
+
+ @Test
+ public void shouldHandleNullAggregationInAggregationWithHeaders() {
+ final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.makeAllowNullable(null, new RecordHeaders());
+ final byte[] result = serializer.serialize("topic",
aggregationWithHeaders);
+
+ assertNull(result);
+ }
+}
\ No newline at end of file