This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0025e0757fa KAFKA-20173: Propagate headers into serde 2/N (#21536)
0025e0757fa is described below
commit 0025e0757faf32d0bf855aff8005baa4f56d721f
Author: Uladzislau Blok <[email protected]>
AuthorDate: Sun Mar 8 20:59:42 2026 +0100
KAFKA-20173: Propagate headers into serde 2/N (#21536)
Follow-up for https://github.com/apache/kafka/pull/21490
This PR concentrates on `WindowKeySchema` and related classes:
- `PrefixedWindowKeySchemas`
- `TimeWindowedSerializer`
- `TimeWindowedDeserializer`
Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../streams/kstream/TimeWindowedDeserializer.java | 11 +++++++--
.../streams/kstream/TimeWindowedSerializer.java | 9 +++++++-
...rgedSortedCacheWindowStoreKeyValueIterator.java | 6 +++--
.../state/internals/PrefixedWindowKeySchemas.java | 7 ++++--
.../streams/state/internals/WindowKeySchema.java | 24 +++++++++++++++----
.../kstream/SessionWindowedDeserializerTest.java | 9 ++++----
.../kstream/SessionWindowedSerializerTest.java | 4 ++--
.../kstream/TimeWindowedDeserializerTest.java | 27 ++++++++++++++++++++++
.../kstream/TimeWindowedSerializerTest.java | 27 ++++++++++++++++++++++
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 1 +
.../AbstractRocksDBSegmentedBytesStoreTest.java | 5 ++--
.../state/internals/WindowKeySchemaTest.java | 7 +++---
12 files changed, 113 insertions(+), 24 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
index 26fcbac785f..4725974c7da 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Utils;
@@ -70,6 +72,11 @@ public class TimeWindowedDeserializer<T> implements
Deserializer<Windowed<T>> {
@Override
public Windowed<T> deserialize(final String topic, final byte[] data) {
+ return deserialize(topic, new RecordHeaders(), data);
+ }
+
+ @Override
+ public Windowed<T> deserialize(final String topic, final Headers headers,
final byte[] data) {
WindowedSerdes.verifyInnerDeserializerNotNull(inner, this);
if (data == null || data.length == 0) {
@@ -78,11 +85,11 @@ public class TimeWindowedDeserializer<T> implements
Deserializer<Windowed<T>> {
// toStoreKeyBinary was used to serialize the data.
if (this.isChangelogTopic) {
- return WindowKeySchema.fromStoreKey(data, windowSize, inner,
topic);
+ return WindowKeySchema.fromStoreKey(data, windowSize, inner,
headers, topic);
}
// toBinary was used to serialize the data
- return WindowKeySchema.from(data, windowSize, inner, topic);
+ return WindowKeySchema.from(data, windowSize, inner, headers, topic);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
index 7cd13afc1d3..b185d830e9e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
@@ -88,13 +90,18 @@ public class TimeWindowedSerializer<T> implements
WindowedSerializer<T> {
@Override
public byte[] serialize(final String topic, final Windowed<T> data) {
+ return serialize(topic, new RecordHeaders(), data);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers, final
Windowed<T> data) {
WindowedSerdes.verifyInnerSerializerNotNull(inner, this);
if (data == null) {
return null;
}
- return WindowKeySchema.toBinary(data, inner, topic);
+ return WindowKeySchema.toBinary(data, inner, headers, topic);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
index 28df5a50538..abcdf93f002 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
@@ -76,7 +78,7 @@ class MergedSortedCacheWindowStoreKeyValueIterator
@Override
Windowed<Bytes> deserializeCacheKey(final Bytes cacheKey) {
final byte[] binaryKey = cacheFunction.key(cacheKey).get();
- return storeKeyToWindowKey.toWindowKey(binaryKey, windowSize,
serdes.keyDeserializer(), serdes.topic());
+ return storeKeyToWindowKey.toWindowKey(binaryKey, windowSize,
serdes.keyDeserializer(), new RecordHeaders(), serdes.topic());
}
@Override
@@ -92,7 +94,7 @@ class MergedSortedCacheWindowStoreKeyValueIterator
@FunctionalInterface
interface StoreKeyToWindowKey {
- Windowed<Bytes> toWindowKey(final byte[] binaryKey, final long
windowSize, final Deserializer<Bytes> deserializer, final String topic);
+ Windowed<Bytes> toWindowKey(final byte[] binaryKey, final long
windowSize, final Deserializer<Bytes> deserializer, final Headers headers,
final String topic);
}
@FunctionalInterface
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
index 1b149f5e0f2..d8ef4acedc7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Window;
@@ -169,8 +170,9 @@ public class PrefixedWindowKeySchemas {
public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
final long windowSize,
final Deserializer<K>
deserializer,
+ final Headers headers,
final String topic) {
- final K key = deserializer.deserialize(topic,
extractStoreKeyBytes(binaryKey));
+ final K key = deserializer.deserialize(topic, headers,
extractStoreKeyBytes(binaryKey));
final Window window = extractStoreWindow(binaryKey, windowSize);
return new Windowed<>(key, window);
}
@@ -376,8 +378,9 @@ public class PrefixedWindowKeySchemas {
public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
final long windowSize,
final Deserializer<K>
deserializer,
+ final Headers headers,
final String topic) {
- final K key = deserializer.deserialize(topic,
extractStoreKeyBytes(binaryKey));
+ final K key = deserializer.deserialize(topic, headers,
extractStoreKeyBytes(binaryKey));
final Window window = extractStoreWindow(binaryKey, windowSize);
return new Windowed<>(key, window);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 9178da500d1..2d9c8705cc4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
@@ -125,11 +126,11 @@ public class WindowKeySchema implements
RocksDBSegmentedBytesStore.KeySchema {
}
// for pipe serdes
-
public static <K> byte[] toBinary(final Windowed<K> timeKey,
final Serializer<K> serializer,
+ final Headers headers,
final String topic) {
- final byte[] bytes = serializer.serialize(topic, timeKey.key());
+ final byte[] bytes = serializer.serialize(topic, headers,
timeKey.key());
final ByteBuffer buf = ByteBuffer.allocate(bytes.length +
TIMESTAMP_SIZE);
buf.put(bytes);
buf.putLong(timeKey.window().start());
@@ -140,10 +141,11 @@ public class WindowKeySchema implements
RocksDBSegmentedBytesStore.KeySchema {
public static <K> Windowed<K> from(final byte[] binaryKey,
final long windowSize,
final Deserializer<K> deserializer,
+ final Headers headers,
final String topic) {
final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
- final K key = deserializer.deserialize(topic, bytes);
+ final K key = deserializer.deserialize(topic, headers, bytes);
final Window window = extractWindow(binaryKey, windowSize);
return new Windowed<>(key, window);
}
@@ -156,7 +158,6 @@ public class WindowKeySchema implements
RocksDBSegmentedBytesStore.KeySchema {
}
// for store serdes
-
public static Bytes toStoreKeyBinary(final Bytes key,
final long timestamp,
final int seqnum) {
@@ -218,6 +219,8 @@ public class WindowKeySchema implements
RocksDBSegmentedBytesStore.KeySchema {
return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length -
SEQNUM_SIZE);
}
+ // TODO: Remove this method when MeteredWindowStore will use headers
version
+ @Deprecated
public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
final long windowSize,
final Deserializer<K>
deserializer,
@@ -227,10 +230,21 @@ public class WindowKeySchema implements
RocksDBSegmentedBytesStore.KeySchema {
return new Windowed<>(key, window);
}
+ public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
+ final long windowSize,
+ final Deserializer<K>
deserializer,
+ final Headers headers,
+ final String topic) {
+ final K key = deserializer.deserialize(topic, headers,
extractStoreKeyBytes(binaryKey));
+ final Window window = extractStoreWindow(binaryKey, windowSize);
+ return new Windowed<>(key, window);
+ }
+
public static <K> Windowed<K> fromStoreKey(final Windowed<Bytes>
windowedKey,
final Deserializer<K>
deserializer,
+ final Headers headers,
final String topic) {
- final K key = deserializer.deserialize(topic, windowedKey.key().get());
+ final K key = deserializer.deserialize(topic, headers,
windowedKey.key().get());
return new Windowed<>(key, windowedKey.window());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
index 0635457783f..35adaed1ee3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializerTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.junit.jupiter.api.Test;
@@ -118,14 +118,13 @@ public class SessionWindowedDeserializerTest {
}
@Test
- public void shouldPassHeadersToUnderlyingSerializer() {
+ public void shouldPassHeadersToUnderlyingDeserializer() {
final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
when(mockDeserializer.deserialize(anyString(), any(Headers.class),
any(byte[].class))).thenReturn("test-value");
- final String key = "test-key";
- final Windowed<String> windowed = new Windowed<>(key, new
TimeWindow(0, 1));
- final byte[] data = new
SessionWindowedSerializer<>(Serdes.String().serializer()).serialize("dummy",
windowed);
final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
+ final Windowed<String> windowed = new Windowed<>("test-key", new
SessionWindow(0, 1));
+ final byte[] data = new
SessionWindowedSerializer<>(Serdes.String().serializer()).serialize("dummy",
headers, windowed);
final SessionWindowedDeserializer<String> testDeserializer = new
SessionWindowedDeserializer<>(mockDeserializer);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
index 649450e14a9..d2e15f542a1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowedSerializerTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.junit.jupiter.api.Test;
@@ -123,7 +123,7 @@ public class SessionWindowedSerializerTest {
when(mockSerializer.serialize(anyString(), any(Headers.class),
anyString())).thenReturn("test-value".getBytes());
final String key = "test-key";
- final Windowed<String> data = new Windowed<>(key, new TimeWindow(0,
1));
+ final Windowed<String> data = new Windowed<>(key, new SessionWindow(0,
1));
final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
final SessionWindowedSerializer<String> testSerializer = new
SessionWindowedSerializer<>(mockSerializer);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
index d2485c7785d..0b6f17a5f27 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializerTest.java
@@ -17,11 +17,14 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.junit.jupiter.api.Test;
@@ -34,6 +37,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TimeWindowedDeserializerTest {
private final long windowSize = 5000000;
@@ -163,4 +173,21 @@ public class TimeWindowedDeserializerTest {
props.put(TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS,
"some.non.existent.class");
assertThrows(ConfigException.class, () ->
timeWindowedDeserializer.configure(props, false));
}
+
+ @Test
+ public void shouldPassHeadersToUnderlyingDeserializer() {
+ final Deserializer<String> mockDeserializer =
mock(StringDeserializer.class);
+ when(mockDeserializer.deserialize(anyString(), any(Headers.class),
any(byte[].class))).thenReturn("test-value");
+
+ final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
+ final Windowed<String> windowed = new Windowed<>("test-key", new
TimeWindow(0, 1));
+ final byte[] data = new
TimeWindowedSerializer<>(Serdes.String().serializer()).serialize("dummy",
headers, windowed);
+
+ final TimeWindowedDeserializer<String> testDeserializer = new
TimeWindowedDeserializer<>(mockDeserializer, 1L);
+
+ testDeserializer.deserialize("dummy", headers, data);
+
+ verify(mockDeserializer).deserialize(anyString(), eq(headers),
any(byte[].class));
+ verify(mockDeserializer, never()).deserialize(anyString(),
any(byte[].class));
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
index 5fd96f72c1b..54dcfa57142 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowedSerializerTest.java
@@ -17,11 +17,14 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.junit.jupiter.api.Test;
@@ -31,6 +34,13 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TimeWindowedSerializerTest {
private final TimeWindowedSerializer<?> timeWindowedSerializer = new
TimeWindowedSerializer<>(Serdes.String().serializer());
@@ -106,4 +116,21 @@ public class TimeWindowedSerializerTest {
props.put(TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS,
"some.non.existent.class");
assertThrows(ConfigException.class, () ->
timeWindowedSerializer.configure(props, false));
}
+
+ @Test
+ public void shouldPassHeadersToUnderlyingSerializer() {
+ final Serializer<String> mockSerializer = mock(StringSerializer.class);
+ when(mockSerializer.serialize(anyString(), any(Headers.class),
anyString())).thenReturn("test-value".getBytes());
+
+ final String key = "test-key";
+ final Windowed<String> data = new Windowed<>(key, new TimeWindow(0,
1));
+ final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes());
+
+ final TimeWindowedSerializer<String> testSerializer = new
TimeWindowedSerializer<>(mockSerializer);
+
+ testSerializer.serialize("dummy", headers, data);
+
+ verify(mockSerializer).serialize(anyString(), eq(headers), eq(key));
+ verify(mockSerializer, never()).serialize(anyString(), eq(key));
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 35503edb9cc..d3c7c2dec63 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1689,6 +1689,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
next.key.get(),
windowSizeForTimeWindow,
stateSerdes.keyDeserializer(),
+ new RecordHeaders(),
stateSerdes.topic()
),
stateSerdes.valueDeserializer().deserialize("dummy",
next.value)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 1e8e4ea0a42..38cb1a268fe 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -896,15 +896,16 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
next.key.get(),
windowSizeForTimeWindow,
stateSerdes.keyDeserializer(),
+ new RecordHeaders(),
stateSerdes.topic()
),
- stateSerdes.valueDeserializer().deserialize("dummy",
next.value)
+ stateSerdes.valueDeserializer().deserialize("dummy",
new RecordHeaders(), next.value)
);
results.add(deserialized);
} else if (schema instanceof SessionKeySchema) {
final KeyValue<Windowed<String>, Long> deserialized =
KeyValue.pair(
SessionKeySchema.from(next.key.get(),
stateSerdes.keyDeserializer(), new RecordHeaders(), "dummy"),
- stateSerdes.valueDeserializer().deserialize("dummy",
next.value)
+ stateSerdes.valueDeserializer().deserialize("dummy",
new RecordHeaders(), next.value)
);
results.add(deserialized);
} else {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index b170c750d7a..597fe5a8288 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -480,13 +481,13 @@ public class WindowKeySchemaTest {
final Windowed<String> result;
if (schemaType == SchemaType.WindowKeySchema) {
result = WindowKeySchema.fromStoreKey(serialized.get(),
- endTime - startTime, stateSerdes.keyDeserializer(),
stateSerdes.topic());
+ endTime - startTime, stateSerdes.keyDeserializer(), new
RecordHeaders(), stateSerdes.topic());
} else if (schemaType == SchemaType.PrefixedTimeFirstSchema) {
result = TimeFirstWindowKeySchema.fromStoreKey(serialized.get(),
- endTime - startTime, stateSerdes.keyDeserializer(),
stateSerdes.topic());
+ endTime - startTime, stateSerdes.keyDeserializer(), new
RecordHeaders(), stateSerdes.topic());
} else {
result = KeyFirstWindowKeySchema.fromStoreKey(serialized.get(),
- endTime - startTime, stateSerdes.keyDeserializer(),
stateSerdes.topic());
+ endTime - startTime, stateSerdes.keyDeserializer(), new
RecordHeaders(), stateSerdes.topic());
}
assertEquals(windowedKey, result);
}