mjsax commented on code in PR #21600:
URL: https://github.com/apache/kafka/pull/21600#discussion_r2867048421
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -165,13 +168,21 @@ public void process(final Record<KIn, VIn> record) {
observedStreamTime = Math.max(observedStreamTime, timestamp);
final long windowCloseTime = observedStreamTime -
windows.gracePeriodMs() - windows.inactivityGap();
+ processRecord(record, timestamp, windowCloseTime);
+
+ maybeForwardFinalResult(record, windowCloseTime);
+ }
+
+ private void processRecord(final Record<KIn, VIn> record,
+ final long timestamp,
+ final long windowCloseTime) {
final List<KeyValue<Windowed<KIn>, VAgg>> merged = new
ArrayList<>();
final SessionWindow newSessionWindow = new
SessionWindow(timestamp, timestamp);
SessionWindow mergedWindow = newSessionWindow;
VAgg agg = initializer.apply();
try (
- final KeyValueIterator<Windowed<KIn>, VAgg> iterator =
store.findSessions(
+ final KeyValueIterator<Windowed<KIn>, VAgg> iterator =
storeWrapper.findSessions(
Review Comment:
```suggestion
final KeyValueIterator<Windowed<KIn>,
AggregationWithHeaders<VAgg>> iterator = store.findSessions(
```
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreWrapper.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStore;
+
+import java.util.NoSuchElementException;
+
+/**
+ * A wrapper for session stores that abstracts whether the underlying store
supports headers.
+ * When the store is headers-aware, it delegates to a {@code SessionStore<K,
AggregationWithHeaders<VAgg>>};
+ * otherwise it delegates to a plain {@code SessionStore<K, VAgg>}.
+ *
+ * @param <K> The key type
+ * @param <VAgg> The aggregated value type
+ */
+@SuppressWarnings("unchecked")
+public class SessionStoreWrapper<K, VAgg> {
Review Comment:
I think we don't need this class.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersIteratorAdapter.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import static
org.apache.kafka.streams.state.internals.SessionToHeadersStoreAdapter.rawAggregationValue;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and
+ * {@link org.apache.kafka.streams.state.SessionStore}.
+ * <p>
+ * When iterating over session entries from a store that stores values with
headers,
+ * this adapter strips the headers prefix so the caller receives raw
aggregation bytes
+ * without headers.
+ *
+ * @see SessionToHeadersStoreAdapter
+ */
+class SessionToHeadersIteratorAdapter<K> implements KeyValueIterator<K,
byte[]> {
Review Comment:
Do I miss something? Why would we need a generic type?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersIteratorAdapter.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import static
org.apache.kafka.streams.state.internals.SessionToHeadersStoreAdapter.rawAggregationValue;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and
+ * {@link org.apache.kafka.streams.state.SessionStore}.
+ * <p>
+ * When iterating over session entries from a store that stores values with
headers,
+ * this adapter strips the headers prefix so the caller receives raw
aggregation bytes
+ * without headers.
+ *
+ * @see SessionToHeadersStoreAdapter
+ */
+class SessionToHeadersIteratorAdapter<K> implements KeyValueIterator<K,
byte[]> {
Review Comment:
```suggestion
class SessionToHeadersIteratorAdapter implements KeyValueIterator<Bytes,
byte[]> {
```
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -165,13 +168,21 @@ public void process(final Record<KIn, VIn> record) {
observedStreamTime = Math.max(observedStreamTime, timestamp);
final long windowCloseTime = observedStreamTime -
windows.gracePeriodMs() - windows.inactivityGap();
+ processRecord(record, timestamp, windowCloseTime);
+
+ maybeForwardFinalResult(record, windowCloseTime);
+ }
+
+ private void processRecord(final Record<KIn, VIn> record,
+ final long timestamp,
+ final long windowCloseTime) {
final List<KeyValue<Windowed<KIn>, VAgg>> merged = new
ArrayList<>();
final SessionWindow newSessionWindow = new
SessionWindow(timestamp, timestamp);
SessionWindow mergedWindow = newSessionWindow;
VAgg agg = initializer.apply();
try (
- final KeyValueIterator<Windowed<KIn>, VAgg> iterator =
store.findSessions(
+ final KeyValueIterator<Windowed<KIn>, VAgg> iterator =
storeWrapper.findSessions(
Review Comment:
This is what I would expect here. We update the DSL to work agains
header-store now.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -127,7 +130,7 @@ public void init(final ProcessorContext<Windowed<KIn>,
Change<VAgg>> context) {
droppedRecordsSensor = droppedRecordsSensor(threadId,
context.taskId().toString(), metrics);
emittedRecordsSensor = emittedRecordsSensor(threadId,
context.taskId().toString(), processorName, metrics);
emitFinalLatencySensor = emitFinalLatencySensor(threadId,
context.taskId().toString(), processorName, metrics);
- store = context.getStateStore(storeName);
+ storeWrapper = new SessionStoreWrapper<>(context, storeName);
Review Comment:
Given the `Adapter` we insert inside `SessionSToreBuilderWithHeader#build()`
on the underlying byte-store we don't need to do any wrapping here.
Note that `build()` return a `MeteredSessioneStoreWithHeaders` and we get
exactly this store back via `context.getStateStore(storeName)`.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -105,7 +107,7 @@ public void enableSendingOldValues() {
private class KStreamSessionWindowAggregateProcessor extends
ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
- private SessionStore<KIn, VAgg> store;
+ private SessionStoreWrapper<KIn, VAgg> storeWrapper;
Review Comment:
Not sure why we would need this one?
We should just update `SessionStore` to `SessionStoreWithHeaders`.
##########
streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java:
##########
@@ -44,4 +44,13 @@ public interface SessionBytesStoreSupplier extends
StoreSupplier<SessionStore<By
* @return retentionPeriod
*/
long retentionPeriod();
+
+ /**
+ * Whether the store supports headers.
+ *
+ * @return {@code true} if the store supports headers, {@code false}
otherwise
+ */
+ default boolean withHeaders() {
Review Comment:
I would hope we don't need this one?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderWithHeaders.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+
+import java.time.Instant;
+import java.util.Objects;
+
+/**
+ * Builder for {@link SessionStoreWithHeaders} instances.
+ *
+ * This is analogous to {@link SessionStoreBuilder}, but uses
+ * {@link AggregationWithHeaders} as the value wrapper and wires up the
+ * header-aware store stack (change-logging, caching, metering).
+ */
+public class SessionStoreBuilderWithHeaders<K, V>
+ extends AbstractStoreBuilder<K, AggregationWithHeaders<V>,
SessionStoreWithHeaders<K, V>> {
+
+ private final SessionBytesStoreSupplier storeSupplier;
+
+ public SessionStoreBuilderWithHeaders(final SessionBytesStoreSupplier
storeSupplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Time time) {
+ super(
+ Objects.requireNonNull(storeSupplier, "storeSupplier cannot be
null").name(),
+ keySerde,
+ valueSerde == null ? null : new
AggregationWithHeadersSerde<>(valueSerde),
+ time
+ );
+ Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's
metricsScope can't be null");
+ this.storeSupplier = storeSupplier;
+ }
+
+ @Override
+ public SessionStoreWithHeaders<K, V> build() {
+ SessionStore<Bytes, byte[]> sessionStore = storeSupplier.get();
+
+ if (!(sessionStore instanceof HeadersBytesStore)) {
+ if (sessionStore.persistent()) {
+ sessionStore = new SessionToHeadersStoreAdapter(sessionStore);
+ } else {
+ sessionStore = new
SessionStoreBuilderWithHeaders.InMemorySessionStoreWithHeadersMarker(sessionStore);
Review Comment:
```suggestion
sessionStore = new
InMemorySessionStoreWithHeadersMarker(sessionStore);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]