This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new daa4e472b4a KAFKA-20134: Implement TimestampedWindowStoreWithHeaders 
(4/N) (#21493)
daa4e472b4a is described below

commit daa4e472b4a029bf7bcbdacaed55f72ae4311900
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Feb 25 02:30:29 2026 +0000

    KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (4/N) (#21493)
    
    This PR adds `ChangeLoggingTimestampedWindowBytesStoreWithHeaders` for
    the `TimestampedWindowStoreWithHeaders` introduced in KIP-1271.
    
    Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 ...gingTimestampedWindowBytesStoreWithHeaders.java |  60 +++++
 ...TimestampedWindowBytesStoreWithHeadersTest.java | 253 +++++++++++++++++++++
 2 files changed, 313 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java
new file mode 100644
index 00000000000..374d47892af
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.WindowStore;
+
+import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
+import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.rawValue;
+import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.timestamp;
+
+/**
+ * Change-logging wrapper for window stores that support headers.
+ * <p>
+ * This class extends {@link ChangeLoggingWindowBytesStore} and correctly 
handles
+ * the header-aware storage format: 
[headersSize(varint)][headersBytes][timestamp(8)][value]
+ * <p>
+ * Unlike {@link ChangeLoggingTimestampedWindowBytesStore} which uses
+ * {@link ValueAndTimestampDeserializer} for the format [timestamp(8)][value],
+ * this class uses {@link ValueTimestampHeadersDeserializer} to extract
+ * the timestamp from the correct position in the byte array.
+ */
+class ChangeLoggingTimestampedWindowBytesStoreWithHeaders extends 
ChangeLoggingWindowBytesStore {
+
+    ChangeLoggingTimestampedWindowBytesStoreWithHeaders(final 
WindowStore<Bytes, byte[]> bytesStore,
+                                                        final boolean 
retainDuplicates) {
+        super(bytesStore, retainDuplicates, WindowKeySchema::toStoreKeyBinary);
+    }
+
+    @Override
+    void log(final Bytes key,
+             final byte[] valueTimestampHeaders) {
+        internalContext.logChange(
+            name(),
+            key,
+            rawValue(valueTimestampHeaders),
+            valueTimestampHeaders != null
+                ? timestamp(valueTimestampHeaders)
+                : internalContext.recordContext().timestamp(),
+            valueTimestampHeaders != null
+                ? headers(valueTimestampHeaders)
+                : internalContext.recordContext().headers(),
+            wrapped().getPosition()
+        );
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeadersTest.java
new file mode 100644
index 00000000000..6ce3c1637d2
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeadersTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.time.Instant.ofEpochMilli;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class ChangeLoggingTimestampedWindowBytesStoreWithHeadersTest {

    // Single-byte payload reused both as the raw value and (wrapped) as the store key.
    private final byte[] value = {0};
    private final Bytes bytesKey = Bytes.wrap(value);
    private final StreamsConfig streamsConfig = streamsConfigMock();
    // Two headers that every logged put is expected to propagate into the changelog.
    private final Headers testHeaders = new RecordHeaders()
        .add(new RecordHeader("key1", "value1".getBytes()))
        .add(new RecordHeader("key2", "value2".getBytes()));
    private final long testTimestamp = 42L;
    // Serialized [headersSize][headers][timestamp][value] payload; built in setUp().
    private byte[] valueTimestampHeaders;

    @Mock
    private WindowStore<Bytes, byte[]> inner;
    @Mock
    private ProcessorContextImpl context;
    private ChangeLoggingTimestampedWindowBytesStoreWithHeaders store;

    // Non-empty position used to verify the position is passed through to logChange().
    private static final Position POSITION = Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 1L)))));

    @BeforeEach
    public void setUp() {
        // Serialize value + timestamp + headers into the binary format the
        // store under test expects to receive via put().
        final ValueTimestampHeaders<byte[]> valueWithHeaders =
            ValueTimestampHeaders.make(value, testTimestamp, testHeaders);
        final ValueTimestampHeadersSerializer<byte[]> serializer =
            new ValueTimestampHeadersSerializer<>(new ByteArraySerializer());
        valueTimestampHeaders = serializer.serialize("topic", valueWithHeaders);

        store = new ChangeLoggingTimestampedWindowBytesStoreWithHeaders(inner, false);
        store.init(context, store);
    }

    @AfterEach
    public void tearDown() {
        // Every test must have delegated init() to the wrapped store exactly once.
        verify(inner).init(context, store);
    }

    @Test
    public void shouldDelegateInit() {
        // Uses a real mock-context/in-memory-store pair (distinct from the
        // class-level mocks) to check init() delegation in isolation.
        final InternalMockProcessorContext<String, Long> context = mockContext();
        final WindowStore<Bytes, byte[]> inner = mock(InMemoryWindowStore.class);
        final StateStore outer = new ChangeLoggingTimestampedWindowBytesStoreWithHeaders(inner, false);

        outer.init(context, outer);
        verify(inner).init(context, outer);
    }

    @Test
    public void shouldLogPuts() {
        // put() must forward to the inner store and log the deserialized
        // value, timestamp, and headers (not the record context's) to the changelog.
        final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
        when(inner.getPosition()).thenReturn(Position.emptyPosition());
        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));

        store.put(bytesKey, valueTimestampHeaders, context.recordContext().timestamp());

        verify(inner).put(bytesKey, valueTimestampHeaders, 0);

        final ArgumentCaptor<Headers> headersCaptor = ArgumentCaptor.forClass(Headers.class);
        verify(context).logChange(
            eq(store.name()),
            eq(key),
            eq(value),
            eq(testTimestamp),
            headersCaptor.capture(),
            eq(Position.emptyPosition())
        );

        // Headers embedded in the serialized payload must survive the round trip.
        final Headers capturedHeaders = headersCaptor.getValue();
        assertEquals(2, capturedHeaders.toArray().length);
        assertEquals("value1", new String(capturedHeaders.lastHeader("key1").value()));
        assertEquals("value2", new String(capturedHeaders.lastHeader("key2").value()));
    }

    @Test
    public void shouldLogPutsWithPosition() {
        // Same as shouldLogPuts, but the inner store reports a non-empty
        // position which must be forwarded verbatim to logChange().
        final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
        when(inner.getPosition()).thenReturn(POSITION);
        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));

        store.put(bytesKey, valueTimestampHeaders, context.recordContext().timestamp());

        verify(inner).put(bytesKey, valueTimestampHeaders, 0);

        final ArgumentCaptor<Headers> headersCaptor = ArgumentCaptor.forClass(Headers.class);
        verify(context).logChange(
            eq(store.name()),
            eq(key),
            eq(value),
            eq(testTimestamp),
            headersCaptor.capture(),
            eq(POSITION)
        );

        final Headers capturedHeaders = headersCaptor.getValue();
        assertEquals(2, capturedHeaders.toArray().length);
        assertEquals("value1", new String(capturedHeaders.lastHeader("key1").value()));
        assertEquals("value2", new String(capturedHeaders.lastHeader("key2").value()));
    }

    // Read paths must delegate straight to the inner store with millisecond bounds.
    @SuppressWarnings({"resource", "unused"})
    @Test
    public void shouldDelegateToUnderlyingStoreWhenFetching() {
        try (final WindowStoreIterator<byte[]> unused = store.fetch(bytesKey, ofEpochMilli(0), ofEpochMilli(10))) {
            verify(inner).fetch(bytesKey, 0, 10);
        }
    }

    @SuppressWarnings({"resource", "unused"})
    @Test
    public void shouldDelegateToUnderlyingStoreWhenBackwardFetching() {
        try (final WindowStoreIterator<byte[]> unused = store.backwardFetch(bytesKey, ofEpochMilli(0), ofEpochMilli(10))) {
            verify(inner).backwardFetch(bytesKey, 0, 10);
        }
    }

    @SuppressWarnings({"resource", "unused"})
    @Test
    public void shouldDelegateToUnderlyingStoreWhenFetchingRange() {
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> unused = store.fetch(bytesKey, bytesKey, ofEpochMilli(0), ofEpochMilli(1))) {
            verify(inner).fetch(bytesKey, bytesKey, 0, 1);
        }
    }

    @SuppressWarnings({"resource", "unused"})
    @Test
    public void shouldDelegateToUnderlyingStoreWhenBackwardFetchingRange() {
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> unused = store.backwardFetch(bytesKey, bytesKey, ofEpochMilli(0), ofEpochMilli(1))) {
            verify(inner).backwardFetch(bytesKey, bytesKey, 0, 1);
        }
    }

    @Test
    public void shouldRetainDuplicatesWhenSet() {
        // Re-create the store with retainDuplicates=true; both puts of the
        // same key must reach the inner store and both must be logged.
        store = new ChangeLoggingTimestampedWindowBytesStoreWithHeaders(inner, true);
        store.init(context, store);
        when(inner.getPosition()).thenReturn(Position.emptyPosition());
        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));

        store.put(bytesKey, valueTimestampHeaders, context.recordContext().timestamp());
        store.put(bytesKey, valueTimestampHeaders, context.recordContext().timestamp());

        verify(inner, times(2)).put(bytesKey, valueTimestampHeaders, 0);

        // any(Bytes.class) for the key: with retainDuplicates the binary store
        // key carries a sequence number, so the two logged keys differ.
        final ArgumentCaptor<Headers> headersCaptor = ArgumentCaptor.forClass(Headers.class);
        verify(context, times(2)).logChange(
            eq(store.name()),
            any(Bytes.class),
            eq(value),
            eq(testTimestamp),
            headersCaptor.capture(),
            eq(Position.emptyPosition())
        );

        final Headers capturedHeaders = headersCaptor.getValue();
        assertEquals(2, capturedHeaders.toArray().length);
        assertEquals("value1", new String(capturedHeaders.lastHeader("key1").value()));
        assertEquals("value2", new String(capturedHeaders.lastHeader("key2").value()));
    }

    // Builds a real processor context backed by a temp state directory,
    // used only by shouldDelegateInit().
    private InternalMockProcessorContext mockContext() {
        return new InternalMockProcessorContext<>(
            TestUtils.tempDirectory(),
            Serdes.String(),
            Serdes.Long(),
            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
            streamsConfig,
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())),
            Time.SYSTEM
        );
    }

    // Minimal StreamsConfig mock: enables the IQ consistency offset vector so
    // position tracking is active, and supplies the required application id.
    private StreamsConfig streamsConfigMock() {
        final StreamsConfig streamsConfig = mock(StreamsConfig.class);

        final Map<String, Object> myValues = new HashMap<>();
        myValues.put(StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, true);
        when(streamsConfig.originals()).thenReturn(myValues);
        when(streamsConfig.values()).thenReturn(Map.of());
        when(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("add-id");
        return streamsConfig;
    }
}

Reply via email to