Repository: samza Updated Branches: refs/heads/master 072457a2e -> e94abca72
SAMZA-1424 SAMZA-1425 SAMZA-1426; Support serdes and persistent state for windows Notable changes: * Made windows durable with support for persistent recoverable stores * New storage format to support multiple messages in windows (the previous storage format of storing the entire message-list as a value in the store incurs significant serde overhead) * Wire `TimeSeriesStore` with the WindowOperator implementation. Testing: * Existing unit tests and integration tests pass with serdes wired-up * Will follow-up with a PR for hello-samza soon. Note: Majority of changes are in `samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java` and github collapsed the diff. Author: Jagadish <jvenkatra...@linkedin.com> Reviewers: Prateek Maheshwari<pmahe...@linkedin.com> Closes #321 from vjagadish1989/window-operator-serde Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e94abca7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e94abca7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e94abca7 Branch: refs/heads/master Commit: e94abca72f73a38780c93111f06c69f4aa4255a7 Parents: 072457a Author: Jagadish <jvenkatra...@linkedin.com> Authored: Fri Oct 13 13:22:56 2017 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Fri Oct 13 13:22:56 2017 -0700 ---------------------------------------------------------------------- .../apache/samza/operators/windows/Window.java | 1 - .../samza/operators/windows/WindowKey.java | 3 +- .../apache/samza/operators/windows/Windows.java | 71 ++--- .../windows/internal/WindowInternal.java | 82 +++-- .../apache/samza/operators/impl/TriggerKey.java | 40 +-- .../operators/impl/WindowOperatorImpl.java | 306 +++++++++++-------- .../samza/operators/impl/WindowState.java | 49 --- .../operators/impl/store/TimeSeriesKey.java | 3 +- .../impl/store/TimeSeriesKeySerde.java | 10 +- .../operators/impl/store/TimeSeriesStore.java | 36 +++ 
.../impl/store/TimeSeriesStoreImpl.java | 60 ++++ .../operators/spec/WindowOperatorSpec.java | 19 +- .../samza/example/PageViewCounterExample.java | 2 +- .../samza/example/RepartitionExample.java | 2 +- .../org/apache/samza/example/WindowExample.java | 2 +- .../samza/execution/TestExecutionPlanner.java | 4 +- .../samza/operators/TestMessageStreamImpl.java | 2 +- .../samza/operators/TestWindowOperator.java | 27 +- .../operators/spec/TestWindowOperatorSpec.java | 9 +- .../samza/zk/TestScheduleAfterDebounceTime.java | 7 +- .../test/operator/RepartitionJoinWindowApp.java | 2 +- .../samza/test/operator/SessionWindowApp.java | 3 +- .../samza/test/operator/TumblingWindowApp.java | 2 +- 23 files changed, 470 insertions(+), 272 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java index 6f75993..1c0fa53 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java @@ -104,5 +104,4 @@ public interface Window<M, K, WV> { * @return the {@link Window} function with {@code mode} set as its {@link AccumulationMode}. 
*/ Window<M, K, WV> setAccumulationMode(AccumulationMode mode); - } http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java index 6c66654..550ed1a 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java @@ -66,8 +66,7 @@ public class WindowKey<K> { WindowKey<?> windowKey = (WindowKey<?>) o; if (key != null ? !key.equals(windowKey.key) : windowKey.key != null) return false; - return !(paneId != null ? !paneId.equals(windowKey.paneId) : windowKey.paneId != null); - + return paneId != null ? paneId.equals(windowKey.paneId) : windowKey.paneId == null; } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java index b478cfa..50391ff 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java @@ -26,9 +26,9 @@ import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.operators.windows.internal.WindowType; +import org.apache.samza.serializers.Serde; import java.time.Duration; -import java.util.ArrayList; import java.util.Collection; import java.util.function.Function; import java.util.function.Supplier; @@ -118,18 
+118,20 @@ public final class Windows { * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created. * @param aggregator the function to incrementally update the window value. Invoked when a new message * arrives for the window. + * @param keySerde the serde for the window key + * @param windowValueSerde the serde for the window value * @param <M> the type of the input message * @param <WV> the type of the {@link WindowPane} output value * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function. */ - public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow( - Function<? super M, ? extends K> keyFn, Duration interval, - Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator) { + public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval, + Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator, Serde<K> keySerde, + Serde<WV> windowValueSerde) { Trigger<M> defaultTrigger = new TimeTrigger<>(interval); return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator, - (Function<M, K>) keyFn, null, WindowType.TUMBLING); + (Function<M, K>) keyFn, null, WindowType.TUMBLING, keySerde, windowValueSerde, null); } @@ -149,16 +151,18 @@ public final class Windows { * * @param keyFn function to extract key from the message * @param interval the duration in processing time + * @param keySerde the serde for the window key + * @param msgSerde the serde for the input message * @param <M> the type of the input message * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function */ - public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow( - Function<? super M, ? 
extends K> keyFn, Duration interval) { - FoldLeftFunction<M, Collection<M>> aggregator = createAggregator(); + public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M, K> keyFn, Duration interval, + Serde<K> keySerde, Serde<M> msgSerde) { - Supplier<Collection<M>> initialValue = ArrayList::new; - return keyedTumblingWindow(keyFn, interval, initialValue, aggregator); + Trigger<M> defaultTrigger = new TimeTrigger<>(interval); + return new WindowInternal<>(defaultTrigger, null, null, keyFn, null, + WindowType.TUMBLING, keySerde, null, msgSerde); } /** @@ -180,18 +184,20 @@ public final class Windows { * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created. * @param aggregator the function to incrementally update the window value. Invoked when a new message * arrives for the window. + * @param windowValueSerde the serde for the window value * @param <M> the type of the input message * @param <WV> the type of the {@link WindowPane} output value * @return the created {@link Window} function */ public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration interval, Supplier<? extends WV> initialValue, - FoldLeftFunction<? super M, WV> aggregator) { + FoldLeftFunction<? super M, WV> aggregator, Serde<WV> windowValueSerde) { Trigger<M> defaultTrigger = new TimeTrigger<>(interval); return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator, - null, null, WindowType.TUMBLING); + null, null, WindowType.TUMBLING, null, windowValueSerde, null); } /** + * * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping * processing time based windows. 
* @@ -209,14 +215,15 @@ public final class Windows { * </pre> * * @param duration the duration in processing time + * @param msgSerde the serde for the input message * @param <M> the type of the input message * @return the created {@link Window} function */ - public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration) { - FoldLeftFunction<M, Collection<M>> aggregator = createAggregator(); + public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration, Serde<M> msgSerde) { + Trigger<M> defaultTrigger = new TimeTrigger<>(duration); - Supplier<Collection<M>> initialValue = ArrayList::new; - return tumblingWindow(duration, initialValue, aggregator); + return new WindowInternal<>(defaultTrigger, null, null, null, + null, WindowType.TUMBLING, null, null, msgSerde); } /** @@ -244,17 +251,19 @@ public final class Windows { * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created. * @param aggregator the function to incrementally update the window value. Invoked when a new message * arrives for the window. + * @param keySerde the serde for the window key + * @param windowValueSerde the serde for the window value * @param <M> the type of the input message * @param <K> the type of the key in the {@link Window} * @param <WV> the type of the output value in the {@link WindowPane} * @return the created {@link Window} function */ - public static <M, K, WV> Window<M, K, WV> keyedSessionWindow( - Function<? super M, ? extends K> keyFn, Duration sessionGap, - Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator) { + public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<? super M, ? extends K> keyFn, + Duration sessionGap, Supplier<? extends WV> initialValue, FoldLeftFunction<? 
super M, WV> aggregator, + Serde<K> keySerde, Serde<WV> windowValueSerde) { Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap); return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator, - (Function<M, K>) keyFn, null, WindowType.SESSION); + (Function<M, K>) keyFn, null, WindowType.SESSION, keySerde, windowValueSerde, null); } /** @@ -279,25 +288,17 @@ public final class Windows { * * @param keyFn the function to extract the window key from a message} * @param sessionGap the timeout gap for defining the session + * @param keySerde the serde for the window key + * @param msgSerde the serde for the input message * @param <M> the type of the input message * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function */ - public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow( - Function<? super M, ? extends K> keyFn, Duration sessionGap) { - - FoldLeftFunction<M, Collection<M>> aggregator = createAggregator(); - - Supplier<Collection<M>> initialValue = ArrayList::new; - return keyedSessionWindow(keyFn, sessionGap, initialValue, aggregator); - } - + public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<? super M, ? 
extends K> keyFn, + Duration sessionGap, Serde<K> keySerde, Serde<M> msgSerde) { - private static <M> FoldLeftFunction<M, Collection<M>> createAggregator() { - return (m, c) -> { - c.add(m); - return c; - }; + Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap); + return new WindowInternal<>(defaultTrigger, null, null, (Function<M, K>) keyFn, + null, WindowType.SESSION, keySerde, null, msgSerde); } - } http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java index f6ac301..bc71872 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java @@ -22,6 +22,7 @@ import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.windows.AccumulationMode; import org.apache.samza.operators.windows.Window; +import org.apache.samza.serializers.Serde; import java.util.function.Function; import java.util.function.Supplier; @@ -66,37 +67,46 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> { */ private final WindowType windowType; - private Trigger<M> earlyTrigger; + private final Serde<WK> keySerde; + private final Serde<WV> windowValSerde; + private final Serde<M> msgSerde; + private Trigger<M> earlyTrigger; private Trigger<M> lateTrigger; - private AccumulationMode mode; - public WindowInternal(Trigger<M> defaultTrigger, Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldLeftFunction, Function<M, WK> keyExtractor, Function<M, Long> eventTimeExtractor, WindowType 
windowType) { + public WindowInternal(Trigger<M> defaultTrigger, Supplier<WV> initializer, FoldLeftFunction<M, WV> foldLeftFunction, + Function<M, WK> keyExtractor, Function<M, Long> eventTimeExtractor, WindowType windowType, Serde<WK> keySerde, + Serde<WV> windowValueSerde, Serde<M> msgSerde) { this.defaultTrigger = defaultTrigger; - this.initializer = initialValue; + this.initializer = initializer; this.foldLeftFunction = foldLeftFunction; this.eventTimeExtractor = eventTimeExtractor; this.keyExtractor = keyExtractor; this.windowType = windowType; - } + this.keySerde = keySerde; + this.windowValSerde = windowValueSerde; + this.msgSerde = msgSerde; - @Override - public Window<M, WK, WV> setEarlyTrigger(Trigger<M> trigger) { - this.earlyTrigger = trigger; - return this; - } + if (defaultTrigger == null) { + throw new IllegalArgumentException("A window must not have a null default trigger"); + } - @Override - public Window<M, WK, WV> setLateTrigger(Trigger<M> trigger) { - this.lateTrigger = trigger; - return this; - } + if (msgSerde == null && windowValueSerde == null) { + throw new IllegalArgumentException("A window must not have a null msg serde and a null windowValue serde"); + } - @Override - public Window<M, WK, WV> setAccumulationMode(AccumulationMode mode) { - this.mode = mode; - return this; + if (foldLeftFunction != null && windowValSerde == null) { + throw new IllegalArgumentException("A window with a FoldLeftFunction must have a windowValue serde"); + } + + if (foldLeftFunction != null && initializer == null) { + throw new IllegalArgumentException("A window with a FoldLeftFunction must have an initializer"); + } + + if (foldLeftFunction == null && initializer != null) { + throw new IllegalArgumentException("A window without a provided FoldLeftFunction must not have an initializer"); + } } public Trigger<M> getDefaultTrigger() { @@ -134,4 +144,38 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> { public AccumulationMode 
getAccumulationMode() { return mode; } + + public Serde<WK> getKeySerde() { + return keySerde; + } + + public Serde<WV> getWindowValSerde() { + return windowValSerde; + } + + public Serde<M> getMsgSerde() { + return msgSerde; + } + + public AccumulationMode getMode() { + return mode; + } + + @Override + public Window<M, WK, WV> setEarlyTrigger(Trigger<M> trigger) { + this.earlyTrigger = trigger; + return this; + } + + @Override + public Window<M, WK, WV> setLateTrigger(Trigger<M> trigger) { + this.lateTrigger = trigger; + return this; + } + + @Override + public Window<M, WK, WV> setAccumulationMode(AccumulationMode mode) { + this.mode = mode; + return this; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java index c089737..736f167 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java @@ -20,53 +20,57 @@ package org.apache.samza.operators.impl; import org.apache.samza.operators.triggers.FiringType; -import org.apache.samza.operators.windows.WindowKey; /** * Uniquely identifies a trigger firing */ -public class TriggerKey<WK> { +public class TriggerKey<K> { private final FiringType type; - private final WindowKey<WK> key; + private final K key; + private final long timestamp; - public TriggerKey(FiringType type, WindowKey<WK> key) { + public TriggerKey(FiringType type, K key, long timestamp) { if (type == null) { throw new IllegalArgumentException("Firing type cannot be null"); } - if (key == null) { - throw new IllegalArgumentException("WindowKey cannot be null"); - } - this.type = type; this.key = key; + this.timestamp = timestamp; } - /** - * 
Equality is determined by both the type, and the window key. - */ @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - TriggerKey<WK> that = (TriggerKey<WK>) o; - return type == that.type && key.equals(that.key); + + TriggerKey<?> that = (TriggerKey<?>) o; + + if (timestamp != that.timestamp) { + return false; + } + if (type != that.type) { + return false; + } + return key != null ? key.equals(that.key) : that.key == null; } - /** - * Hashcode is computed by from the type, and the window key. - */ @Override public int hashCode() { int result = type.hashCode(); - result = 31 * result + key.hashCode(); + result = 31 * result + (key != null ? key.hashCode() : 0); + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); return result; } - public WindowKey<WK> getKey() { + public K getKey() { return key; } + public long getTimestamp() { + return timestamp; + } + public FiringType getType() { return type; } http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index 736d71e..42fe46a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -20,7 +20,13 @@ */ package org.apache.samza.operators.impl; +import com.google.common.base.Preconditions; import org.apache.samza.config.Config; +import org.apache.samza.operators.functions.FoldLeftFunction; +import org.apache.samza.operators.impl.store.TimeSeriesKey; +import org.apache.samza.operators.impl.store.TimeSeriesStore; +import org.apache.samza.operators.impl.store.TimeSeriesStoreImpl; 
+import org.apache.samza.operators.impl.store.TimestampedValue; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.operators.triggers.FiringType; @@ -29,12 +35,12 @@ import org.apache.samza.operators.triggers.TimeTrigger; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.TriggerImpl; import org.apache.samza.operators.triggers.TriggerImpls; -import org.apache.samza.operators.util.InternalInMemoryStore; import org.apache.samza.operators.windows.AccumulationMode; import org.apache.samza.operators.windows.WindowKey; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.operators.windows.internal.WindowType; +import org.apache.samza.storage.kv.ClosableIterator; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; @@ -45,89 +51,119 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; /** * Implementation of a window operator that groups messages into finite windows for processing. * * This class implements the processing logic for various types of windows and triggers. It tracks and manages state for * all open windows, the active triggers that correspond to each of the windows and the pending callbacks. It provides - * an implementation of {@link TriggerScheduler} that {@link TriggerImpl}s can use to schedule and cancel callbacks. It - * also orchestrates the flow of messages through the various {@link TriggerImpl}s. 
+ * an implementation of {@link TriggerScheduler} that {@link TriggerImpl}s can use + * to schedule and cancel callbacks. It also orchestrates the flow of messages through the various + * {@link TriggerImpl}s. + * + * <p> An instance of a {@link TriggerImplHandler} is created corresponding to each {@link Trigger} + * configured for a particular window. For every message added to the window, this class looks up the corresponding + * {@link TriggerImplHandler} for the trigger and invokes {@link TriggerImplHandler#onMessage(TriggerKey, Object, + * MessageCollector, TaskCoordinator)}. The {@link TriggerImplHandler} maintains the {@link TriggerImpl} instance along + * with whether it has been canceled yet or not. Then, the {@link TriggerImplHandler} invokes onMessage on its + * underlying {@link TriggerImpl} instance. A {@link TriggerImpl} instance is scoped to a window and its firing determines when + * results for its window are emitted. * - * <p> An instance of a {@link TriggerImplHandler} is created corresponding to each {@link Trigger} configured for a - * particular window. For every message added to the window, this class looks up the corresponding {@link TriggerImplHandler} - * for the trigger and invokes {@link TriggerImplHandler#onMessage(TriggerKey, Object, MessageCollector, TaskCoordinator)}. - * The {@link TriggerImplHandler} maintains the {@link TriggerImpl} instance along with whether it has been canceled yet - * or not. Then, the {@link TriggerImplHandler} invokes onMessage on underlying its {@link TriggerImpl} instance. A - * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted. * The {@link WindowOperatorImpl} checks if the trigger fired and returns the result of the firing. 
* * @param <M> the type of the incoming message - * @param <WK> the type of the key in this {@link org.apache.samza.operators.MessageStream} - * @param <WV> the type of the value in the emitted window pane + * @param <K> the type of the key in the incoming message * */ -public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK, WV>> { - +public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Object>> { + // Object == Collection<M> || WV private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorImpl.class); - private final WindowOperatorSpec<M, WK, WV> windowOpSpec; + private final WindowOperatorSpec<M, K, Object> windowOpSpec; private final Clock clock; - private final WindowInternal<M, WK, WV> window; - private final KeyValueStore<WindowKey<WK>, WindowState<WV>> store = new InternalInMemoryStore<>(); - private TriggerScheduler<WK> triggerScheduler; + private final WindowInternal<M, K, Object> window; + private final FoldLeftFunction<M, Object> foldLeftFn; + private final Supplier<Object> initializer; + private final Function<M, K> keyFn; - // The trigger state corresponding to each {@link TriggerKey}. 
- private final Map<TriggerKey<WK>, TriggerImplHandler> triggers = new HashMap<>(); + private final TriggerScheduler<K> triggerScheduler; + private final Map<TriggerKey<K>, TriggerImplHandler> triggers = new HashMap<>(); + private TimeSeriesStore<K, Object> timeSeriesStore; - public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> windowOpSpec, Clock clock) { + public WindowOperatorImpl(WindowOperatorSpec<M, K, Object> windowOpSpec, Clock clock) { this.windowOpSpec = windowOpSpec; this.clock = clock; this.window = windowOpSpec.getWindow(); + this.foldLeftFn = window.getFoldLeftFunction(); + this.initializer = window.getInitializer(); + this.keyFn = window.getKeyExtractor(); this.triggerScheduler= new TriggerScheduler(clock); } @Override protected void handleInit(Config config, TaskContext context) { - WindowInternal<M, WK, WV> window = windowOpSpec.getWindow(); - if (window.getFoldLeftFunction() != null) { - window.getFoldLeftFunction().init(config, context); + WindowInternal<M, K, Object> window = windowOpSpec.getWindow(); + + KeyValueStore<TimeSeriesKey<K>, Object> store = (KeyValueStore<TimeSeriesKey<K>, Object>) context.getStore(windowOpSpec.getOpName()); + + // For aggregating windows, we use the store in over-write mode since we only retain the aggregated + // value. Else, we use the store in append-mode. 
+ if (foldLeftFn != null) { + foldLeftFn.init(config, context); + timeSeriesStore = new TimeSeriesStoreImpl(store, false); + } else { + timeSeriesStore = new TimeSeriesStoreImpl(store, true); } } @Override - public Collection<WindowPane<WK, WV>> handleMessage( + public Collection<WindowPane<K, Object>> handleMessage( M message, MessageCollector collector, TaskCoordinator coordinator) { LOG.trace("Processing message envelope: {}", message); - List<WindowPane<WK, WV>> results = new ArrayList<>(); + List<WindowPane<K, Object>> results = new ArrayList<>(); - WindowKey<WK> storeKey = getStoreKey(message); - WindowState<WV> existingState = store.get(storeKey); - LOG.trace("Store key ({}) has existing state ({})", storeKey, existingState); - WindowState<WV> newState = applyFoldFunction(existingState, message); + K key = (keyFn != null) ? keyFn.apply(message) : null; + long timestamp = getWindowTimestamp(message); - LOG.trace("New window value: {}, earliest timestamp: {}", - newState.getWindowValue(), newState.getEarliestTimestamp()); - store.put(storeKey, newState); + // For aggregating windows, we only store the aggregated window value. + // For non-aggregating windows, we store all messages in the window. + if (foldLeftFn == null) { + timeSeriesStore.put(key, message, timestamp); // store is in append mode + } else { + List<Object> existingState = getValues(key, timestamp); + Preconditions.checkState(existingState.size() == 1, "WindowState for aggregating windows " + + "must not contain more than one entry per window"); + + Object oldVal = existingState.get(0); + if (oldVal == null) { + LOG.trace("No existing state found for key. 
Invoking initializer."); + oldVal = initializer.get(); + } - if (window.getEarlyTrigger() != null) { - TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.EARLY, storeKey); + Object aggregatedValue = foldLeftFn.apply(message, oldVal); + timeSeriesStore.put(key, aggregatedValue, timestamp); + } + if (window.getEarlyTrigger() != null) { + TriggerKey<K> triggerKey = new TriggerKey<>(FiringType.EARLY, key, timestamp); TriggerImplHandler triggerImplHandler = getOrCreateTriggerImplHandler(triggerKey, window.getEarlyTrigger()); - Optional<WindowPane<WK, WV>> maybeTriggeredPane = + Optional<WindowPane<K, Object>> maybeTriggeredPane = triggerImplHandler.onMessage(triggerKey, message, collector, coordinator); maybeTriggeredPane.ifPresent(results::add); } if (window.getDefaultTrigger() != null) { - TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.DEFAULT, storeKey); + TriggerKey<K> triggerKey = new TriggerKey<>(FiringType.DEFAULT, key, timestamp); TriggerImplHandler triggerImplHandler = getOrCreateTriggerImplHandler(triggerKey, window.getDefaultTrigger()); - Optional<WindowPane<WK, WV>> maybeTriggeredPane = + Optional<WindowPane<K, Object>> maybeTriggeredPane = triggerImplHandler.onMessage(triggerKey, message, collector, coordinator); maybeTriggeredPane.ifPresent(results::add); } @@ -136,16 +172,15 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK } @Override - public Collection<WindowPane<WK, WV>> handleTimer(MessageCollector collector, TaskCoordinator coordinator) { + public Collection<WindowPane<K, Object>> handleTimer(MessageCollector collector, TaskCoordinator coordinator) { LOG.trace("Processing timer."); - List<WindowPane<WK, WV>> results = new ArrayList<>(); + List<WindowPane<K, Object>> results = new ArrayList<>(); + List<TriggerKey<K>> keys = triggerScheduler.runPendingCallbacks(); - List<TriggerKey<WK>> keys = triggerScheduler.runPendingCallbacks(); - - for (TriggerKey<WK> key : keys) { + for (TriggerKey<K> key : keys) { 
TriggerImplHandler triggerImplHandler = triggers.get(key); if (triggerImplHandler != null) { - Optional<WindowPane<WK, WV>> maybeTriggeredPane = triggerImplHandler.onTimer(key, collector, coordinator); + Optional<WindowPane<K, Object>> maybeTriggeredPane = triggerImplHandler.onTimer(key, collector, coordinator); maybeTriggeredPane.ifPresent(results::add); } } @@ -154,62 +189,21 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK } @Override - protected OperatorSpec<M, WindowPane<WK, WV>> getOperatorSpec() { + protected OperatorSpec<M, WindowPane<K, Object>> getOperatorSpec() { return windowOpSpec; } @Override protected void handleClose() { - WindowInternal<M, WK, WV> window = windowOpSpec.getWindow(); - - if (window.getFoldLeftFunction() != null) { - window.getFoldLeftFunction().close(); + if (foldLeftFn != null) { + foldLeftFn.close(); } - } - - /** - * Get the key to be used for lookups in the store for this message. - */ - private WindowKey<WK> getStoreKey(M message) { - Function<M, WK> keyExtractor = window.getKeyExtractor(); - WK key = null; - - if (keyExtractor != null) { - key = keyExtractor.apply(message); - } - - String paneId = null; - - if (window.getWindowType() == WindowType.TUMBLING) { - long triggerDurationMs = ((TimeTrigger<M>) window.getDefaultTrigger()).getDuration().toMillis(); - final long now = clock.currentTimeMillis(); - Long windowBoundary = now - now % triggerDurationMs; - paneId = windowBoundary.toString(); - } - - return new WindowKey<>(key, paneId); - } - - private WindowState<WV> applyFoldFunction(WindowState<WV> existingState, M message) { - WV wv; - long earliestTimestamp; - - if (existingState == null) { - LOG.trace("No existing state found for key. 
Invoking initializer."); - wv = window.getInitializer().get(); - earliestTimestamp = clock.currentTimeMillis(); - } else { - wv = existingState.getWindowValue(); - earliestTimestamp = existingState.getEarliestTimestamp(); + if (timeSeriesStore != null) { + timeSeriesStore.close(); } - - WV newVal = window.getFoldLeftFunction().apply(message, wv); - WindowState<WV> newState = new WindowState(newVal, earliestTimestamp); - - return newState; } - private TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey<WK> triggerKey, Trigger<M> trigger) { + private TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey<K> triggerKey, Trigger<M> trigger) { TriggerImplHandler wrapper = triggers.get(triggerKey); if (wrapper != null) { LOG.trace("Returning existing trigger wrapper for {}", triggerKey); @@ -218,7 +212,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK LOG.trace("Creating a new trigger wrapper for {}", triggerKey); - TriggerImpl<M, WK> triggerImpl = TriggerImpls.createTriggerImpl(trigger, clock, triggerKey); + TriggerImpl<M, K> triggerImpl = TriggerImpls.createTriggerImpl(trigger, clock, triggerKey); wrapper = new TriggerImplHandler(triggerKey, triggerImpl); triggers.put(triggerKey, wrapper); @@ -228,25 +222,28 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK /** * Handles trigger firings and returns the optional result. */ - private Optional<WindowPane<WK, WV>> onTriggerFired( - TriggerKey<WK> triggerKey, MessageCollector collector, TaskCoordinator coordinator) { + private Optional<WindowPane<K, Object>> onTriggerFired(TriggerKey<K> triggerKey, MessageCollector collector, + TaskCoordinator coordinator) { LOG.trace("Trigger key {} fired." 
, triggerKey); TriggerImplHandler wrapper = triggers.get(triggerKey); - WindowKey<WK> windowKey = triggerKey.getKey(); - WindowState<WV> state = store.get(windowKey); + long timestamp = triggerKey.getTimestamp(); + K key = triggerKey.getKey(); + List<Object> existingState = getValues(key, timestamp); - if (state == null) { + if (existingState == null || existingState.size() == 0) { LOG.trace("No state found for triggerKey: {}", triggerKey); return Optional.empty(); } - WindowPane<WK, WV> paneOutput = computePaneOutput(triggerKey, state); + Object windowVal = window.getFoldLeftFunction() == null ? existingState : existingState.get(0); - // Handle accumulation modes. + WindowPane<K, Object> paneOutput = computePaneOutput(triggerKey, windowVal); + + // Handle different accumulation modes. if (window.getAccumulationMode() == AccumulationMode.DISCARDING) { LOG.trace("Clearing state for trigger key: {}", triggerKey); - store.put(windowKey, null); + timeSeriesStore.remove(key, timestamp); } // Cancel all early triggers too when the default trigger fires. Also, clean all state for the key. @@ -256,9 +253,8 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK LOG.trace("Default trigger fired. Canceling triggers for {}", triggerKey); cancelTrigger(triggerKey, true); - cancelTrigger(new TriggerKey(FiringType.EARLY, triggerKey.getKey()), true); - WindowKey<WK> key = triggerKey.getKey(); - store.delete(key); + cancelTrigger(new TriggerKey(FiringType.EARLY, triggerKey.getKey(), triggerKey.getTimestamp()), true); + timeSeriesStore.remove(key, timestamp); } // Cancel non-repeating early triggers. All early triggers should be removed from the "triggers" map only after the @@ -276,22 +272,9 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK /** * Computes the pane output corresponding to a {@link TriggerKey} that fired. 
*/ - private WindowPane<WK, WV> computePaneOutput(TriggerKey<WK> triggerKey, WindowState<WV> state) { - WindowKey<WK> windowKey = triggerKey.getKey(); - WV windowVal = state.getWindowValue(); - - // For session windows, we will create a new window key by using the time of the first message in the window as - //the paneId. - if (window.getWindowType() == WindowType.SESSION) { - windowKey = new WindowKey<>(windowKey.getKey(), Long.toString(state.getEarliestTimestamp())); - } - - // Make a defensive copy so that we are immune to further mutations on the collection - if (windowVal instanceof Collection) { - windowVal = (WV) new ArrayList<>((Collection<WV>) windowVal); - } - - WindowPane<WK, WV> paneOutput = + private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, Object windowVal) { + WindowKey<K> windowKey = new WindowKey(triggerKey.getKey(), Long.toString(triggerKey.getTimestamp())); + WindowPane<K, Object> paneOutput = new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType()); LOG.trace("Emitting pane output for trigger key {}", triggerKey); return paneOutput; @@ -300,7 +283,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK /** * Cancels the firing of the {@link TriggerImpl} identified by this {@link TriggerKey} and optionally removes it. */ - private void cancelTrigger(TriggerKey<WK> triggerKey, boolean shouldRemove) { + private void cancelTrigger(TriggerKey<K> triggerKey, boolean shouldRemove) { TriggerImplHandler triggerImplHandler = triggers.get(triggerKey); if (triggerImplHandler != null) { triggerImplHandler.cancel(); @@ -311,19 +294,90 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK } /** + * Computes the timestamp of the window this message should belong to. + * + * In the case of tumbling windows, timestamp of a window is defined as the start timestamp of its corresponding window + * interval. 
For instance, if the tumbling interval is 10 seconds, all messages that arrive between [1000, 1010] + * are assigned to the window with timestamp "1000" + * + * In the case of session windows, timestamp is defined as the timestamp of the earliest message in the window. + * For instance, if the session gap is 10 seconds, and the first message in the window arrives at "1002" seconds, + * all messages (that arrive within 10 seconds of their previous message) are assigned a timestamp "1002". + * + * @param message the input message + * @return the timestamp of the window this message should belong to + */ + private long getWindowTimestamp(M message) { + if (window.getWindowType() == WindowType.TUMBLING) { + long triggerDurationMs = ((TimeTrigger<M>) window.getDefaultTrigger()).getDuration().toMillis(); + final long now = clock.currentTimeMillis(); + // assign timestamp to be the start timestamp of the window boundary + long timestamp = now - now % triggerDurationMs; + return timestamp; + } else { + K key = keyFn.apply(message); + // get the value with the earliest timestamp for the provided key. + ClosableIterator<TimestampedValue<Object>> iterator = timeSeriesStore.get(key, 0, Long.MAX_VALUE, 1); + List<TimestampedValue<Object>> timestampedValues = toList(iterator); + + // If there are no existing sessions for the key, we return the current timestamp. If not, return the + // timestamp of the earliest message. + long timestamp = (timestampedValues.isEmpty())? 
clock.currentTimeMillis() : timestampedValues.get(0).getTimestamp(); + + return timestamp; + } + } + + /** + * Return a list of values in the store for the provided key and timestamp + * + * @param key the key to look up in the store + * @param timestamp the timestamp to look up in the store + * @return the list of values for the provided key + */ + private List<Object> getValues(K key, long timestamp) { + ClosableIterator<TimestampedValue<Object>> iterator = timeSeriesStore.get(key, timestamp); + List<TimestampedValue<Object>> timestampedValues = toList(iterator); + List<Object> values = timestampedValues.stream().map(element -> element.getValue()).collect(Collectors.toList()); + return values; + } + + /** + * Returns an unmodifiable list of all elements in the provided iterator. + * The iterator is guaranteed to be closed after its execution. + * + * @param iterator the provided iterator. + * @param <V> the type of elements in the iterator + * @return a list of all elements returned by the iterator + */ + static <V> List<V> toList(ClosableIterator<V> iterator) { + List<V> values = new ArrayList<>(); + try { + while (iterator.hasNext()) { + values.add(iterator.next()); + } + } finally { + if (iterator != null) { + iterator.close(); + } + } + return Collections.unmodifiableList(values); + } + + /** * State corresponding to a created {@link TriggerImpl} instance. 
*/ private class TriggerImplHandler { // The context, and the {@link TriggerImpl} instance corresponding to this triggerKey - private final TriggerImpl<M, WK> impl; + private final TriggerImpl<M, K> impl; // Guard to ensure that we don't invoke onMessage or onTimer on already cancelled triggers private boolean isCancelled = false; - public TriggerImplHandler(TriggerKey<WK> key, TriggerImpl<M, WK> impl) { + public TriggerImplHandler(TriggerKey<K> key, TriggerImpl<M, K> impl) { this.impl = impl; } - public Optional<WindowPane<WK, WV>> onMessage(TriggerKey<WK> triggerKey, M message, + public Optional<WindowPane<K, Object>> onMessage(TriggerKey<K> triggerKey, M message, MessageCollector collector, TaskCoordinator coordinator) { if (!isCancelled) { LOG.trace("Forwarding callbacks for {}", message); @@ -332,7 +386,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK if (impl.shouldFire()) { // repeating trigger can trigger multiple times, So, clear the state to allow future triggerings. if (impl instanceof RepeatingTriggerImpl) { - ((RepeatingTriggerImpl<M, WK>) impl).clear(); + ((RepeatingTriggerImpl<M, K>) impl).clear(); } return onTriggerFired(triggerKey, collector, coordinator); } @@ -340,14 +394,14 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK return Optional.empty(); } - public Optional<WindowPane<WK, WV>> onTimer( - TriggerKey<WK> key, MessageCollector collector, TaskCoordinator coordinator) { + public Optional<WindowPane<K, Object>> onTimer(TriggerKey<K> key, MessageCollector collector, + TaskCoordinator coordinator) { if (impl.shouldFire() && !isCancelled) { LOG.trace("Triggering timer triggers"); // repeating trigger can trigger multiple times, So, clear the trigger to allow future triggerings. 
if (impl instanceof RepeatingTriggerImpl) { - ((RepeatingTriggerImpl<M, WK>) impl).clear(); + ((RepeatingTriggerImpl<M, K>) impl).clear(); } return onTriggerFired(key, collector, coordinator); } http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java deleted file mode 100644 index 4577a5c..0000000 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -/** - * Wraps the value stored for a particular {@link org.apache.samza.operators.windows.WindowKey} with additional metadata. 
- */ -class WindowState<WV> { - - private final WV wv; - /** - * Time of the first message in the window - */ - private final long earliestRecvTime; - - WindowState(WV wv, long earliestRecvTime) { - this.wv = wv; - this.earliestRecvTime = earliestRecvTime; - } - - WV getWindowValue() { - return wv; - } - - long getEarliestTimestamp() { - return earliestRecvTime; - } - - @Override - public String toString() { - return String.format("WindowState: {time=%d, value=%s}", earliestRecvTime, wv); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java index fad8ca4..3aa2e62 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKey.java @@ -24,9 +24,10 @@ package org.apache.samza.operators.impl.store; */ public class TimeSeriesKey<K> { + public static final int VERSION = 0; + private final K key; private final long timestamp; - private final long seqNum; public TimeSeriesKey(K k, long time, long seq) { http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java index 273c40a..2e173ab 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesKeySerde.java @@ -57,7 
+57,10 @@ public class TimeSeriesKeySerde<K> implements Serde<TimeSeriesKey<K>> { long timestamp = timeSeriesKey.getTimestamp(); long seqNum = timeSeriesKey.getSeqNum(); - byte[] serializedKey = keySerde.toBytes(key); + byte[] serializedKey = null; + if (keySerde != null) { + serializedKey = keySerde.toBytes(key); + } int keySize = serializedKey == null ? 0 : serializedKey.length; // append the timestamp and sequence number to the serialized key bytes @@ -88,8 +91,9 @@ public class TimeSeriesKeySerde<K> implements Serde<TimeSeriesKey<K>> { long seqNum = buf.getLong(); long version = seqNum & ~SEQUENCE_NUM_MASK; - if (version != 0) { - throw new SamzaException("Version is not zero. Sequence number: " + seqNum); + if (version != TimeSeriesKey.VERSION) { + throw new SamzaException(String.format("Invalid version detected in TimeSeriesKey. " + + "Expected Version: %s Actual Version: %s Sequence number: %s", TimeSeriesKey.VERSION, version, seqNum)); } return new TimeSeriesKey(key, timeStamp, seqNum); } http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java index 56d839e..f3d6948 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java @@ -58,6 +58,34 @@ public interface TimeSeriesStore<K, V> { ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp); /** + * Returns upto {@code maxMessages} for the given key in the provided time-range - [{@code startTimestamp}, {@code endTimestamp}) + * + * The values in the returned list are ordered by their timestamp. 
Values with the same timestamp are returned in the order of insertion. + * If there are no values in the store for the key in the provided time-range, an empty list is returned. + * + * @param key the key to look up in the store + * @param startTimestamp the start timestamp of the range, inclusive + * @param endTimestamp the end timestamp of the range, exclusive + * @param maxMessages the maximum number of messages to return + * @return a list of values with upto {@code maxMessages} elements + */ + ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp, int maxMessages); + + /** + * Returns an iterator over values for the given key and timestamp + * + * Values returned by the iterator are in their insertion order. + * + * <p> The iterator <b>must</b> be closed after use by calling {@link #close}. Not doing so will result in memory leaks. + * + * @param key the key to look up in the store + * @param timestamp the timestamp to look up in the store + * @return an iterator over the values for the given key and timestamp that must be closed after use + * @throws IllegalArgumentException when the provided timestamp is negative + */ + ClosableIterator<TimestampedValue<V>> get(K key, long timestamp); + + /** * Removes all values for this key in the given time-range. * * @param key the key to look up in the store @@ -68,6 +96,14 @@ public interface TimeSeriesStore<K, V> { void remove(K key, long startTimestamp, long endTimeStamp); /** + * Removes all values for the given key and timestamp + * + * @param key the key to look up in the store + * @param timestamp the timestamp to look up in the store + */ + void remove(K key, long timestamp); + + /** * Flushes this time series store, if applicable. 
*/ void flush(); http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java index 5e35219..ff7eee9 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java @@ -26,6 +26,8 @@ import org.apache.samza.storage.kv.KeyValueStore; import java.util.LinkedList; import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** @@ -124,6 +126,17 @@ public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> { } @Override + public ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp, int maxValues) { + ClosableIterator<TimestampedValue<V>> iterator = get(key, startTimestamp, endTimestamp); + return new BoundedClosableIterator<>(iterator, maxValues); + } + + @Override + public ClosableIterator<TimestampedValue<V>> get(K key, long timestamp) { + return get(key, timestamp, timestamp + 1); + } + + @Override public void remove(K key, long startTimestamp, long endTimeStamp) { validateRange(startTimestamp, endTimeStamp); TimeSeriesKey<K> fromKey = new TimeSeriesKey(key, startTimestamp, 0); @@ -140,6 +153,11 @@ public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> { } @Override + public void remove(K key, long timestamp) { + remove(key, timestamp, timestamp + 1); + } + + @Override public void flush() { kvStore.flush(); } @@ -192,4 +210,46 @@ public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> { wrappedIterator.remove(); } } 
+ + /** + * Wraps a {@link ClosableIterator} to only return the specified number of values + * + * @param <T> the type of values in the iterator + */ + private static class BoundedClosableIterator<T> implements ClosableIterator<T> { + + private final AtomicInteger currentCount = new AtomicInteger(0); + private final ClosableIterator<T> wrappedIterator; + private final int maxCount; + + public BoundedClosableIterator(ClosableIterator<T> wrappedIterator, int maxCount) { + this.wrappedIterator = wrappedIterator; + this.maxCount = maxCount; + } + + @Override + public boolean hasNext() { + return wrappedIterator.hasNext() && currentCount.get() < maxCount; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + currentCount.incrementAndGet(); + return wrappedIterator.next(); + } + + @Override + public void remove() { + wrappedIterator.remove(); + } + + @Override + public void close() { + wrappedIterator.close(); + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 75f1427..3c8879f 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -21,6 +21,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.functions.WatermarkFunction; +import org.apache.samza.operators.impl.store.TimeSeriesKeySerde; import org.apache.samza.operators.triggers.AnyTrigger; import org.apache.samza.operators.triggers.RepeatingTrigger; import 
org.apache.samza.operators.triggers.TimeBasedTrigger; @@ -28,10 +29,13 @@ import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.util.MathUtils; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.serializers.Serde; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -43,7 +47,7 @@ import java.util.stream.Collectors; * @param <WK> the type of key of the window * @param <WV> the type of aggregated value in the window output {@link WindowPane} */ -public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK, WV>> { +public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK, WV>> implements StatefulOperatorSpec { private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorSpec.class); private final WindowInternal<M, WK, WV> window; @@ -117,4 +121,17 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK FoldLeftFunction fn = window.getFoldLeftFunction(); return fn instanceof WatermarkFunction ? (WatermarkFunction) fn : null; } + + @Override + public Collection<StoreDescriptor> getStoreDescriptors() { + String storeName = getOpName(); + String storeFactory = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"; + + Serde storeKeySerde = new TimeSeriesKeySerde<>(window.getKeySerde()); + Serde storeValSerde = window.getFoldLeftFunction() == null ? 
window.getMsgSerde() : window.getWindowValSerde(); + + StoreDescriptor descriptor = new StoreDescriptor(storeName, storeFactory, storeKeySerde, storeValSerde, storeName, + Collections.emptyMap()); + return Collections.singletonList(descriptor); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java index 91657ed..c40de7b 100644 --- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java @@ -54,7 +54,7 @@ public class PageViewCounterExample implements StreamApplication { Supplier<Integer> initialValue = () -> 0; FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1; pageViewEvents - .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn) + .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn, null, null) .setEarlyTrigger(Triggers.repeat(Triggers.count(5))) .setAccumulationMode(AccumulationMode.DISCARDING)) .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane))) http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java index e9bb284..c403406 100644 --- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java +++ 
b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java @@ -50,7 +50,7 @@ public class RepartitionExample implements StreamApplication { pageViewEvents .partitionBy(pve -> pve.memberId, pve -> pve, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class))) - .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1)) + .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null)) .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane))) .sendTo(pageViewEventPerMember); } http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/example/WindowExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java index 08c896c..9381e49 100644 --- a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java @@ -54,7 +54,7 @@ public class WindowExample implements StreamApplication { // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive // for 1 minute. 
inputStream - .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter) + .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, null) .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceLastMessage(Duration.ofMinutes(1))))) .map(WindowPane::getMessage) .sendTo(outputStream); http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index f6441dc..d98cc72 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -178,11 +178,11 @@ public class TestExecutionPlanner { messageStream1.map(m -> m) .filter(m->true) - .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8))); + .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class))); messageStream2.map(m -> m) .filter(m->true) - .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16))); + .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class))); messageStream1 .join(messageStream2, mock(JoinFunction.class), http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index f23bb14..1426444 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -265,7 +265,7 @@ public class TestMessageStreamImpl { // should compile since TestMessageEnvelope (input for functions) is base class of TestInputMessageEnvelope (M) Window<TestInputMessageEnvelope, String, Integer> window = Windows - .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator); + .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator, null, mock(Serde.class)); MessageStream<WindowPane<String, Integer>> windowedStream = inputStream.window(window); ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java index e011121..aee457e 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java @@ -28,6 +28,8 @@ import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.operators.impl.store.TestInMemoryStore; +import org.apache.samza.operators.impl.store.TimeSeriesKeySerde; import org.apache.samza.operators.triggers.FiringType; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.Triggers; @@ -37,6 +39,7 @@ import org.apache.samza.operators.windows.Windows; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; +import 
org.apache.samza.serializers.Serde; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.StreamSpec; @@ -70,9 +73,13 @@ public class TestWindowOperator { config = mock(Config.class); taskContext = mock(TaskContextImpl.class); runner = mock(ApplicationRunner.class); + Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde()); + Serde storeValSerde = new IntegerEnvelopeSerde(); + when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + when(taskContext.getStore("window-3")).thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde)); when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka")); } @@ -397,7 +404,7 @@ public class TestWindowOperator { Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); inStream .map(m -> m) - .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger) + .window(Windows.keyedTumblingWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde()).setEarlyTrigger(earlyTrigger) .setAccumulationMode(mode)) .sink((message, messageCollector, taskCoordinator) -> { messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); @@ -427,7 +434,7 @@ public class TestWindowOperator { Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); inStream .map(m -> m) - .window(Windows.<IntegerEnvelope>tumblingWindow(duration).setEarlyTrigger(earlyTrigger) + .window(Windows.tumblingWindow(duration, new IntegerEnvelopeSerde()).setEarlyTrigger(earlyTrigger) .setAccumulationMode(mode)) .sink((message, messageCollector, taskCoordinator) -> { messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); @@ -455,7 +462,7 @@ public class TestWindowOperator { 
inStream .map(m -> m) - .window(Windows.keyedSessionWindow(keyFn, duration) + .window(Windows.keyedSessionWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde()) .setAccumulationMode(mode)) .sink((message, messageCollector, taskCoordinator) -> { messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); @@ -469,4 +476,18 @@ public class TestWindowOperator { super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, key); } } + + private class IntegerEnvelopeSerde implements Serde<IntegerEnvelope> { + private final IntegerSerde intSerde = new IntegerSerde(); + + @Override + public byte[] toBytes(IntegerEnvelope object) { + return intSerde.toBytes((Integer) object.getKey()); + } + + @Override + public IntegerEnvelope fromBytes(byte[] bytes) { + return new IntegerEnvelope(intSerde.fromBytes(bytes)); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java index 12a32b1..f1fb8e2 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.spec; +import org.apache.samza.serializers.Serde; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.internal.WindowInternal; @@ -28,6 +29,8 @@ import org.junit.Test; import java.time.Duration; +import static org.mockito.Mockito.mock; + public class TestWindowOperatorSpec { @Test public void testTriggerIntervalWithNestedTimeTriggers() { @@ -41,7 +44,8 @@ 
public class TestWindowOperatorSpec { Triggers.timeSinceFirstMessage(Duration.ofMillis(25)), Triggers.timeSinceLastMessage(Duration.ofMillis(15)))))); - WindowInternal window = new WindowInternal(defaultTrigger, null, null, null, null, WindowType.SESSION); + WindowInternal window = new WindowInternal(defaultTrigger, null, null, null, + null, WindowType.SESSION, null, null, mock(Serde.class)); window.setEarlyTrigger(earlyTrigger); window.setLateTrigger(lateTrigger); @@ -54,7 +58,8 @@ public class TestWindowOperatorSpec { Trigger defaultTrigger = Triggers.timeSinceFirstMessage(Duration.ofMillis(150)); Trigger earlyTrigger = Triggers.repeat(Triggers.count(5)); - WindowInternal window = new WindowInternal(defaultTrigger, null, null, null, null, WindowType.SESSION); + WindowInternal window = new WindowInternal(defaultTrigger, null, null, null, + null, WindowType.SESSION, null, null, mock(Serde.class)); window.setEarlyTrigger(earlyTrigger); WindowOperatorSpec spec = new WindowOperatorSpec(window, 0); http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java index a681767..697833b 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java @@ -19,15 +19,16 @@ package org.apache.samza.zk; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Rule; -import org.junit.rules.Timeout; import org.junit.Test; +import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CountDownLatch; +import 
java.util.concurrent.TimeUnit; + public class TestScheduleAfterDebounceTime { private static final Logger LOG = LoggerFactory.getLogger(TestScheduleAfterDebounceTime.class); http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java index 517d81f..e35dfb7 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java @@ -68,7 +68,7 @@ public class RepartitionJoinWindowApp implements StreamApplication { .partitionBy(UserPageAdClick::getUserId, upac -> upac, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class))) .map(KV::getValue) - .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3))) + .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3), new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class))) .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size()))) .sendTo(outputStream); } http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java index 974cafc..6410e7d 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java @@ -50,7 +50,8 @@ public 
class SessionWindowApp implements StreamApplication { pageViews .filter(m -> !FILTER_KEY.equals(m.getUserId())) - .window(Windows.keyedSessionWindow(PageView::getUserId, Duration.ofSeconds(3))) + .window(Windows.keyedSessionWindow(PageView::getUserId, Duration.ofSeconds(3), new StringSerde(), + new JsonSerdeV2<>(PageView.class))) .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size())) .sendTo(outputStream); } http://git-wip-us.apache.org/repos/asf/samza/blob/e94abca7/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java index 151c9d1..5d04f21 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java @@ -51,7 +51,7 @@ public class TumblingWindowApp implements StreamApplication { pageViews .filter(m -> !FILTER_KEY.equals(m.getUserId())) - .window(Windows.keyedTumblingWindow(PageView::getUserId, Duration.ofSeconds(3))) + .window(Windows.keyedTumblingWindow(PageView::getUserId, Duration.ofSeconds(3), null, null)) .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size())) .sendTo(outputStream); }