mjsax commented on code in PR #21600:
URL: https://github.com/apache/kafka/pull/21600#discussion_r2867048421


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -165,13 +168,21 @@ public void process(final Record<KIn, VIn> record) {
             observedStreamTime = Math.max(observedStreamTime, timestamp);
             final long windowCloseTime = observedStreamTime - 
windows.gracePeriodMs() - windows.inactivityGap();
 
+            processRecord(record, timestamp, windowCloseTime);
+
+            maybeForwardFinalResult(record, windowCloseTime);
+        }
+
+        private void processRecord(final Record<KIn, VIn> record,
+                                    final long timestamp,
+                                    final long windowCloseTime) {
             final List<KeyValue<Windowed<KIn>, VAgg>> merged = new 
ArrayList<>();
             final SessionWindow newSessionWindow = new 
SessionWindow(timestamp, timestamp);
             SessionWindow mergedWindow = newSessionWindow;
             VAgg agg = initializer.apply();
 
             try (
-                final KeyValueIterator<Windowed<KIn>, VAgg> iterator = 
store.findSessions(
+                final KeyValueIterator<Windowed<KIn>, VAgg> iterator = 
storeWrapper.findSessions(

Review Comment:
   ```suggestion
                   final KeyValueIterator<Windowed<KIn>, 
AggregationWithHeaders<VAgg>> iterator = store.findSessions(
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreWrapper.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStore;
+
+import java.util.NoSuchElementException;
+
+/**
+ * A wrapper for session stores that abstracts whether the underlying store 
supports headers.
+ * When the store is headers-aware, it delegates to a {@code SessionStore<K, 
AggregationWithHeaders<VAgg>>};
+ * otherwise it delegates to a plain {@code SessionStore<K, VAgg>}.
+ *
+ * @param <K>    The key type
+ * @param <VAgg> The aggregated value type
+ */
+@SuppressWarnings("unchecked")
+public class SessionStoreWrapper<K, VAgg> {

Review Comment:
   I think we don't need this class.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersIteratorAdapter.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import static 
org.apache.kafka.streams.state.internals.SessionToHeadersStoreAdapter.rawAggregationValue;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and
+ * {@link org.apache.kafka.streams.state.SessionStore}.
+ * <p>
+ * When iterating over session entries from a store that stores values with 
headers,
+ * this adapter strips the headers prefix so the caller receives raw 
aggregation bytes
+ * without headers.
+ *
+ * @see SessionToHeadersStoreAdapter
+ */
+class SessionToHeadersIteratorAdapter<K> implements KeyValueIterator<K, 
byte[]> {

Review Comment:
   Do I miss something? Why would we need a generic type?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersIteratorAdapter.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import static 
org.apache.kafka.streams.state.internals.SessionToHeadersStoreAdapter.rawAggregationValue;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and
+ * {@link org.apache.kafka.streams.state.SessionStore}.
+ * <p>
+ * When iterating over session entries from a store that stores values with 
headers,
+ * this adapter strips the headers prefix so the caller receives raw 
aggregation bytes
+ * without headers.
+ *
+ * @see SessionToHeadersStoreAdapter
+ */
+class SessionToHeadersIteratorAdapter<K> implements KeyValueIterator<K, 
byte[]> {

Review Comment:
   ```suggestion
   class SessionToHeadersIteratorAdapter implements KeyValueIterator<Bytes, 
byte[]> {
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -165,13 +168,21 @@ public void process(final Record<KIn, VIn> record) {
             observedStreamTime = Math.max(observedStreamTime, timestamp);
             final long windowCloseTime = observedStreamTime - 
windows.gracePeriodMs() - windows.inactivityGap();
 
+            processRecord(record, timestamp, windowCloseTime);
+
+            maybeForwardFinalResult(record, windowCloseTime);
+        }
+
+        private void processRecord(final Record<KIn, VIn> record,
+                                    final long timestamp,
+                                    final long windowCloseTime) {
             final List<KeyValue<Windowed<KIn>, VAgg>> merged = new 
ArrayList<>();
             final SessionWindow newSessionWindow = new 
SessionWindow(timestamp, timestamp);
             SessionWindow mergedWindow = newSessionWindow;
             VAgg agg = initializer.apply();
 
             try (
-                final KeyValueIterator<Windowed<KIn>, VAgg> iterator = 
store.findSessions(
+                final KeyValueIterator<Windowed<KIn>, VAgg> iterator = 
storeWrapper.findSessions(

Review Comment:
   This is what I would expect here. We update the DSL to work agains 
header-store now.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -127,7 +130,7 @@ public void init(final ProcessorContext<Windowed<KIn>, 
Change<VAgg>> context) {
             droppedRecordsSensor = droppedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
             emittedRecordsSensor = emittedRecordsSensor(threadId, 
context.taskId().toString(), processorName, metrics);
             emitFinalLatencySensor = emitFinalLatencySensor(threadId, 
context.taskId().toString(), processorName, metrics);
-            store = context.getStateStore(storeName);
+            storeWrapper = new SessionStoreWrapper<>(context, storeName);

Review Comment:
   Given the `Adapter` we insert inside `SessionSToreBuilderWithHeader#build()` 
on the underlying byte-store we don't need to do any wrapping here.
   
   Note that `build()` return a `MeteredSessioneStoreWithHeaders` and we get 
exactly this store back via  `context.getStateStore(storeName)`.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -105,7 +107,7 @@ public void enableSendingOldValues() {
     private class KStreamSessionWindowAggregateProcessor extends
         ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
 
-        private SessionStore<KIn, VAgg> store;
+        private SessionStoreWrapper<KIn, VAgg> storeWrapper;

Review Comment:
   Not sure why we would need this one?
   
   We should just update `SessionStore` to `SessionStoreWithHeaders`.



##########
streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java:
##########
@@ -44,4 +44,13 @@ public interface SessionBytesStoreSupplier extends 
StoreSupplier<SessionStore<By
      * @return retentionPeriod
      */
     long retentionPeriod();
+
+    /**
+     * Whether the store supports headers.
+     *
+     * @return {@code true} if the store supports headers, {@code false} 
otherwise
+     */
+    default boolean withHeaders() {

Review Comment:
   I would hope we don't need this one?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderWithHeaders.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+
+import java.time.Instant;
+import java.util.Objects;
+
+/**
+ * Builder for {@link SessionStoreWithHeaders} instances.
+ *
+ * This is analogous to {@link SessionStoreBuilder}, but uses
+ * {@link AggregationWithHeaders} as the value wrapper and wires up the
+ * header-aware store stack (change-logging, caching, metering).
+ */
+public class SessionStoreBuilderWithHeaders<K, V>
+    extends AbstractStoreBuilder<K, AggregationWithHeaders<V>, 
SessionStoreWithHeaders<K, V>> {
+
+    private final SessionBytesStoreSupplier storeSupplier;
+
+    public SessionStoreBuilderWithHeaders(final SessionBytesStoreSupplier 
storeSupplier,
+                                          final Serde<K> keySerde,
+                                          final Serde<V> valueSerde,
+                                          final Time time) {
+        super(
+            Objects.requireNonNull(storeSupplier, "storeSupplier cannot be 
null").name(),
+            keySerde,
+            valueSerde == null ? null : new 
AggregationWithHeadersSerde<>(valueSerde),
+            time
+        );
+        Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's 
metricsScope can't be null");
+        this.storeSupplier = storeSupplier;
+    }
+
+    @Override
+    public SessionStoreWithHeaders<K, V> build() {
+        SessionStore<Bytes, byte[]> sessionStore = storeSupplier.get();
+
+        if (!(sessionStore instanceof HeadersBytesStore)) {
+            if (sessionStore.persistent()) {
+                sessionStore = new SessionToHeadersStoreAdapter(sessionStore);
+            } else {
+                sessionStore = new 
SessionStoreBuilderWithHeaders.InMemorySessionStoreWithHeadersMarker(sessionStore);

Review Comment:
   ```suggestion
                   sessionStore = new 
InMemorySessionStoreWithHeadersMarker(sessionStore);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to