This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7ed35ab26b7 KAFKA-20134: Implement TimestampedWindowStoreWithHeaders 
(5/N) (#21497)
7ed35ab26b7 is described below

commit 7ed35ab26b773f29f4c45210bf088f6065e1bfeb
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Mar 3 07:39:44 2026 +0000

    KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (5/N) (#21497)
    
    This PR adds required classes or modifies the existing ones to build the
    `TimestampedWindowStoreWithHeaders` introduced in KIP-1271.
    
    Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../TimestampedWindowStoreWithHeadersTest.java     | 551 +++++++++++++++++++++
 .../internals/AbstractReadWriteDecorator.java      |  12 +
 .../org/apache/kafka/streams/state/Stores.java     |  50 ++
 .../MeteredTimestampedWindowStoreWithHeaders.java  |   2 +-
 .../TimestampedToHeadersIteratorAdapter.java       |   3 +
 .../TimestampedToHeadersWindowStoreAdapter.java    | 234 +++++++++
 .../TimestampedWindowStoreWithHeadersBuilder.java  | 229 +++++++++
 ...mestampedWindowStoreWithHeadersBuilderTest.java | 205 ++++++++
 .../apache/kafka/streams/TopologyTestDriver.java   |  34 ++
 9 files changed, 1319 insertions(+), 1 deletion(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
new file mode 100644
index 00000000000..1ae592e8901
--- /dev/null
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class TimestampedWindowStoreWithHeadersTest {
+
+    private static final String STORE_NAME = "headers-window-store";
+    private static final long WINDOW_SIZE_MS = 100L;
+    private static final long RETENTION_MS = 1000L;
+
+    private String inputStream;
+    private String outputStream;
+    private long baseTimestamp;
+
+    private KafkaStreams kafkaStreams;
+
+    private static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+    private static final Headers HEADERS1 = new RecordHeaders()
+        .add("source", "test".getBytes())
+        .add("version", "1.0".getBytes());
+
+    private static final Headers HEADERS2 = new RecordHeaders()
+        .add("source", "test".getBytes())
+        .add("version", "2.0".getBytes());
+
+    private static final Headers EMPTY_HEADERS = new RecordHeaders();
+
+    public TestInfo testInfo;
+
    @BeforeAll
    public static void before() throws IOException {
        // start the single-broker embedded cluster once for the whole test class
        CLUSTER.start();
    }
+
    @AfterAll
    public static void after() {
        // tear down the embedded cluster after all tests have run
        CLUSTER.stop();
    }
+
    @BeforeEach
    public void beforeTest(final TestInfo testInfo) throws InterruptedException {
        this.testInfo = testInfo;
        // unique topic names per test so concurrent/repeated runs don't interfere
        final String uniqueTestName = safeUniqueTestName(testInfo);
        inputStream = "input-stream-" + uniqueTestName;
        outputStream = "output-stream-" + uniqueTestName;
        CLUSTER.createTopic(inputStream);
        CLUSTER.createTopic(outputStream);

        // anchor record timestamps to the cluster clock so window boundaries are deterministic
        baseTimestamp = CLUSTER.time.milliseconds();
    }
+
    @AfterEach
    public void afterTest() {
        // close the Streams app (bounded wait) and wipe its local state between tests
        if (kafkaStreams != null) {
            kafkaStreams.close(Duration.ofSeconds(30L));
            kafkaStreams.cleanUp();
        }
    }
+
    /**
     * End-to-end put/fetch round-trip: every input record is written into the headers-aware
     * window store by the checker processor, which then re-validates the complete store
     * contents and emits the number of failed checks (0 == everything matched).
     */
    @Test
    public void shouldPutFetchAndDelete() throws Exception {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();

        streamsBuilder.addStateStore(
                Stores.timestampedWindowStoreWithHeadersBuilder(
                    Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
                    Serdes.Integer(),
                    Serdes.String()
                )
            )
            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(() -> new TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        final Properties props = props();
        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        kafkaStreams.start();

        // produce source data with headers
        int numRecordsProduced = 0;

        // Window 1: [baseTimestamp, baseTimestamp + WINDOW_SIZE_MS)
        numRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS1,
            KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));

        // Window 1: updates in same window
        // (null values presumably exercise delete/tombstone semantics — TODO confirm)
        numRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + 50, HEADERS2,
            KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3, "c50"));

        // Window 2: [baseTimestamp + WINDOW_SIZE_MS, baseTimestamp + 2 * WINDOW_SIZE_MS)
        numRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + WINDOW_SIZE_MS,
            EMPTY_HEADERS,
            KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"), KeyValue.pair(3, null));

        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                IntegerDeserializer.class,
                IntegerDeserializer.class),
            outputStream,
            numRecordsProduced);

        // each output value is the processor's failed-check count; 0 means all store contents matched
        receivedRecords.forEach(receivedRecord -> assertEquals(0, receivedRecord.value));
    }
+
    /**
     * Verifies that the changelog topic backing the headers-aware window store is created
     * with the expected broker-side configuration.
     */
    @Test
    public void shouldSetChangelogTopicProperties() throws Exception {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();

        streamsBuilder.addStateStore(
                Stores.timestampedWindowStoreWithHeadersBuilder(
                    Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
                    Serdes.Integer(),
                    Serdes.String()
                )
            )
            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            // writeToStore=false: the store is registered but not written to; this test only
            // cares about the changelog topic that gets created for it
            .process(() -> new TimestampedWindowStoreWithHeadersContentCheckerProcessor(false), STORE_NAME)
            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        final Properties props = props();
        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        kafkaStreams.start();

        // a single record is enough to force topic creation and a round-trip through the topology
        produceDataToTopicWithHeaders(inputStream, baseTimestamp, new RecordHeaders(), KeyValue.pair(0, "foo"));

        IntegrationTestUtils.waitUntilMinRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                IntegerDeserializer.class,
                IntegerDeserializer.class),
            outputStream,
            1);

        // verify changelog topic properties
        // changelog naming convention: <application.id>-<store name>-changelog
        final String changelogTopic = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + STORE_NAME + "-changelog";
        final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic);
        assertEquals("compact", changelogTopicConfig.getProperty("cleanup.policy"));
    }
+
    /**
     * Verifies that the headers-aware window store is rebuilt from its changelog after the
     * local state directory is wiped, and that the restored store keeps working for new writes.
     */
    @Test
    public void shouldRestore() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        streamsBuilder.addStateStore(
                Stores.timestampedWindowStoreWithHeadersBuilder(
                    Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
                    Serdes.Integer(),
                    Serdes.String()
                )
            )
            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(() -> new TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        final Properties props = props();
        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        kafkaStreams.start();

        int initialRecordsProduced = 0;

        // populate two windows' worth of data carrying distinct headers
        initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp, HEADERS1,
            KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));

        initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + 50, HEADERS2,
            KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3, "c50"));

        initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, baseTimestamp + WINDOW_SIZE_MS, EMPTY_HEADERS,
            KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"), KeyValue.pair(3, "c100"));

        IntegrationTestUtils.waitUntilMinRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                IntegerDeserializer.class,
                IntegerDeserializer.class),
            outputStream,
            initialRecordsProduced);

        // wipe out state store to trigger restore process on restart
        kafkaStreams.close();
        kafkaStreams.cleanUp();

        // restart with an identical topology; the fresh processor instance starts with an empty
        // in-memory shadow map, so it cannot re-validate the pre-wipe records — restored data is
        // exercised indirectly by processing new records against the changelog-restored store
        streamsBuilder = new StreamsBuilder();

        streamsBuilder.addStateStore(
                Stores.timestampedWindowStoreWithHeadersBuilder(
                    Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
                    Serdes.Integer(),
                    Serdes.String()
                )
            )
            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(() -> new TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        kafkaStreams.start();

        // produce additional records to verify restored store works correctly
        final Headers finalHeaders = new RecordHeaders().add("final", "true".getBytes());
        final int additionalRecordsProduced = produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_SIZE_MS, finalHeaders,
            KeyValue.pair(1, "a200"), KeyValue.pair(2, "b200"), KeyValue.pair(3, "c200"));

        final List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                IntegerDeserializer.class,
                IntegerDeserializer.class),
            outputStream,
            initialRecordsProduced + additionalRecordsProduced);

        // 0 failed checks for every record, both before the wipe and after the restore
        receivedRecords.forEach(receivedRecord -> assertEquals(0, receivedRecord.value));
    }
+
    /**
     * Builds the legacy topology (plain {@code TimestampedWindowStore}, no headers) and hands it
     * to the shared upgrade-path driver, which restarts the app with the headers-aware store.
     */
    @Test
    public void shouldManualUpgradeFromTimestampedToHeaders() throws Exception {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();

        streamsBuilder.addStateStore(
                Stores.timestampedWindowStoreBuilder(
                    Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
                    Serdes.Integer(),
                    Serdes.String()
                )
            )
            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(TimestampedWindowStoreContentCheckerProcessor::new, STORE_NAME)
            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        shouldManualUpgradeFromTimestampedToHeaders(streamsBuilder.build());
    }
+
    /**
     * Upgrade-path driver: runs the given legacy (headers-less) topology, produces data into it,
     * then restarts the application with the headers-aware store over the same state/changelog
     * and verifies processing continues cleanly.
     *
     * @param originalTopology topology using the legacy {@code TimestampedWindowStore}
     */
    private void shouldManualUpgradeFromTimestampedToHeaders(final Topology originalTopology) throws Exception {
        // build original timestamped (legacy) topology and start app
        final Properties props = props();
        kafkaStreams = new KafkaStreams(originalTopology, props);
        kafkaStreams.start();

        // produce source data to legacy timestamped store (without headers)
        int initialRecordsProduced = 0;
        initialRecordsProduced += produceDataToTopic(inputStream, baseTimestamp,
            KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
        initialRecordsProduced += produceDataToTopic(inputStream, baseTimestamp + 50,
            KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3, "c50"));
        initialRecordsProduced += produceDataToTopic(inputStream, baseTimestamp + WINDOW_SIZE_MS,
            KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"), KeyValue.pair(3, null));

        List<KeyValue<Integer, Integer>> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                IntegerDeserializer.class,
                IntegerDeserializer.class),
            outputStream,
            initialRecordsProduced);

        // 0 failed checks from the legacy checker processor
        receivedRecords.forEach(receivedRecord -> assertEquals(0, receivedRecord.value));

        // NOTE: close WITHOUT cleanUp of the changelog topic itself — cleanUp() only wipes local
        // state; the changelog survives and is what the upgraded store restores from
        kafkaStreams.close();
        kafkaStreams.cleanUp();

        // restart app with headers-aware store to test upgrade path
        // The store should migrate legacy timestamped data (without headers)
        // and add empty headers to existing data
        final StreamsBuilder streamsBuilder = new StreamsBuilder();

        streamsBuilder
            .addStateStore(
                Stores.timestampedWindowStoreWithHeadersBuilder(
                    Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
                    Serdes.Integer(),
                    Serdes.String()
                )
            )
            .stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(() -> new TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
            .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        kafkaStreams.start();

        // produce additional records with headers to verify upgraded store works
        final Headers upgradedHeaders = new RecordHeaders().add("upgraded", "true".getBytes());
        final int additionalRecordsProduced = produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_SIZE_MS, upgradedHeaders,
            KeyValue.pair(1, "a200"), KeyValue.pair(2, "b200"), KeyValue.pair(3, "c200"));

        receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                IntegerDeserializer.class,
                IntegerDeserializer.class),
            outputStream,
            initialRecordsProduced + additionalRecordsProduced);

        // 0 failed checks before and after the upgrade
        receivedRecords.forEach(receivedRecord -> assertEquals(0, receivedRecord.value));
    }
+
+    private Properties props() {
+        final String safeTestName = safeUniqueTestName(testInfo);
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        return streamsConfiguration;
+    }
+
+    /**
+     * @return number of records produced
+     */
+    @SuppressWarnings("varargs")
+    @SafeVarargs
+    private final int produceDataToTopic(final String topic,
+                                         final long timestamp,
+                                         final KeyValue<Integer, String>... 
keyValues) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            topic,
+            Arrays.asList(keyValues),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class),
+            timestamp);
+        return keyValues.length;
+    }
+
    /**
     * Produce records with headers synchronously, all with the given timestamp.
     *
     * @param topic     target topic
     * @param timestamp timestamp assigned to every produced record
     * @param headers   headers attached to every produced record
     * @param keyValues key/value pairs to produce
     * @return number of records produced
     */
    @SuppressWarnings("varargs")
    @SafeVarargs
    private int produceDataToTopicWithHeaders(final String topic,
                                              final long timestamp,
                                              final Headers headers,
                                              final KeyValue<Integer, String>... keyValues) {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            topic,
            Arrays.asList(keyValues),
            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
                IntegerSerializer.class,
                StringSerializer.class),
            headers,
            timestamp,
            false);
        return keyValues.length;
    }
+
    /**
     * Processor for validating expected contents of a timestamped window store with headers, and forwards
     * the number of failed checks downstream for consumption (0 == all contents matched).
     */
    private static class TimestampedWindowStoreWithHeadersContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> {

        private ProcessorContext<Integer, Integer> context;
        private TimestampedWindowStoreWithHeaders<Integer, String> store;

        // whether the processor should write records to the store as they arrive.
        private final boolean writeToStore;
        // in-memory copy of seen data, to validate for testing purposes.
        // Maps key -> windowStartTime -> ValueTimestampHeaders
        private final Map<Integer, Map<Long, Optional<ValueTimestampHeaders<String>>>> data;

        TimestampedWindowStoreWithHeadersContentCheckerProcessor(final boolean writeToStore) {
            this.writeToStore = writeToStore;
            this.data = new HashMap<>();
        }

        @Override
        public void init(final ProcessorContext<Integer, Integer> context) {
            this.context = context;
            store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(final Record<Integer, String> record) {
            // align the record timestamp down to the start of its window
            final long windowStartTime = record.timestamp() - (record.timestamp() % WINDOW_SIZE_MS);

            if (writeToStore) {
                final ValueTimestampHeaders<String> valueTimestampHeaders =
                    ValueTimestampHeaders.make(record.value(), record.timestamp(), record.headers());
                store.put(record.key(), valueTimestampHeaders, windowStartTime);

                // mirror the write into the shadow map; Optional.ofNullable because make()
                // presumably yields null for null values — TODO confirm against ValueTimestampHeaders
                data.computeIfAbsent(record.key(), k -> new HashMap<>());
                data.get(record.key()).put(windowStartTime, Optional.ofNullable(valueTimestampHeaders));
            }

            // re-validate the entire store against the shadow map and forward the failure count
            final int failedChecks = checkStoreContents();
            context.forward(record.withValue(failedChecks));
        }

        /**
         * Check expected contents of store, and signal completion by writing number of failures to downstream
         * @return number of failed checks
         */
        private int checkStoreContents() {
            int failedChecks = 0;
            for (final Map.Entry<Integer, Map<Long, Optional<ValueTimestampHeaders<String>>>> keyEntry : data.entrySet()) {
                final Integer key = keyEntry.getKey();

                for (final Map.Entry<Long, Optional<ValueTimestampHeaders<String>>> windowEntry : keyEntry.getValue().entrySet()) {
                    final Long windowStartTime = windowEntry.getKey();
                    final ValueTimestampHeaders<String> expectedValueTimestampHeaders =
                        windowEntry.getValue().orElse(null);

                    // validate fetch from store
                    try (final WindowStoreIterator<ValueTimestampHeaders<String>> iterator =
                             store.fetch(key, windowStartTime, windowStartTime)) {
                        final ValueTimestampHeaders<String> actualValueTimestampHeaders =
                            iterator.hasNext() ? iterator.next().value : null;
                        if (!Objects.equals(actualValueTimestampHeaders, expectedValueTimestampHeaders)) {
                            failedChecks++;
                        }
                    }
                }
            }
            return failedChecks;
        }
    }
+
    /**
     * Processor for validating expected contents of a timestamped window store (without headers).
     * Used for testing the upgrade path from TimestampedWindowStore to TimestampedWindowStoreWithHeaders.
     * Forwards the number of failed checks downstream (0 == all contents matched).
     */
    private static class TimestampedWindowStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> {

        private ProcessorContext<Integer, Integer> context;
        private TimestampedWindowStore<Integer, String> store;

        // in-memory copy of seen data, to validate for testing purposes.
        // Maps key -> windowStartTime -> ValueAndTimestamp
        private final Map<Integer, Map<Long, Optional<ValueAndTimestamp<String>>>> data;

        TimestampedWindowStoreContentCheckerProcessor() {
            this.data = new HashMap<>();
        }

        @Override
        public void init(final ProcessorContext<Integer, Integer> context) {
            this.context = context;
            store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(final Record<Integer, String> record) {
            // align the record timestamp down to the start of its window
            final long windowStartTime = record.timestamp() - (record.timestamp() % WINDOW_SIZE_MS);

            final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(record.value(), record.timestamp());
            store.put(record.key(), valueAndTimestamp, windowStartTime);

            data.computeIfAbsent(record.key(), k -> new HashMap<>());
            data.get(record.key()).put(windowStartTime, Optional.ofNullable(valueAndTimestamp));

            // re-validate the entire store against the shadow map and forward the failure count
            final int failedChecks = checkStoreContents();
            context.forward(record.withValue(failedChecks));
        }

        /**
         * Check expected contents of store, and signal completion by writing number of failures to downstream
         * @return number of failed checks
         */
        private int checkStoreContents() {
            int failedChecks = 0;
            for (final Map.Entry<Integer, Map<Long, Optional<ValueAndTimestamp<String>>>> keyEntry : data.entrySet()) {
                final Integer key = keyEntry.getKey();

                for (final Map.Entry<Long, Optional<ValueAndTimestamp<String>>> windowEntry : keyEntry.getValue().entrySet()) {
                    final Long windowStartTime = windowEntry.getKey();
                    final ValueAndTimestamp<String> expectedValueAndTimestamp = windowEntry.getValue().orElse(null);

                    // validate fetch from store
                    try (final WindowStoreIterator<ValueAndTimestamp<String>> iterator =
                             store.fetch(key, windowStartTime, windowStartTime)) {
                        final ValueAndTimestamp<String> actualValueAndTimestamp =
                            iterator.hasNext() ? iterator.next().value : null;
                        if (!Objects.equals(actualValueAndTimestamp, expectedValueAndTimestamp)) {
                            failedChecks++;
                        }
                    }
                }
            }
            return failedChecks;
        }
    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 8748843f034..3b3d57298fe 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.streams.state.VersionedKeyValueStore;
@@ -71,6 +72,8 @@ abstract class AbstractReadWriteDecorator<T extends 
StateStore, K, V> extends Wr
             return new 
VersionedKeyValueStoreReadWriteDecorator<>((VersionedKeyValueStore<?, ?>) 
store);
         } else if (store instanceof KeyValueStore) {
             return new KeyValueStoreReadWriteDecorator<>((KeyValueStore<?, ?>) 
store);
+        } else if (store instanceof TimestampedWindowStoreWithHeaders) {
+            return new 
TimestampedWindowStoreWithHeadersReadWriteDecorator<>((TimestampedWindowStoreWithHeaders<?,
 ?>) store);
         } else if (store instanceof TimestampedWindowStore) {
             return new 
TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>) 
store);
         } else if (store instanceof WindowStore) {
@@ -272,6 +275,15 @@ abstract class AbstractReadWriteDecorator<T extends 
StateStore, K, V> extends Wr
         }
     }
 
    /**
     * Read/write decorator for {@link TimestampedWindowStoreWithHeaders}. All behavior is
     * inherited from {@link WindowStoreReadWriteDecorator} with value type
     * {@link ValueTimestampHeaders}; this subclass only preserves the more specific store interface.
     */
    static class TimestampedWindowStoreWithHeadersReadWriteDecorator<K, V>
        extends WindowStoreReadWriteDecorator<K, ValueTimestampHeaders<V>>
        implements TimestampedWindowStoreWithHeaders<K, V> {

        TimestampedWindowStoreWithHeadersReadWriteDecorator(final TimestampedWindowStoreWithHeaders<K, V> inner) {
            super(inner);
        }
    }
+
     static class SessionStoreReadWriteDecorator<K, AGG>
         extends AbstractReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
         implements SessionStore<K, AGG> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index b1e8234f3c4..1ad4adc6e91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -32,6 +32,7 @@ import 
org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import 
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
 import 
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderWithHeaders;
 import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
+import 
org.apache.kafka.streams.state.internals.TimestampedWindowStoreWithHeadersBuilder;
 import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 
@@ -332,6 +333,37 @@ public final class Stores {
         return persistentWindowStore(name, retentionPeriod, windowSize, 
retainDuplicates, true);
     }
 
+    /**
+     * Creates a persistent {@link WindowBytesStoreSupplier} that preserves 
timestamps and headers.
+     *
+     * @param name                  name of the store (cannot be {@code null})
+     * @param retentionPeriod       length of time to retain data in the store 
(cannot be negative)
+     * @param windowSize            size of the windows (cannot be negative)
+     * @param retainDuplicates      whether or not to retain duplicates
+     * @return an instance of {@link WindowBytesStoreSupplier}
+     * @throws IllegalArgumentException if {@code retentionPeriod} is smaller 
than {@code windowSize}
+     */
+    public static WindowBytesStoreSupplier 
persistentTimestampedWindowStoreWithHeaders(final String name,
+                                                                               
        final Duration retentionPeriod,
+                                                                               
        final Duration windowSize,
+                                                                               
        final boolean retainDuplicates) throws IllegalArgumentException {
+        Objects.requireNonNull(name, "name cannot be null");
+        final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, 
"windowSize");
+        final long windowSizeMs = validateMillisecondDuration(windowSize, 
wsMsgPrefix);
+
+        final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+        return new RocksDbWindowBytesStoreSupplier(
+            name,
+            retentionMs,
+            defaultSegmentInterval,
+            windowSizeMs,
+            retainDuplicates,
+            
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS);
+    }
+
     private static WindowBytesStoreSupplier persistentWindowStore(final String 
name,
                                                                   final 
Duration retentionPeriod,
                                                                   final 
Duration windowSize,
@@ -598,6 +630,24 @@ public final class Stores {
         return new TimestampedWindowStoreBuilder<>(supplier, keySerde, 
valueSerde, Time.SYSTEM);
     }
 
    /**
     * Creates a {@link StoreBuilder} that can be used to build a {@link TimestampedWindowStoreWithHeaders}.
     *
     * @param supplier      a {@link WindowBytesStoreSupplier} (cannot be {@code null})
     * @param keySerde      the key serde to use
     * @param valueSerde    the value serde to use
     * @param <K>           key type
     * @param <V>           value type
     * @return an instance of {@link StoreBuilder} that can build a {@link TimestampedWindowStoreWithHeaders}
     */
    public static <K, V> StoreBuilder<TimestampedWindowStoreWithHeaders<K, V>> timestampedWindowStoreWithHeadersBuilder(
            final WindowBytesStoreSupplier supplier,
            final Serde<K> keySerde,
            final Serde<V> valueSerde) {
        Objects.requireNonNull(supplier, "supplier cannot be null");
        // Serdes are deliberately not null-checked here; presumably null means
        // "use the application's configured default serdes" — TODO confirm against
        // the builder's runtime behavior.
        return new TimestampedWindowStoreWithHeadersBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
    }
+
     /**
      * Creates a {@link StoreBuilder} that can be used to build a {@link 
SessionStore}.
      *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index c8c6d187124..e982365da17 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -66,7 +66,7 @@ class MeteredTimestampedWindowStoreWithHeaders<K, V>
     @Override
     public void put(final K key, final ValueTimestampHeaders<V> value, final 
long windowStartTimestamp) {
         Objects.requireNonNull(key, "key cannot be null");
-        final Headers headers = value.headers() == null ? new RecordHeaders() 
: value.headers();
+        final Headers headers = value == null || value.headers() == null ? new 
RecordHeaders() : value.headers();
         try {
             maybeMeasureLatency(
                 () -> wrapped().put(keyBytes(key, headers), 
serdes.rawValue(value, headers), windowStartTimestamp),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
index 6a0a23e1c36..569fe10fe25 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
@@ -54,6 +54,9 @@ class TimestampedToHeadersIteratorAdapter<K> implements 
KeyValueIterator<K, byte
    @Override
    public KeyValue<K, byte[]> next() {
        final KeyValue<K, byte[]> timestampedKeyValue = innerIterator.next();
        // Guard against inner iterators that hand back null instead of throwing.
        // NOTE(review): returning null from next() violates the Iterator contract
        // (exhaustion should raise NoSuchElementException) — confirm callers
        // actually expect a null here.
        if (timestampedKeyValue == null) {
            return null;
        }
        // Re-encode the timestamp-only payload into the headers format
        // (prepends an empty headers section).
        return KeyValue.pair(timestampedKeyValue.key, convertToHeaderFormat(timestampedKeyValue.value));
    }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
new file mode 100644
index 00000000000..f53cbc68285
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * Adapter for backward compatibility between {@link 
TimestampedWindowStoreWithHeaders}
+ * and {@link TimestampedWindowStore}.
+ * <p>
+ * If a user provides a supplier for {@code TimestampedWindowStore} (without 
headers) when building
+ * a {@code TimestampedWindowStoreWithHeaders}, this adapter translates 
between the timestamped
+ * {@code byte[]} format and the timestamped-with-headers {@code byte[]} 
format.
+ * <p>
+ * Format conversion:
+ * <ul>
+ *   <li>Write: {@code [headers][timestamp][value]} → {@code 
[timestamp][value]} (strip headers)</li>
+ *   <li>Read: {@code [timestamp][value]} → {@code 
[headers][timestamp][value]} (add empty headers)</li>
+ * </ul>
+ */
+public class TimestampedToHeadersWindowStoreAdapter implements 
WindowStore<Bytes, byte[]> {
+    private final WindowStore<Bytes, byte[]> store;
+
+    public TimestampedToHeadersWindowStoreAdapter(final WindowStore<Bytes, 
byte[]> store) {
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        if (!(store instanceof TimestampedBytesStore)) {
+            throw new IllegalArgumentException("Provided store must be a 
timestamped store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    /**
+     * Extract raw timestamped value (timestamp + value) from serialized 
ValueTimestampHeaders.
+     * This strips the headers portion but keeps timestamp and value intact.
+     *
+     * Format conversion:
+     * Input:  [headersSize(varint)][headers][timestamp(8)][value]
+     * Output: [timestamp(8)][value]
+     */
+    // TODO: should be extract to util class, tracked by KAFKA-20205
+    static byte[] rawTimestampedValue(final byte[] rawValueTimestampHeaders) {
+        if (rawValueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        // Skip headers, keep timestamp + value
+        buffer.position(buffer.position() + headersSize);
+
+        final byte[] result = new byte[buffer.remaining()];
+        buffer.get(result);
+        return result;
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] 
valueWithTimestampAndHeaders, final long windowStartTimestamp) {
+        store.put(key, rawTimestampedValue(valueWithTimestampAndHeaders), 
windowStartTimestamp);
+    }
+
+    @Override
+    public byte[] fetch(final Bytes key, final long timestamp) {
+        return convertToHeaderFormat(store.fetch(key, timestamp));
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> fetch(final Bytes key, final long 
timeFrom, final long timeTo) {
+        return new 
TimestampedWindowToHeadersWindowStoreIteratorAdapter(store.fetch(key, timeFrom, 
timeTo));
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant 
timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new 
TimestampedWindowToHeadersWindowStoreIteratorAdapter(store.fetch(key, timeFrom, 
timeTo));
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final 
long timeFrom, final long timeTo) {
+        return new 
TimestampedWindowToHeadersWindowStoreIteratorAdapter(store.backwardFetch(key, 
timeFrom, timeTo));
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final 
Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new 
TimestampedWindowToHeadersWindowStoreIteratorAdapter(store.backwardFetch(key, 
timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
keyFrom, final Bytes keyTo,
+                                                           final long 
timeFrom, final long timeTo) {
+        return new TimestampedToHeadersIteratorAdapter<>(store.fetch(keyFrom, 
keyTo, timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
keyFrom, final Bytes keyTo,
+                                                           final Instant 
timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new TimestampedToHeadersIteratorAdapter<>(store.fetch(keyFrom, 
keyTo, timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes 
keyFrom, final Bytes keyTo,
+                                                                   final long 
timeFrom, final long timeTo) {
+        return new 
TimestampedToHeadersIteratorAdapter<>(store.backwardFetch(keyFrom, keyTo, 
timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes 
keyFrom, final Bytes keyTo,
+                                                                   final 
Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new 
TimestampedToHeadersIteratorAdapter<>(store.backwardFetch(keyFrom, keyTo, 
timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long 
timeFrom, final long timeTo) {
+        return new 
TimestampedToHeadersIteratorAdapter<>(store.fetchAll(timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant 
timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new 
TimestampedToHeadersIteratorAdapter<>(store.fetchAll(timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final 
long timeFrom, final long timeTo) {
+        return new 
TimestampedToHeadersIteratorAdapter<>(store.backwardFetchAll(timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final 
Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new 
TimestampedToHeadersIteratorAdapter<>(store.backwardFetchAll(timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+        return new TimestampedToHeadersIteratorAdapter<>(store.all());
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
+        return new TimestampedToHeadersIteratorAdapter<>(store.backwardAll());
+    }
+
+    @Override
+    public String name() {
+        return store.name();
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        store.init(context, root);
+    }
+
+    @Override
+    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+        store.commit(changelogOffsets);
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return store.isOpen();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+        throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped window stores with headers yet.");
+    }
+
+    @Override
+    public Position getPosition() {
+        return store.getPosition();
+    }
+
+    /**
+     * Iterator adapter for WindowStoreIterator that converts timestamp-only 
values
+     * to timestamp-with-headers format by adding empty headers.
+     */
+    private static class TimestampedWindowToHeadersWindowStoreIteratorAdapter
+        extends TimestampedToHeadersIteratorAdapter<Long>
+        implements WindowStoreIterator<byte[]> {
+
+        TimestampedWindowToHeadersWindowStoreIteratorAdapter(final 
KeyValueIterator<Long, byte[]> innerIterator) {
+            super(innerIterator);
+        }
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
new file mode 100644
index 00000000000..9fc363bb40e
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+/**
+ * Store builder for {@link TimestampedWindowStoreWithHeaders}.
+ * <p>
+ * This builder creates header-aware timestamped window stores that preserve 
record headers
+ * alongside values and timestamps. It wraps the underlying bytes store with 
the necessary
+ * layers (logging, caching, metering) to provide a fully-functional store.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class TimestampedWindowStoreWithHeadersBuilder<K, V>
+    extends AbstractStoreBuilder<K, ValueTimestampHeaders<V>, 
TimestampedWindowStoreWithHeaders<K, V>> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TimestampedWindowStoreWithHeadersBuilder.class);
+
+    private final WindowBytesStoreSupplier storeSupplier;
+
+    public TimestampedWindowStoreWithHeadersBuilder(final 
WindowBytesStoreSupplier storeSupplier,
+                                                    final Serde<K> keySerde,
+                                                    final Serde<V> valueSerde,
+                                                    final Time time) {
+        super(storeSupplier.name(), keySerde, valueSerde == null ? null : new 
ValueTimestampHeadersSerde<>(valueSerde), time);
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's 
metricsScope can't be null");
+        this.storeSupplier = storeSupplier;
+    }
+
+    @Override
+    public TimestampedWindowStoreWithHeaders<K, V> build() {
+        WindowStore<Bytes, byte[]> store = storeSupplier.get();
+
+        if (!(store instanceof HeadersBytesStore)) {
+            if (store.persistent()) {
+                store = new TimestampedToHeadersWindowStoreAdapter(store);
+            } else {
+                store = new 
InMemoryTimestampedWindowStoreWithHeadersMarker(store);
+            }
+        }
+
+        if (storeSupplier.retainDuplicates() && enableCaching) {
+            LOG.warn("Disabling caching for {} since store was configured to 
retain duplicates", storeSupplier.name());
+            enableCaching = false;
+        }
+
+        return new MeteredTimestampedWindowStoreWithHeaders<>(
+            maybeWrapCaching(maybeWrapLogging(store)),
+            storeSupplier.windowSize(),
+            storeSupplier.metricsScope(),
+            time,
+            keySerde,
+            valueSerde);
+    }
+
+    private WindowStore<Bytes, byte[]> maybeWrapCaching(final 
WindowStore<Bytes, byte[]> inner) {
+        if (!enableCaching) {
+            return inner;
+        }
+
+        final boolean isTimeOrdered = isTimeOrderedStore(inner);
+        if (isTimeOrdered) {
+            return new TimeOrderedCachingWindowStore(
+                inner,
+                storeSupplier.windowSize(),
+                storeSupplier.segmentIntervalMs());
+        }
+
+        return new CachingWindowStore(
+            inner,
+            storeSupplier.windowSize(),
+            storeSupplier.segmentIntervalMs());
+    }
+
+    private boolean isTimeOrderedStore(final StateStore stateStore) {
+        if (stateStore instanceof RocksDBTimeOrderedWindowStore) {
+            return true;
+        }
+        if (stateStore instanceof WrappedStateStore) {
+            return isTimeOrderedStore(((WrappedStateStore<?, ?, ?>) 
stateStore).wrapped());
+        }
+        return false;
+    }
+
+    private WindowStore<Bytes, byte[]> maybeWrapLogging(final 
WindowStore<Bytes, byte[]> inner) {
+        if (!enableLogging) {
+            return inner;
+        }
+        return new ChangeLoggingTimestampedWindowBytesStoreWithHeaders(inner, 
storeSupplier.retainDuplicates());
+    }
+
+    public long retentionPeriod() {
+        return storeSupplier.retentionPeriod();
+    }
+
+    /**
+     * Marker wrapper for in-memory window stores that support both timestamps 
and headers.
+     * <p>
+     * This wrapper indicates that the underlying store understands the 
value-with-headers format.
+     * The actual in-memory store doesn't need to change since it operates on 
raw bytes.
+     */
+    private static final class InMemoryTimestampedWindowStoreWithHeadersMarker
+        extends WrappedStateStore<WindowStore<Bytes, byte[]>, Bytes, byte[]>
+        implements WindowStore<Bytes, byte[]>, TimestampedBytesStore, 
HeadersBytesStore {
+
+        private InMemoryTimestampedWindowStoreWithHeadersMarker(final 
WindowStore<Bytes, byte[]> wrapped) {
+            super(wrapped);
+            if (wrapped.persistent()) {
+                throw new IllegalArgumentException("Provided store must not be 
a persistent store, but it is.");
+            }
+        }
+
+        @Override
+        public void put(final Bytes key,
+                        final byte[] value,
+                        final long windowStartTimestamp) {
+            wrapped().put(key, value, windowStartTimestamp);
+        }
+
+        @Override
+        public byte[] fetch(final Bytes key,
+                            final long time) {
+            return wrapped().fetch(key, time);
+        }
+
+        @Override
+        public WindowStoreIterator<byte[]> fetch(final Bytes key,
+                                                 final long timeFrom,
+                                                 final long timeTo) {
+            return wrapped().fetch(key, timeFrom, timeTo);
+        }
+
+        @Override
+        public WindowStoreIterator<byte[]> backwardFetch(final Bytes key,
+                                                         final long timeFrom,
+                                                         final long timeTo) {
+            return wrapped().backwardFetch(key, timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
keyFrom,
+                                                               final Bytes 
keyTo,
+                                                               final long 
timeFrom,
+                                                               final long 
timeTo) {
+            return wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final 
Bytes keyFrom,
+                                                                       final 
Bytes keyTo,
+                                                                       final 
long timeFrom,
+                                                                       final 
long timeTo) {
+            return wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long 
timeFrom,
+                                                                  final long 
timeTo) {
+            return wrapped().fetchAll(timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFetchAll(final long timeFrom,
+                                                                          
final long timeTo) {
+            return wrapped().backwardFetchAll(timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+            return wrapped().all();
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
+            return wrapped().backwardAll();
+        }
+
+        @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+                                        final PositionBound positionBound,
+                                        final QueryConfig config) {
+            throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped window stores with headers yet.");
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilderTest.java
new file mode 100644
index 00000000000..304a84c6976
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilderTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class TimestampedWindowStoreWithHeadersBuilderTest {
+    private static final String STORE_NAME = "name";
+    private static final String METRICS_SCOPE = "metricsScope";
+
+    @Mock
+    private WindowBytesStoreSupplier supplier;
+    @Mock
+    private RocksDBTimestampedWindowStoreWithHeaders 
timestampedStoreWithHeaders;
+
+    private TimestampedWindowStoreWithHeadersBuilder<String, String> builder;
+
    // Common stubbing shared by most tests. NOTE(review): deliberately not annotated
    // with @BeforeEach? Tests that stub supplier.get() differently (e.g.
    // shouldNotWrapHeadersByteStore) would presumably trip STRICT_STUBS on the unused
    // default stubbing — confirm; each test currently calls setUp() explicitly.
    public void setUp() {
        when(supplier.name()).thenReturn(STORE_NAME);
        when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
        when(supplier.get()).thenReturn(timestampedStoreWithHeaders);

        builder = new TimestampedWindowStoreWithHeadersBuilder<>(
            supplier,
            Serdes.String(),
            Serdes.String(),
            new MockTime());
    }
+
    // The outermost layer produced by the builder must always be the metering wrapper.
    @Test
    public void shouldHaveMeteredStoreAsOuterStore() {
        setUp();
        final TimestampedWindowStoreWithHeaders<String, String> store = builder.build();
        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, store);
    }
+
    // Change-logging is on by default, so it sits directly under the metered layer.
    @Test
    public void shouldHaveChangeLoggingStoreByDefault() {
        setUp();
        final TimestampedWindowStoreWithHeaders<String, String> store = builder.build();
        // NOTE(review): raw WrappedStateStore cast — prefer WrappedStateStore<?, ?, ?>.
        final StateStore next = ((WrappedStateStore) store).wrapped();
        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, next);
    }
+
    // With logging disabled (and caching off by default), the metered layer wraps the
    // inner store directly.
    @Test
    public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
        setUp();
        final TimestampedWindowStoreWithHeaders<String, String> store = builder.withLoggingDisabled().build();
        final StateStore next = ((WrappedStateStore) store).wrapped();
        assertSame(timestampedStoreWithHeaders, next);
    }
+
    // Enabling caching inserts a CachingWindowStore beneath the metered layer.
    @Test
    public void shouldHaveCachingStoreWhenEnabled() {
        setUp();
        final TimestampedWindowStoreWithHeaders<String, String> store = builder.withCachingEnabled().build();
        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, store);
        assertInstanceOf(CachingWindowStore.class, wrapped);
    }
+
    // Explicitly enabling logging yields metered -> change-logging -> inner store.
    @Test
    public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
        setUp();
        final TimestampedWindowStoreWithHeaders<String, String> store = builder
            .withLoggingEnabled(Collections.emptyMap())
            .build();
        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, store);
        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, wrapped);
        assertSame(timestampedStoreWithHeaders, ((WrappedStateStore) wrapped).wrapped());
    }
+
    // With both features enabled, the full wrapping order is
    // metered -> caching -> change-logging -> inner store.
    @Test
    public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
        setUp();
        final TimestampedWindowStoreWithHeaders<String, String> store = builder
            .withLoggingEnabled(Collections.emptyMap())
            .withCachingEnabled()
            .build();
        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrapped();
        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, store);
        assertInstanceOf(CachingWindowStore.class, caching);
        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, changeLogging);
        assertSame(timestampedStoreWithHeaders, changeLogging.wrapped());
    }
+
+    @Test
+    public void shouldNotWrapHeadersByteStore() {
+        when(supplier.name()).thenReturn(STORE_NAME);
+        when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedWindowStoreWithHeaders(
+            new RocksDBTimestampedSegmentedBytesStoreWithHeaders(
+                "name",
+                "metric-scope",
+                10L,
+                5L,
+                new WindowKeySchema()),
+            false,
+            1L));
+
+        builder = new TimestampedWindowStoreWithHeadersBuilder<>(
+            supplier,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime());
+
+        final TimestampedWindowStoreWithHeaders<String, String> store = builder
+            .withLoggingDisabled()
+            .withCachingDisabled()
+            .build();
+        assertInstanceOf(RocksDBTimestampedWindowStoreWithHeaders.class, 
((WrappedStateStore) store).wrapped());
+    }
+
+    @Test
+    public void shouldWrapTimestampedStoreAsHeadersStore() {
+        when(supplier.name()).thenReturn(STORE_NAME);
+        when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
+        when(supplier.get()).thenReturn(new RocksDBTimestampedWindowStore(
+            new RocksDBTimestampedSegmentedBytesStore(
+                "name",
+                "metric-scope",
+                10L,
+                5L,
+                new WindowKeySchema()),
+            false,
+            1L));
+
+        builder = new TimestampedWindowStoreWithHeadersBuilder<>(
+            supplier,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime());
+
+        final TimestampedWindowStoreWithHeaders<String, String> store = builder
+            .withLoggingDisabled()
+            .withCachingDisabled()
+            .build();
+        assertInstanceOf(TimestampedToHeadersWindowStoreAdapter.class, 
((WrappedStateStore) store).wrapped());
+    }
+
+    @Test
+    public void shouldDisableCachingWithRetainDuplicates() {
+        when(supplier.name()).thenReturn(STORE_NAME);
+        when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
+        when(supplier.retainDuplicates()).thenReturn(true);
+        when(supplier.get()).thenReturn(timestampedStoreWithHeaders);
+
+        builder = new TimestampedWindowStoreWithHeadersBuilder<>(
+            supplier,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime());
+
+        final TimestampedWindowStoreWithHeaders<String, String> store = builder
+            .withCachingEnabled()
+            .withLoggingDisabled()
+            .build();
+
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        // Caching should be automatically disabled when retainDuplicates is 
true
+        assertSame(timestampedStoreWithHeaders, wrapped);
+    }
+
+    @Test
+    public void shouldThrowNullPointerIfInnerIsNull() {
+        assertThrows(NullPointerException.class, () -> new 
TimestampedWindowStoreWithHeadersBuilder<>(null, Serdes.String(), 
Serdes.String(), new MockTime()));
+    }
+}
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 6ebf325bda0..b15f1f2ab27 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -84,6 +84,7 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.streams.state.VersionedKeyValueStore;
@@ -879,6 +880,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
+     * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
      */
     public Map<String, StateStore> getAllStateStores() {
@@ -911,6 +913,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
+     * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
      */
     public StateStore getStateStore(final String name) throws 
IllegalArgumentException {
@@ -992,6 +995,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
+     * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
      */
     @SuppressWarnings("unchecked")
@@ -1020,6 +1024,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
+     * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
      */
     @SuppressWarnings("unchecked")
@@ -1044,6 +1049,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
+     * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
      */
     @SuppressWarnings("unchecked")
@@ -1068,6 +1074,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
+     * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
      */
     @SuppressWarnings("unchecked")
@@ -1097,6 +1104,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getVersionedKeyValueStore(String)
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getTimestampedWindowStore(String)
+     * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
      */
     @SuppressWarnings("unchecked")
@@ -1133,6 +1141,31 @@ public class TopologyTestDriver implements Closeable {
         return store instanceof TimestampedWindowStore ? 
(TimestampedWindowStore<K, V>) store : null;
     }
 
+    /**
+     * Get the {@link TimestampedWindowStoreWithHeaders} with the given name.
+     * The store can be a "regular" or global store.
+     * <p>
+     * This is often useful in test cases to pre-populate the store before the 
test case instructs the topology to
+     * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, 
and/or to check the store afterward.
+     *
+     * @param name the name of the store
+     * @return the window store, or {@code null} if no {@link 
TimestampedWindowStoreWithHeaders} has been registered with the given name
+     * @see #getAllStateStores()
+     * @see #getStateStore(String)
+     * @see #getKeyValueStore(String)
+     * @see #getTimestampedKeyValueStore(String)
+     * @see #getVersionedKeyValueStore(String)
+     * @see #getTimestampedKeyValueStoreWithHeaders(String)
+     * @see #getWindowStore(String)
+     * @see #getTimestampedWindowStore(String)
+     * @see #getSessionStore(String)
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> WindowStore<K, ValueTimestampHeaders<V>> 
getTimestampedWindowStoreWithHeaders(final String name) {
+        final StateStore store = getStateStore(name, false);
+        return store instanceof TimestampedWindowStoreWithHeaders ? 
(TimestampedWindowStoreWithHeaders<K, V>) store : null;
+    }
+
     /**
      * Get the {@link SessionStore} with the given name.
      * The store can be a "regular" or global store.
@@ -1150,6 +1183,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
+     * @see #getTimestampedWindowStoreWithHeaders(String)
      */
     @SuppressWarnings("unchecked")
     public <K, V> SessionStore<K, V> getSessionStore(final String name) {

Reply via email to