This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0025e0757fa KAFKA-20173: Propagate headers into serde 2/N (#21536)
0025e0757fa is described below

commit 0025e0757faf32d0bf855aff8005baa4f56d721f
Author: Uladzislau Blok <[email protected]>
AuthorDate: Sun Mar 8 20:59:42 2026 +0100

    KAFKA-20173: Propagate headers into serde 2/N (#21536)
    
    Follow-up for https://github.com/apache/kafka/pull/21490
    
    This PR concentrate on `WindowKeySchema` and related classes:
    - `PrefixedWindowKeySchemas`
    - `TimeWindowedSerializer`
    - `TimeWindowedDeserializer`
    
    Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../streams/kstream/TimeWindowedDeserializer.java  | 11 +++++++--
 .../streams/kstream/TimeWindowedSerializer.java    |  9 +++++++-
 ...rgedSortedCacheWindowStoreKeyValueIterator.java |  6 +++--
 .../state/internals/PrefixedWindowKeySchemas.java  |  7 ++++--
 .../streams/state/internals/WindowKeySchema.java   | 24 +++++++++++++++----
 .../kstream/SessionWindowedDeserializerTest.java   |  9 ++++----
 .../kstream/SessionWindowedSerializerTest.java     |  4 ++--
 .../kstream/TimeWindowedDeserializerTest.java      | 27 ++++++++++++++++++++++
 .../kstream/TimeWindowedSerializerTest.java        | 27 ++++++++++++++++++++++
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java |  1 +
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  5 ++--
 .../state/internals/WindowKeySchemaTest.java       |  7 +++---
 12 files changed, 113 insertions(+), 24 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
index 26fcbac785f..4725974c7da 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Utils;
@@ -70,6 +72,11 @@ public class TimeWindowedDeserializer<T> implements 
Deserializer<Windowed<T>> {
 
     @Override
     public Windowed<T> deserialize(final String topic, final byte[] data) {
+        return deserialize(topic, new RecordHeaders(), data);
+    }
+
+    @Override
+    public Windowed<T> deserialize(final String topic, final Headers headers, 
final byte[] data) {
         WindowedSerdes.verifyInnerDeserializerNotNull(inner, this);
 
         if (data == null || data.length == 0) {
@@ -78,11 +85,11 @@ public class TimeWindowedDeserializer<T> implements 
Deserializer<Windowed<T>> {
 
         // toStoreKeyBinary was used to serialize the data.
         if (this.isChangelogTopic) {
-            return WindowKeySchema.fromStoreKey(data, windowSize, inner, 
topic);
+            return WindowKeySchema.fromStoreKey(data, windowSize, inner, 
headers, topic);
         }
 
         // toBinary was used to serialize the data
-        return WindowKeySchema.from(data, windowSize, inner, topic);
+        return WindowKeySchema.from(data, windowSize, inner, headers, topic);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
index 7cd13afc1d3..b185d830e9e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
@@ -88,13 +90,18 @@ public class TimeWindowedSerializer<T> implements 
WindowedSerializer<T> {
 
     @Override
     public byte[] serialize(final String topic, final Windowed<T> data) {
+        return serialize(topic, new RecordHeaders(), data);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final Headers headers, final 
Windowed<T> data) {
         WindowedSerdes.verifyInnerSerializerNotNull(inner, this);
 
         if (data == null) {
             return null;
         }
 
-        return WindowKeySchema.toBinary(data, inner, topic);
+        return WindowKeySchema.toBinary(data, inner, headers, topic);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
index 28df5a50538..abcdf93f002 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
@@ -76,7 +78,7 @@ class MergedSortedCacheWindowStoreKeyValueIterator
     @Override
     Windowed<Bytes> deserializeCacheKey(final Bytes cacheKey) {
         final byte[] binaryKey = cacheFunction.key(cacheKey).get();
-        return storeKeyToWindowKey.toWindowKey(binaryKey, windowSize, 
serdes.keyDeserializer(), serdes.topic());
+        return storeKeyToWindowKey.toWindowKey(binaryKey, windowSize, 
serdes.keyDeserializer(), new RecordHeaders(), serdes.topic());
     }
 
     @Override
@@ -92,7 +94,7 @@ class MergedSortedCacheWindowStoreKeyValueIterator
 
     @FunctionalInterface
     interface StoreKeyToWindowKey {
-        Windowed<Bytes> toWindowKey(final byte[] binaryKey, final long 
windowSize, final Deserializer<Bytes> deserializer, final String topic);
+        Windowed<Bytes> toWindowKey(final byte[] binaryKey, final long 
windowSize, final Deserializer<Bytes> deserializer, final Headers headers, 
final String topic);
     }
 
     @FunctionalInterface
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
index 1b149f5e0f2..d8ef4acedc7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Window;
@@ -169,8 +170,9 @@ public class PrefixedWindowKeySchemas {
         public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
                                                    final long windowSize,
                                                    final Deserializer<K> 
deserializer,
+                                                   final Headers headers,
                                                    final String topic) {
-            final K key = deserializer.deserialize(topic, 
extractStoreKeyBytes(binaryKey));
+            final K key = deserializer.deserialize(topic, headers, 
extractStoreKeyBytes(binaryKey));
             final Window window = extractStoreWindow(binaryKey, windowSize);
             return new Windowed<>(key, window);
         }
@@ -376,8 +378,9 @@ public class PrefixedWindowKeySchemas {
         public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
                                                    final long windowSize,
                                                    final Deserializer<K> 
deserializer,
+                                                   final Headers headers,
                                                    final String topic) {
-            final K key = deserializer.deserialize(topic, 
extractStoreKeyBytes(binaryKey));
+            final K key = deserializer.deserialize(topic, headers, 
extractStoreKeyBytes(binaryKey));
             final Window window = extractStoreWindow(binaryKey, windowSize);
             return new Windowed<>(key, window);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 9178da500d1..2d9c8705cc4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -125,11 +126,11 @@ public class WindowKeySchema implements 
RocksDBSegmentedBytesStore.KeySchema {
     }
 
     // for pipe serdes
-
     public static <K> byte[] toBinary(final Windowed<K> timeKey,
                                       final Serializer<K> serializer,
+                                      final Headers headers,
                                       final String topic) {
-        final byte[] bytes = serializer.serialize(topic, timeKey.key());
+        final byte[] bytes = serializer.serialize(topic, headers, 
timeKey.key());
         final ByteBuffer buf = ByteBuffer.allocate(bytes.length + 
TIMESTAMP_SIZE);
         buf.put(bytes);
         buf.putLong(timeKey.window().start());
@@ -140,10 +141,11 @@ public class WindowKeySchema implements 
RocksDBSegmentedBytesStore.KeySchema {
     public static <K> Windowed<K> from(final byte[] binaryKey,
                                        final long windowSize,
                                        final Deserializer<K> deserializer,
+                                       final Headers headers,
                                        final String topic) {
         final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
         System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
-        final K key = deserializer.deserialize(topic, bytes);
+        final K key = deserializer.deserialize(topic, headers, bytes);
         final Window window = extractWindow(binaryKey, windowSize);
         return new Windowed<>(key, window);
     }
@@ -156,7 +158,6 @@ public class WindowKeySchema implements 
RocksDBSegmentedBytesStore.KeySchema {
     }
 
     // for store serdes
-
     public static Bytes toStoreKeyBinary(final Bytes key,
                                          final long timestamp,
                                          final int seqnum) {
@@ -218,6 +219,8 @@ public class WindowKeySchema implements 
RocksDBSegmentedBytesStore.KeySchema {
         return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - 
SEQNUM_SIZE);
     }
 
+    // TODO: Remove this method when MeteredWindowStore will use headers 
version
+    @Deprecated
     public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
                                                final long windowSize,
                                                final Deserializer<K> 
deserializer,
@@ -227,10 +230,21 @@ public class WindowKeySchema implements 
RocksDBSegmentedBytesStore.KeySchema {
         return new Windowed<>(key, window);
     }
 
+    public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
+                                               final long windowSize,
+                                               final Deserializer<K> 
deserializer,
+                                               final Headers headers,
+                                               final String topic) {
+        final K key = deserializer.deserialize(topic, headers, 
extractStoreKeyBytes(binaryKey));
+        final Window window = extractStoreWindow(binaryKey, windowSize);
+        return new Windowed<>(key, window);
+    }
+
     public static <K> Windowed<K> fromStoreKey(final Windowed<Bytes> 
windowedKey,
                                                final Deserializer<K> 
deserializer,
+                                               final Headers headers,
                                                final String topic) {
-        final K key = deserializer.deserialize(topic, windowedKey.key().get());
+        final K key = deserializer.deserialize(topic, headers, 
windowedKey.key().get());
         return new Windowed<>(key, windowedKey.window());
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
index 0635457783f..35adaed1ee3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 
 import org.junit.jupiter.api.Test;
 
@@ -118,14 +118,13 @@ public class SessionWindowedDeserializerTest {
     }
 
     @Test
-    public void shouldPassHeadersToUnderlyingSerializer() {
+    public void shouldPassHeadersToUnderlyingDeserializer() {
         final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
         when(mockDeserializer.deserialize(anyString(), any(Headers.class), 
any(byte[].class))).thenReturn("test-value");
 
-        final String key = "test-key";
-        final Windowed<String> windowed = new Windowed<>(key, new 
TimeWindow(0, 1));
-        final byte[] data = new 
SessionWindowedSerializer<>(Serdes.String().serializer()).serialize("dummy", 
windowed);
         final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+        final Windowed<String> windowed = new Windowed<>("test-key", new 
SessionWindow(0, 1));
+        final byte[] data = new 
SessionWindowedSerializer<>(Serdes.String().serializer()).serialize("dummy", 
headers, windowed);
 
         final SessionWindowedDeserializer<String> testDeserializer = new 
SessionWindowedDeserializer<>(mockDeserializer);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
index 649450e14a9..d2e15f542a1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 
 import org.junit.jupiter.api.Test;
 
@@ -123,7 +123,7 @@ public class SessionWindowedSerializerTest {
         when(mockSerializer.serialize(anyString(), any(Headers.class), 
anyString())).thenReturn("test-value".getBytes());
 
         final String key = "test-key";
-        final Windowed<String> data = new Windowed<>(key, new TimeWindow(0, 
1));
+        final Windowed<String> data = new Windowed<>(key, new SessionWindow(0, 
1));
         final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
 
         final SessionWindowedSerializer<String> testSerializer = new 
SessionWindowedSerializer<>(mockSerializer);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
index d2485c7785d..0b6f17a5f27 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
@@ -17,11 +17,14 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 
 import org.junit.jupiter.api.Test;
 
@@ -34,6 +37,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TimeWindowedDeserializerTest {
     private final long windowSize = 5000000;
@@ -163,4 +173,21 @@ public class TimeWindowedDeserializerTest {
         props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS, 
"some.non.existent.class");
         assertThrows(ConfigException.class, () -> 
timeWindowedDeserializer.configure(props, false));
     }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingDeserializer() {
+        final Deserializer<String> mockDeserializer = 
mock(StringDeserializer.class);
+        when(mockDeserializer.deserialize(anyString(), any(Headers.class), 
any(byte[].class))).thenReturn("test-value");
+
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+        final Windowed<String> windowed = new Windowed<>("test-key", new 
TimeWindow(0, 1));
+        final byte[] data = new 
TimeWindowedSerializer<>(Serdes.String().serializer()).serialize("dummy", 
headers, windowed);
+
+        final TimeWindowedDeserializer<String> testDeserializer = new 
TimeWindowedDeserializer<>(mockDeserializer, 1L);
+
+        testDeserializer.deserialize("dummy", headers, data);
+
+        verify(mockDeserializer).deserialize(anyString(), eq(headers), 
any(byte[].class));
+        verify(mockDeserializer, never()).deserialize(anyString(), 
any(byte[].class));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
index 5fd96f72c1b..54dcfa57142 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
@@ -17,11 +17,14 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 
 import org.junit.jupiter.api.Test;
 
@@ -31,6 +34,13 @@ import java.util.Map;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TimeWindowedSerializerTest {
     private final TimeWindowedSerializer<?> timeWindowedSerializer = new 
TimeWindowedSerializer<>(Serdes.String().serializer());
@@ -106,4 +116,21 @@ public class TimeWindowedSerializerTest {
         props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS, 
"some.non.existent.class");
         assertThrows(ConfigException.class, () -> 
timeWindowedSerializer.configure(props, false));
     }
+
+    @Test
+    public void shouldPassHeadersToUnderlyingSerializer() {
+        final Serializer<String> mockSerializer = mock(StringSerializer.class);
+        when(mockSerializer.serialize(anyString(), any(Headers.class), 
anyString())).thenReturn("test-value".getBytes());
+
+        final String key = "test-key";
+        final Windowed<String> data = new Windowed<>(key, new TimeWindow(0, 
1));
+        final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes());
+
+        final TimeWindowedSerializer<String> testSerializer = new 
TimeWindowedSerializer<>(mockSerializer);
+
+        testSerializer.serialize("dummy", headers, data);
+
+        verify(mockSerializer).serialize(anyString(), eq(headers), eq(key));
+        verify(mockSerializer, never()).serialize(anyString(), eq(key));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 35503edb9cc..d3c7c2dec63 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1689,6 +1689,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                             next.key.get(),
                             windowSizeForTimeWindow,
                             stateSerdes.keyDeserializer(),
+                            new RecordHeaders(),
                             stateSerdes.topic()
                         ),
                         stateSerdes.valueDeserializer().deserialize("dummy", 
next.value)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 1e8e4ea0a42..38cb1a268fe 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -896,15 +896,16 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                             next.key.get(),
                             windowSizeForTimeWindow,
                             stateSerdes.keyDeserializer(),
+                            new RecordHeaders(),
                             stateSerdes.topic()
                         ),
-                        stateSerdes.valueDeserializer().deserialize("dummy", 
next.value)
+                        stateSerdes.valueDeserializer().deserialize("dummy", 
new RecordHeaders(), next.value)
                     );
                     results.add(deserialized);
                 } else if (schema instanceof SessionKeySchema) {
                     final KeyValue<Windowed<String>, Long> deserialized = 
KeyValue.pair(
                         SessionKeySchema.from(next.key.get(), 
stateSerdes.keyDeserializer(), new RecordHeaders(), "dummy"),
-                        stateSerdes.valueDeserializer().deserialize("dummy", 
next.value)
+                        stateSerdes.valueDeserializer().deserialize("dummy", 
new RecordHeaders(), next.value)
                     );
                     results.add(deserialized);
                 } else {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index b170c750d7a..597fe5a8288 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -480,13 +481,13 @@ public class WindowKeySchemaTest {
         final Windowed<String> result;
         if (schemaType == SchemaType.WindowKeySchema) {
             result = WindowKeySchema.fromStoreKey(serialized.get(),
-                endTime - startTime, stateSerdes.keyDeserializer(), 
stateSerdes.topic());
+                endTime - startTime, stateSerdes.keyDeserializer(), new 
RecordHeaders(), stateSerdes.topic());
         } else if (schemaType == SchemaType.PrefixedTimeFirstSchema) {
             result = TimeFirstWindowKeySchema.fromStoreKey(serialized.get(),
-                endTime - startTime, stateSerdes.keyDeserializer(), 
stateSerdes.topic());
+                endTime - startTime, stateSerdes.keyDeserializer(), new 
RecordHeaders(), stateSerdes.topic());
         } else {
             result = KeyFirstWindowKeySchema.fromStoreKey(serialized.get(),
-                endTime - startTime, stateSerdes.keyDeserializer(), 
stateSerdes.topic());
+                endTime - startTime, stateSerdes.keyDeserializer(), new 
RecordHeaders(), stateSerdes.topic());
         }
         assertEquals(windowedKey, result);
     }

Reply via email to