This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e0d1d232f87 KAFKA-20158: Add ChangeLoggingSessionBytesStoreWithHeaders 
for header-aware session changelog logging (3/N) (#21567)
e0d1d232f87 is described below

commit e0d1d232f878aa890dbd88da7fc41bce293f5853
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Feb 25 09:11:36 2026 -0500

    KAFKA-20158: Add ChangeLoggingSessionBytesStoreWithHeaders for header-aware 
session changelog logging (3/N) (#21567)
    
    This covers:
    - New `ChangeLoggingSessionBytesStoreWithHeaders` class (overrides put
    to extract headers from serialized `AggregationWithHeaders` and log them
    separately)
    - New `ChangeLoggingSessionBytesStoreWithHeadersTest` (6 tests)
    - `rawAggregation()` static method added to
    `AggregationWithHeadersDeserializer`
    - `internalContext` visibility change in
    `ChangeLoggingSessionBytesStore`
    
    Reviewers: Uladzislau Blok, Alieh Saeedi <[email protected]>, TengYao
    Chi <[email protected]>
---
 .../AggregationWithHeadersDeserializer.java        |  14 ++
 .../internals/ChangeLoggingSessionBytesStore.java  |   2 +-
 .../ChangeLoggingSessionBytesStoreWithHeaders.java |  71 +++++++
 ...ngeLoggingSessionBytesStoreWithHeadersTest.java | 228 +++++++++++++++++++++
 4 files changed, 314 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
index f28078947ac..d3854842a7e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
@@ -120,6 +120,20 @@ class AggregationWithHeadersDeserializer<AGG> implements 
WrappingNullableDeseria
         return readHeaders(buffer);
     }
 
+    /**
+     * Extract the raw aggregation bytes from serialized 
AggregationWithHeaders,
+     * stripping the headers prefix.
+     */
+    static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
+        if (aggregationWithHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
+        readHeaders(buffer);
+        return readBytes(buffer, buffer.remaining());
+    }
+
     private static Headers readHeaders(final ByteBuffer buffer) {
         final int headersSize = ByteUtils.readVarint(buffer);
         final byte[] rawHeaders = readBytes(buffer, headersSize);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 8c24d428abc..dcd85eab489 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -35,7 +35,7 @@ public class ChangeLoggingSessionBytesStore
     extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]>
     implements SessionStore<Bytes, byte[]> {
 
-    private InternalProcessorContext<?, ?> internalContext;
+    protected InternalProcessorContext<?, ?> internalContext;
 
     ChangeLoggingSessionBytesStore(final SessionStore<Bytes, byte[]> 
bytesStore) {
         super(bytesStore);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java
new file mode 100644
index 00000000000..1f8bf204dbf
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.SessionStore;
+
+import static 
org.apache.kafka.streams.state.internals.AggregationWithHeadersDeserializer.headers;
+import static 
org.apache.kafka.streams.state.internals.AggregationWithHeadersDeserializer.rawAggregation;
+
+/**
+ * Change-logging wrapper for a session bytes store whose values also carry 
headers.
+ * <p>
+ * The header-aware serialized value format is produced by {@link 
AggregationWithHeadersSerializer}.
+ * <p>
+ * Semantics:
+ *  - The inner store value format is:
+ *        [headersSize(varint)][headersBytes][aggregationBytes]
+ *  - The changelog record value logged via {@code logChange(...)} is just the 
{@code aggregation}
+ *    (no headers prefix), and the headers are logged separately.
+ */
+public class ChangeLoggingSessionBytesStoreWithHeaders
+    extends ChangeLoggingSessionBytesStore {
+
+    ChangeLoggingSessionBytesStoreWithHeaders(final SessionStore<Bytes, 
byte[]> bytesStore) {
+        super(bytesStore);
+    }
+
+    @Override
+    public void remove(final Windowed<Bytes> sessionKey) {
+        wrapped().remove(sessionKey);
+        internalContext.logChange(
+            name(),
+            SessionKeySchema.toBinary(sessionKey),
+            null,
+            internalContext.recordContext().timestamp(),
+            internalContext.recordContext().headers(),
+            wrapped().getPosition()
+        );
+    }
+
+    @Override
+    public void put(final Windowed<Bytes> sessionKey, final byte[] 
aggregationWithHeaders) {
+        wrapped().put(sessionKey, aggregationWithHeaders);
+        internalContext.logChange(
+            name(),
+            SessionKeySchema.toBinary(sessionKey),
+            rawAggregation(aggregationWithHeaders),
+            internalContext.recordContext().timestamp(),
+            aggregationWithHeaders == null
+                ? internalContext.recordContext().headers()
+                : headers(aggregationWithHeaders),
+            wrapped().getPosition()
+        );
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeadersTest.java
new file mode 100644
index 00000000000..af108a8dc34
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeadersTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.SessionStore;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class ChangeLoggingSessionBytesStoreWithHeadersTest {
+
+    private static final String TOPIC = "topic";
+    private static final Position POSITION = 
Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 1L)))));
+
+    @Mock
+    private SessionStore<Bytes, byte[]> inner;
+    @Mock
+    private ProcessorContextImpl context;
+
+    private ChangeLoggingSessionBytesStoreWithHeaders store;
+
+    private final byte[] value1 = {0};
+    private final Bytes bytesKey = Bytes.wrap(value1);
+    private final Windowed<Bytes> key1 = new Windowed<>(bytesKey, new 
SessionWindow(0, 0));
+
+    private final AggregationWithHeadersSerializer<byte[]> serializer =
+        new 
AggregationWithHeadersSerializer<>(Serdes.ByteArray().serializer());
+
+    @BeforeEach
+    public void setUp() {
+        store = new ChangeLoggingSessionBytesStoreWithHeaders(inner);
+        store.init(context, store);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        verify(inner).init(context, store);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldDelegateInit() {
+        final SessionStore<Bytes, byte[]> innerMock = mock(SessionStore.class);
+        final ChangeLoggingSessionBytesStoreWithHeaders outer =
+            new ChangeLoggingSessionBytesStoreWithHeaders(innerMock);
+
+        outer.init(context, outer);
+        verify(innerMock).init(context, outer);
+    }
+
+    @Test
+    public void shouldLogPutWithHeaders() {
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("key1", "value1".getBytes());
+        final AggregationWithHeaders<byte[]> aggWithHeaders = 
AggregationWithHeaders.make(value1, headers);
+        final byte[] serializedValue = serializer.serialize(TOPIC, 
aggWithHeaders);
+
+        final Bytes binaryKey = SessionKeySchema.toBinary(key1);
+        when(inner.getPosition()).thenReturn(Position.emptyPosition());
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 
0, 0, TOPIC, new RecordHeaders()));
+
+        store.put(key1, serializedValue);
+
+        verify(inner).put(key1, serializedValue);
+        verify(context).logChange(
+            store.name(),
+            binaryKey,
+            value1,
+            0L,
+            headers,
+            Position.emptyPosition()
+        );
+    }
+
+    @Test
+    public void shouldLogPutWithPosition() {
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("key1", "value1".getBytes());
+        final AggregationWithHeaders<byte[]> aggWithHeaders = 
AggregationWithHeaders.make(value1, headers);
+        final byte[] serializedValue = serializer.serialize(TOPIC, 
aggWithHeaders);
+
+        final Bytes binaryKey = SessionKeySchema.toBinary(key1);
+        when(inner.getPosition()).thenReturn(POSITION);
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 
0, 0, TOPIC, new RecordHeaders()));
+
+        store.put(key1, serializedValue);
+
+        verify(inner).put(key1, serializedValue);
+        verify(context).logChange(
+            store.name(),
+            binaryKey,
+            value1,
+            0L,
+            headers,
+            POSITION
+        );
+    }
+
+    @Test
+    public void shouldHandleNullValueInPut() {
+        final RecordHeaders contextHeaders = new RecordHeaders();
+        contextHeaders.add("headerKey", "headerValue".getBytes());
+
+        final Bytes binaryKey = SessionKeySchema.toBinary(key1);
+        when(inner.getPosition()).thenReturn(Position.emptyPosition());
+        when(context.recordContext()).thenReturn(new 
ProcessorRecordContext(42L, 0, 0, TOPIC, contextHeaders));
+
+        store.put(key1, null);
+
+        verify(inner).put(key1, null);
+        verify(context).logChange(
+            store.name(),
+            binaryKey,
+            null,
+            42L,
+            contextHeaders,
+            Position.emptyPosition()
+        );
+    }
+
+    @Test
+    public void shouldHandleEmptyHeaders() {
+        final RecordHeaders emptyHeaders = new RecordHeaders();
+        final AggregationWithHeaders<byte[]> aggWithHeaders = 
AggregationWithHeaders.make(value1, emptyHeaders);
+        final byte[] serializedValue = serializer.serialize(TOPIC, 
aggWithHeaders);
+
+        final Bytes binaryKey = SessionKeySchema.toBinary(key1);
+        when(inner.getPosition()).thenReturn(Position.emptyPosition());
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 
0, 0, TOPIC, new RecordHeaders()));
+
+        store.put(key1, serializedValue);
+
+        verify(inner).put(key1, serializedValue);
+        verify(context).logChange(
+            store.name(),
+            binaryKey,
+            value1,
+            0L,
+            emptyHeaders,
+            Position.emptyPosition()
+        );
+    }
+
+    @Test
+    public void shouldLogRemoveWithRecordContextHeaders() {
+        final RecordHeaders contextHeaders = new RecordHeaders();
+        contextHeaders.add("contextKey", "contextValue".getBytes());
+
+        final Bytes binaryKey = SessionKeySchema.toBinary(key1);
+        when(inner.getPosition()).thenReturn(Position.emptyPosition());
+        when(context.recordContext()).thenReturn(new 
ProcessorRecordContext(42L, 0, 0, TOPIC, contextHeaders));
+
+        store.remove(key1);
+
+        verify(inner).remove(key1);
+        verify(context).logChange(
+            store.name(),
+            binaryKey,
+            null,
+            42L,
+            contextHeaders,
+            Position.emptyPosition()
+        );
+    }
+
+    @Test
+    public void shouldHandleMultipleHeadersInSingleRecord() {
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("header1", "value1".getBytes());
+        headers.add("header2", "value2".getBytes());
+        headers.add("header3", "value3".getBytes());
+        final AggregationWithHeaders<byte[]> aggWithHeaders = 
AggregationWithHeaders.make(value1, headers);
+        final byte[] serializedValue = serializer.serialize(TOPIC, 
aggWithHeaders);
+
+        final Bytes binaryKey = SessionKeySchema.toBinary(key1);
+        when(inner.getPosition()).thenReturn(Position.emptyPosition());
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 
0, 0, TOPIC, new RecordHeaders()));
+
+        store.put(key1, serializedValue);
+
+        verify(inner).put(key1, serializedValue);
+        verify(context).logChange(
+            store.name(),
+            binaryKey,
+            value1,
+            0L,
+            headers,
+            Position.emptyPosition()
+        );
+    }
+}

Reply via email to