Specification of various Window and Trigger APIs in Samza - Defined APIs for specifying different types of windows - sessions, tumbling, global and keyed variants. - Defined APIs for specifying early and late triggers for a window. - Standardized all of the above Window types to be expressed as a combination of default, early and late triggers. - Defined classes for different types of trigger specifications. - Hid the WindowState class from programmers and moved it from samza-api to samza-operator. We can choose to add it back later if need be. - Removed some implementation classes in Window and Trigger. We can revisit them later when we implement Windows. - New API for specifying time durations meaningfully in Samza. - Unit tests for most of the above changes. - Misc. documentation and readability-related changes to public APIs.
Author: vjagadish1989 <jvenk...@linkedin.com> Reviewers: Yi Pan <nickpa...@gmail.com>, Prateek Maheshwari <pmahe...@linkedin.com> Closes #30 from vjagadish/samza-operator-v3 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6dc33a85 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6dc33a85 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6dc33a85 Branch: refs/heads/master Commit: 6dc33a8504e3e2335cffe3317be1c0ccb5448458 Parents: f01b286 Author: vjagadish1989 <jvenk...@linkedin.com> Authored: Mon Jan 30 13:50:16 2017 -0800 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Mon Jan 30 13:50:16 2017 -0800 ---------------------------------------------------------------------- .../apache/samza/operators/MessageStream.java | 46 ++- .../samza/operators/triggers/AnyTrigger.java | 39 ++ .../samza/operators/triggers/CountTrigger.java | 38 ++ .../triggers/DurationCharacteristic.java | 26 ++ .../operators/triggers/RepeatingTrigger.java | 34 ++ .../triggers/TimeSinceFirstMessageTrigger.java | 46 +++ .../triggers/TimeSinceLastMessageTrigger.java | 44 +++ .../samza/operators/triggers/TimeTrigger.java | 44 +++ .../samza/operators/triggers/Trigger.java | 34 ++ .../samza/operators/triggers/Triggers.java | 108 ++++++ .../operators/windows/AccumulationMode.java | 34 ++ .../samza/operators/windows/SessionWindow.java | 102 ----- .../samza/operators/windows/StoreFunctions.java | 67 ---- .../apache/samza/operators/windows/Trigger.java | 94 ----- .../samza/operators/windows/TriggerBuilder.java | 320 ---------------- .../apache/samza/operators/windows/Window.java | 90 ++++- .../samza/operators/windows/WindowFn.java | 59 --- .../samza/operators/windows/WindowKey.java | 46 +++ .../samza/operators/windows/WindowOutput.java | 51 --- .../samza/operators/windows/WindowPane.java | 57 +++ .../samza/operators/windows/WindowState.java | 85 ----- .../apache/samza/operators/windows/Windows.java | 369 
++++++++++++++++--- .../windows/internal/WindowInternal.java | 110 ++++++ .../samza/operators/windows/TestTrigger.java | 68 ---- .../operators/windows/TestTriggerBuilder.java | 226 ------------ .../operators/windows/TestWindowOutput.java | 5 +- .../samza/operators/windows/TestWindows.java | 109 ------ .../org/apache/samza/task/AsyncRunLoop.java | 4 +- .../samza/operators/MessageStreamImpl.java | 26 +- .../apache/samza/operators/StateStoreImpl.java | 56 --- .../samza/operators/impl/OperatorImpls.java | 5 +- .../impl/SessionWindowOperatorImpl.java | 67 ---- .../operators/impl/WindowOperatorImpl.java | 40 ++ .../samza/operators/spec/OperatorSpecs.java | 31 +- .../operators/spec/PartialJoinOperatorSpec.java | 24 -- .../operators/spec/WindowOperatorSpec.java | 92 +---- .../samza/operators/spec/WindowState.java | 85 +++++ .../apache/samza/operators/BroadcastTask.java | 35 +- .../samza/operators/TestMessageStreamImpl.java | 18 - .../samza/operators/TestStateStoreImpl.java | 72 ---- .../org/apache/samza/operators/WindowTask.java | 17 +- .../samza/operators/impl/TestOperatorImpls.java | 11 +- .../operators/impl/TestSessionWindowImpl.java | 111 ------ .../samza/operators/spec/TestOperatorSpecs.java | 50 +-- 44 files changed, 1294 insertions(+), 1801 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java index d18536b..6a2f95b 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -26,8 +26,7 @@ import org.apache.samza.operators.functions.JoinFunction; import 
org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.windows.Window; -import org.apache.samza.operators.windows.WindowOutput; -import org.apache.samza.operators.windows.WindowState; +import org.apache.samza.operators.windows.WindowPane; import java.util.Collection; @@ -46,8 +45,8 @@ public interface MessageStream<M extends MessageEnvelope> { * Applies the provided 1:1 {@link MapFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the * transformed {@link MessageStream}. * - * @param mapFn the function to transform a {@link MessageEnvelope} to another {@link MessageEnvelope} - * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream} + * @param mapFn the function to transform a {@link MessageEnvelope} to another {@link MessageEnvelope} + * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream} * @return the transformed {@link MessageStream} */ <TM extends MessageEnvelope> MessageStream<TM> map(MapFunction<M, TM> mapFn); @@ -56,8 +55,8 @@ public interface MessageStream<M extends MessageEnvelope> { * Applies the provided 1:n {@link FlatMapFunction} to transform a {@link MessageEnvelope} in this {@link MessageStream} * to n {@link MessageEnvelope}s in the transformed {@link MessageStream} * - * @param flatMapFn the function to transform a {@link MessageEnvelope} to zero or more {@link MessageEnvelope}s - * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream} + * @param flatMapFn the function to transform a {@link MessageEnvelope} to zero or more {@link MessageEnvelope}s + * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream} * @return the transformed {@link MessageStream} */ <TM extends MessageEnvelope> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn); @@ -69,7 +68,7 @@ public interface MessageStream<M 
extends MessageEnvelope> { * The {@link FilterFunction} is a predicate which determines whether a {@link MessageEnvelope} in this {@link MessageStream} * should be retained in the transformed {@link MessageStream}. * - * @param filterFn the predicate to filter {@link MessageEnvelope}s from this {@link MessageStream} + * @param filterFn the predicate to filter {@link MessageEnvelope}s from this {@link MessageStream} * @return the transformed {@link MessageStream} */ MessageStream<M> filter(FilterFunction<M> filterFn); @@ -78,38 +77,37 @@ public interface MessageStream<M extends MessageEnvelope> { * Allows sending {@link MessageEnvelope}s in this {@link MessageStream} to an output * {@link org.apache.samza.system.SystemStream} using the provided {@link SinkFunction}. * - * @param sinkFn the function to send {@link MessageEnvelope}s in this stream to output systems + * @param sinkFn the function to send {@link MessageEnvelope}s in this stream to output systems */ void sink(SinkFunction<M> sinkFn); /** - * Groups the {@link MessageEnvelope}s in this {@link MessageStream} according to the provided {@link Window} semantics + * Groups and processes the {@link MessageEnvelope}s in this {@link MessageStream} according to the provided {@link Window} * (e.g. tumbling, sliding or session windows) and returns the transformed {@link MessageStream} of - * {@link WindowOutput}s. + * {@link WindowPane}s. * <p> * Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows. 
* - * @param window the {@link Window} to group and process {@link MessageEnvelope}s from this {@link MessageStream} - * @param <WK> the type of key in the {@link WindowOutput} from the {@link Window} - * @param <WV> the type of value in the {@link WindowOutput} from the {@link Window} - * @param <WS> the type of window state kept in the {@link Window} - * @param <WM> the type of {@link WindowOutput} in the transformed {@link MessageStream} - * @return the transformed {@link MessageStream} + * @param window the window to group and process {@link MessageEnvelope}s from this {@link MessageStream} + * @param <K> the type of key in the {@link MessageEnvelope} in this {@link MessageStream}. If a key is specified, + * panes are emitted per-key. + * @param <WV> the type of value in the {@link WindowPane} in the transformed {@link MessageStream} + * @param <WM> the type of {@link WindowPane} in the transformed {@link MessageStream} + * @return the transformed {@link MessageStream} */ - <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window( - Window<M, WK, WV, WM> window); + <K, WV, WM extends WindowPane<K, WV>> MessageStream<WM> window(Window<M, K, WV, WM> window); /** * Joins this {@link MessageStream} with another {@link MessageStream} using the provided pairwise {@link JoinFunction}. * <p> * We currently only support 2-way joins. 
* - * @param otherStream the other {@link MessageStream} to be joined with - * @param joinFn the function to join {@link MessageEnvelope}s from this and the other {@link MessageStream} - * @param <K> the type of join key - * @param <OM> the type of {@link MessageEnvelope}s in the other stream - * @param <RM> the type of {@link MessageEnvelope}s resulting from the {@code joinFn} - * @return the joined {@link MessageStream} + * @param otherStream the other {@link MessageStream} to be joined with + * @param joinFn the function to join {@link MessageEnvelope}s from this and the other {@link MessageStream} + * @param <K> the type of join key + * @param <OM> the type of {@link MessageEnvelope}s in the other stream + * @param <RM> the type of {@link MessageEnvelope}s resulting from the {@code joinFn} + * @return the joined {@link MessageStream} */ <K, OM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<M, OM, RM> joinFn); http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java new file mode 100644 index 0000000..3ca4e9a --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java @@ -0,0 +1,39 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.data.MessageEnvelope; +import java.util.List; + +/** + * A {@link Trigger} fires as soon as any of its individual triggers has fired. + */ +public class AnyTrigger<M extends MessageEnvelope> implements Trigger { + + private final List<Trigger> triggers; + + AnyTrigger(List<Trigger> triggers) { + this.triggers = triggers; + } + + public List<Trigger> getTriggers() { + return triggers; + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java new file mode 100644 index 0000000..ba14928 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.data.MessageEnvelope; + +/** + * A {@link Trigger} that fires when the number of messages in the {@link org.apache.samza.operators.windows.WindowPane} + * reaches the specified count. + */ +public class CountTrigger<M extends MessageEnvelope> implements Trigger { + + private final long count; + + CountTrigger(long count) { + this.count = count; + } + + public long getCount() { + return count; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/DurationCharacteristic.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/DurationCharacteristic.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/DurationCharacteristic.java new file mode 100644 index 0000000..36dab72 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/DurationCharacteristic.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.triggers; + +/** + * Indicates whether the associated time duration is in event time or processing time. + */ +public enum DurationCharacteristic { + PROCESSING_TIME, EVENT_TIME +} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java new file mode 100644 index 0000000..ae9564d --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.data.MessageEnvelope; + +/** + * A {@link Trigger} that repeats its underlying trigger forever. + */ +class RepeatingTrigger<M extends MessageEnvelope> implements Trigger<M> { + + private final Trigger<M> trigger; + + RepeatingTrigger(Trigger<M> trigger) { + this.trigger = trigger; + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java new file mode 100644 index 0000000..13fc3cd --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.data.MessageEnvelope; + +import java.time.Duration; + +/* + * A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in + * the window pane. + */ +public class TimeSinceFirstMessageTrigger<M extends MessageEnvelope> implements Trigger { + + private final Duration duration; + private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; + + TimeSinceFirstMessageTrigger(Duration duration) { + this.duration = duration; + } + + public Duration getDuration() { + return duration; + } + + public DurationCharacteristic getCharacteristic() { + return characteristic; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java new file mode 100644 index 0000000..0150d86 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.data.MessageEnvelope; + +import java.time.Duration; + +/* + * A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration. + */ +public class TimeSinceLastMessageTrigger<M extends MessageEnvelope> implements Trigger { + + private final Duration duration; + private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; + + TimeSinceLastMessageTrigger(Duration duration) { + this.duration = duration; + } + + public Duration getDuration() { + return duration; + } + + public DurationCharacteristic getCharacteristic() { + return characteristic; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java new file mode 100644 index 0000000..ed7fef7 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.data.MessageEnvelope; + +import java.time.Duration; + +/* + * A {@link Trigger} that fires after the specified duration in processing time. + */ +public class TimeTrigger<M extends MessageEnvelope> implements Trigger { + + private final Duration duration; + private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; + + public TimeTrigger(Duration duration) { + this.duration = duration; + } + + public Duration getDuration() { + return duration; + } + + public DurationCharacteristic getCharacteristic() { + return characteristic; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java new file mode 100644 index 0000000..6dc4f43 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.triggers; + + +import org.apache.samza.operators.data.MessageEnvelope; + +/** + * Marker interface for all triggers. The firing of a trigger indicates the completion of a window pane. + * + * <p> Use the {@link Triggers} APIs to create a {@link Trigger}. + * + * @param <M> the type of the incoming {@link MessageEnvelope} + */ +public interface Trigger<M extends MessageEnvelope> { + +} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java new file mode 100644 index 0000000..f27cfd8 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.triggers; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.data.MessageEnvelope; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * API for creating {@link Trigger} instances to be used with a {@link org.apache.samza.operators.windows.Window}. + * + * <p> The below example groups an input into tumbling windows of 10s and emits early results periodically every 4s in + * processing time, or for every 50 messages. It also specifies that window results are accumulating. + * + * <pre> {@code + * MessageStream<> windowedStream = stream.window(Windows.tumblingWindow(Duration.of(10, TimeUnit.SECONDS)) + * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.of(4, TimeUnit.SECONDS)))))) + * .accumulateFiredPanes()); + * }</pre> + * + * @param <M> the type of input {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} + */ +@InterfaceStability.Unstable +public final class Triggers<M extends MessageEnvelope> { + + private Triggers() { } + + /** + * Creates a {@link Trigger} that fires when the number of {@link MessageEnvelope}s in the pane + * reaches the specified count. 
+ * + * @param count the number of {@link MessageEnvelope}s to fire the trigger after + * @return the created trigger + */ + public static Trigger count(long count) { + return new CountTrigger(count); + } + + /** + * Creates a trigger that fires after the specified duration has passed since the first {@link MessageEnvelope} in + * the pane. + * + * @param duration the duration since the first element + * @return the created trigger + */ + public static Trigger timeSinceFirstMessage(Duration duration) { + return new TimeSinceFirstMessageTrigger(duration); + } + + /** + * Creates a trigger that fires when there is no new {@link MessageEnvelope} for the specified duration in the pane. + * + * @param duration the duration since the last element + * @return the created trigger + */ + public static Trigger timeSinceLastMessage(Duration duration) { + return new TimeSinceLastMessageTrigger(duration); + } + + /** + * Creates a trigger that fires when any of the provided triggers fire. + * + * @param <M> the type of input {@link MessageEnvelope} in the window + * @param triggers the individual triggers + * @return the created trigger + */ + public static <M extends MessageEnvelope> Trigger any(Trigger<M>... triggers) { + List<Trigger> triggerList = new ArrayList<>(); + for (Trigger trigger : triggers) { + triggerList.add(trigger); + } + return new AnyTrigger(Collections.unmodifiableList(triggerList)); + } + + /** + * Repeats the provided trigger forever. + * + * <p>Creating a {@link RepeatingTrigger} from an {@link AnyTrigger} is equivalent to creating an {@link AnyTrigger} from + * its individual {@link RepeatingTrigger}s. 
+ * + * @param <M> the type of input {@link MessageEnvelope} in the window + * @param trigger the individual trigger to repeat + * @return the created trigger + */ + public static <M extends MessageEnvelope> Trigger repeat(Trigger<M> trigger) { + return new RepeatingTrigger<>(trigger); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/AccumulationMode.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/AccumulationMode.java b/samza-api/src/main/java/org/apache/samza/operators/windows/AccumulationMode.java new file mode 100644 index 0000000..f8c435c --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/AccumulationMode.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.windows; + +/** + * Specifies how a {@link Window} should process its previously emitted {@link WindowPane}s. 
+ * + * <p> There are two types of {@link AccumulationMode}s: + * <ul> + * <li> ACCUMULATING: Specifies that window panes should include all messages collected for the window (key) so far, even if they were + * included in previously emitted window panes. + * <li> DISCARDING: Specifies that window panes should only include messages collected for this window (key) since the last emitted + * window pane. + * </ul> + */ +public enum AccumulationMode { + ACCUMULATING, DISCARDING +} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java b/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java deleted file mode 100644 index 287025c..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.windows; - -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.storage.kv.Entry; - -import java.util.function.BiFunction; -import java.util.function.Function; - -/** - * This class defines a session window function class - * - * @param <M> the type of input {@link MessageEnvelope} - * @param <WK> the type of session key in the session window - * @param <WV> the type of output value in each session window - */ -public class SessionWindow<M extends MessageEnvelope, WK, WV> implements Window<M, WK, WV, WindowOutput<WK, WV>> { - - /** - * Constructor. Made private s.t. it can only be instantiated via the static API methods in {@link Windows} - * - * @param sessionKeyFunction function to get the session key from the input {@link MessageEnvelope} - * @param aggregator function to calculate the output value based on the input {@link MessageEnvelope} and current output value - */ - SessionWindow(Function<M, WK> sessionKeyFunction, BiFunction<M, WV, WV> aggregator) { - this.wndKeyFunction = sessionKeyFunction; - this.aggregator = aggregator; - } - - /** - * function to calculate the window key from input {@link MessageEnvelope} - */ - private final Function<M, WK> wndKeyFunction; - - /** - * function to calculate the output value from the input {@link MessageEnvelope} and the current output value - */ - private final BiFunction<M, WV, WV> aggregator; - - /** - * trigger condition that determines when to send the {@link WindowOutput} - */ - private Trigger<M, WindowState<WV>> trigger = null; - - //TODO: need to create a set of {@link StoreFunctions} that is default to input {@link MessageEnvelope} type for {@link Window} - private StoreFunctions<M, WK, WindowState<WV>> storeFunctions = null; - - /** - * Public API methods start here - */ - - /** - * Public API method to define the watermark trigger for the window operator - * - * @param wndTrigger {@link Trigger} function defines the watermark trigger for 
this {@link SessionWindow} - * @return The window operator w/ the defined watermark trigger - */ - @Override - public Window<M, WK, WV, WindowOutput<WK, WV>> setTriggers(TriggerBuilder<M, WV> wndTrigger) { - this.trigger = wndTrigger.build(); - return this; - } - - private BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() { - // TODO: actual implementation of the main session window logic, based on the wndKeyFunction, aggregator, and triggers; - return null; - } - - public WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn() { - return new WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>>() { - - @Override public BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFn() { - return SessionWindow.this.getTransformFunc(); - } - - @Override public StoreFunctions<M, WK, WindowState<WV>> getStoreFns() { - return SessionWindow.this.storeFunctions; - } - - @Override public Trigger<M, WindowState<WV>> getTrigger() { - return SessionWindow.this.trigger; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java b/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java deleted file mode 100644 index 0d40761..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.windows; - -import org.apache.samza.operators.data.MessageEnvelope; - -import java.util.function.BiFunction; -import java.util.function.Function; - -/** - * The store functions that are used by window and partial join operators to store and retrieve buffered {@link MessageEnvelope}s - * and partial aggregation results. - * - * @param <SK> the type of key used to store the operator state - * @param <SS> the type of operator state. E.g. could be the partial aggregation result for a window, or a buffered - * input {@link MessageEnvelope} from the join stream for a join - */ -public class StoreFunctions<M extends MessageEnvelope, SK, SS> { - /** - * Function that returns the key to query in the operator state store for a particular {@link MessageEnvelope}. - * This 1:1 function only returns a single key for the incoming {@link MessageEnvelope}. This is sufficient to support - * non-overlapping windows and unique-key based joins. - * - * TODO: for windows that overlaps (i.e. sliding windows and hopping windows) and non-unique-key-based join, - * the query to the state store is usually a range scan. We need to add a rangeKeyFinder function - * (or make this function return a collection) to map from a single input {@link MessageEnvelope} to a range of keys in the store. - */ - private final Function<M, SK> storeKeyFn; - - /** - * Function to update the store entry based on the current operator state and the incoming {@link MessageEnvelope}. 
- * - * TODO: this is assuming a 1:1 mapping from the input {@link MessageEnvelope} to the store entry. When implementing sliding/hopping - * windows and non-unique-key-based join, we may need to include the corresponding state key in addition to the - * state value. Alternatively this can be called once for each store key for the {@link MessageEnvelope}. - */ - private final BiFunction<M, SS, SS> stateUpdaterFn; - - public StoreFunctions(Function<M, SK> storeKeyFn, BiFunction<M, SS, SS> stateUpdaterFn) { - this.storeKeyFn = storeKeyFn; - this.stateUpdaterFn = stateUpdaterFn; - } - - public Function<M, SK> getStoreKeyFn() { - return this.storeKeyFn; - } - - public BiFunction<M, SS, SS> getStateUpdaterFn() { - return this.stateUpdaterFn; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java deleted file mode 100644 index c8b0edb..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.windows; - -import org.apache.samza.operators.data.MessageEnvelope; - -import java.util.function.BiFunction; -import java.util.function.Function; - -/** - * Defines the trigger functions for the window operator. This class is immutable. - * - * @param <M> the type of {@link MessageEnvelope} in the input stream - * @param <S> the type of state variable in the window's state store - */ -public class Trigger<M extends MessageEnvelope, S extends WindowState> { - - /** - * System timer based trigger condition. This is the only guarantee that the window operator will proceed forward - */ - private final Function<S, Boolean> timerTrigger; - - /** - * early trigger condition that determines when to send the first output from the window operator - */ - private final BiFunction<M, S, Boolean> earlyTrigger; - - /** - * late trigger condition that determines when to send the updated output after the first one from a window operator - */ - private final BiFunction<M, S, Boolean> lateTrigger; - - /** - * the function to updated the window state when the first output is triggered - */ - private final Function<S, S> earlyTriggerUpdater; - - /** - * the function to updated the window state when the late output is triggered - */ - private final Function<S, S> lateTriggerUpdater; - - /** - * Private constructor to prevent instantiation - * - * @param timerTrigger system timer trigger condition - * @param earlyTrigger early trigger condition - * @param lateTrigger late trigger condition - * @param earlyTriggerUpdater early trigger state updater - * @param lateTriggerUpdater late trigger state updater - */ - private Trigger(Function<S, Boolean> timerTrigger, BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, - Function<S, S> earlyTriggerUpdater, Function<S, S> lateTriggerUpdater) { - this.timerTrigger = 
timerTrigger; - this.earlyTrigger = earlyTrigger; - this.lateTrigger = lateTrigger; - this.earlyTriggerUpdater = earlyTriggerUpdater; - this.lateTriggerUpdater = lateTriggerUpdater; - } - - /** - * Static method to create a {@link Trigger} object - * - * @param timerTrigger system timer trigger condition - * @param earlyTrigger early trigger condition - * @param lateTrigger late trigger condition - * @param earlyTriggerUpdater early trigger state updater - * @param lateTriggerUpdater late trigger state updater - * @param <M> the type of input {@link MessageEnvelope} - * @param <S> the type of window state extends {@link WindowState} - * @return the {@link Trigger} function - */ - public static <M extends MessageEnvelope, S extends WindowState> Trigger<M, S> createTrigger(Function<S, Boolean> timerTrigger, - BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, Function<S, S> earlyTriggerUpdater, - Function<S, S> lateTriggerUpdater) { - return new Trigger(timerTrigger, earlyTrigger, lateTrigger, earlyTriggerUpdater, lateTriggerUpdater); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java deleted file mode 100644 index 6336a50..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.windows; - - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.MessageEnvelope; - -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Function; - - -/** - * This class defines a builder of {@link Trigger} object for a {@link Window}. The triggers are categorized into - * three types: - * - * <p> - * early trigger: defines the condition when the first output from the window function is sent. - * late trigger: defines the condition when the updated output after the first output is sent. - * timer trigger: defines a system timeout condition to trigger output if no more inputs are received to enable early/late triggers - * </p> - * - * If multiple conditions are defined for a specific type of trigger, the aggregated trigger is the disjunction - * of each individual trigger (i.e. OR). 
- * - * @param <M> the type of input {@link MessageEnvelope} to the {@link Window} - * @param <V> the type of output value from the {@link Window} - */ -@InterfaceStability.Unstable -public final class TriggerBuilder<M extends MessageEnvelope, V> { - - /** - * Predicate helper to OR multiple trigger conditions - */ - static class PredicateHelper { - static <M, S> BiFunction<M, S, Boolean> or(BiFunction<M, S, Boolean> lhs, BiFunction<M, S, Boolean> rhs) { - return (m, s) -> lhs.apply(m, s) || rhs.apply(m, s); - } - - static <S> Function<S, Boolean> or(Function<S, Boolean> lhs, Function<S, Boolean> rhs) { - return s -> lhs.apply(s) || rhs.apply(s); - } - } - - /** - * The early trigger condition that determines the first output from the {@link Window} - */ - private BiFunction<M, WindowState<V>, Boolean> earlyTrigger = null; - - /** - * The late trigger condition that determines the late output(s) from the {@link Window} - */ - private BiFunction<M, WindowState<V>, Boolean> lateTrigger = null; - - /** - * The system timer based trigger conditions that guarantees the {@link Window} proceeds forward - */ - private Function<WindowState<V>, Boolean> timerTrigger = null; - - /** - * The state updater function to be applied after the first output is triggered - */ - private Function<WindowState<V>, WindowState<V>> earlyTriggerUpdater = Function.identity(); - - /** - * The state updater function to be applied after the late output is triggered - */ - private Function<WindowState<V>, WindowState<V>> lateTriggerUpdater = Function.identity(); - - /** - * Helper method to add a trigger condition - * - * @param currentTrigger current trigger condition - * @param newTrigger new trigger condition - * @return combined trigger condition that is {@code currentTrigger} OR {@code newTrigger} - */ - private BiFunction<M, WindowState<V>, Boolean> addTrigger(BiFunction<M, WindowState<V>, Boolean> currentTrigger, - BiFunction<M, WindowState<V>, Boolean> newTrigger) { - if (currentTrigger 
== null) { - return newTrigger; - } - - return PredicateHelper.or(currentTrigger, newTrigger); - } - - /** - * Helper method to add a system timer trigger - * - * @param currentTimer current timer condition - * @param newTimer new timer condition - * @return combined timer condition that is {@code currentTimer} OR {@code newTimer} - */ - private Function<WindowState<V>, Boolean> addTimerTrigger(Function<WindowState<V>, Boolean> currentTimer, - Function<WindowState<V>, Boolean> newTimer) { - if (currentTimer == null) { - return newTimer; - } - - return PredicateHelper.or(currentTimer, newTimer); - } - - /** - * default constructor to prevent instantiation - */ - private TriggerBuilder() {} - - /** - * Constructor that set the size limit as the early trigger for a window - * - * @param sizeLimit the number of {@link MessageEnvelope}s in a window that would trigger the first output - */ - private TriggerBuilder(long sizeLimit) { - this.earlyTrigger = (m, s) -> s.getNumberMessages() > sizeLimit; - } - - /** - * Constructor that set the event time length as the early trigger - * - * @param eventTimeFunction the function that calculate the event time in nano-second from the input {@link MessageEnvelope} - * @param wndLenMs the window length in event time in milli-second - */ - private TriggerBuilder(Function<M, Long> eventTimeFunction, long wndLenMs) { - this.earlyTrigger = (m, s) -> - TimeUnit.NANOSECONDS.toMillis(Math.max(s.getLatestEventTimeNs() - s.getEarliestEventTimeNs(), - eventTimeFunction.apply(m) - s.getEarliestEventTimeNs())) > wndLenMs; - } - - /** - * Constructor that set the special token {@link MessageEnvelope} as the early trigger - * - * @param tokenFunc the function that checks whether an input {@link MessageEnvelope} is a token {@link MessageEnvelope} that triggers window output - */ - private TriggerBuilder(Function<M, Boolean> tokenFunc) { - this.earlyTrigger = (m, s) -> tokenFunc.apply(m); - } - - /** - * Build method that creates an {@link Trigger} 
object based on the trigger conditions set in {@link TriggerBuilder} - * This is kept package private and only used by {@link Windows} to convert the mutable {@link TriggerBuilder} object to an immutable {@link Trigger} object - * - * @return the final {@link Trigger} object - */ - Trigger<M, WindowState<V>> build() { - return Trigger.createTrigger(this.timerTrigger, this.earlyTrigger, this.lateTrigger, this.earlyTriggerUpdater, this.lateTriggerUpdater); - } - - /** - * Public API methods start here - */ - - - /** - * API method to allow users to set an update method to update the output value after the first window output is triggered - * by the early trigger condition - * - * @param onTriggerFunc the method to update the output value after the early trigger - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> onEarlyTrigger(Function<V, V> onTriggerFunc) { - this.earlyTriggerUpdater = s -> { - s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); - return s; - }; - return this; - } - - /** - * API method to allow users to set an update method to update the output value after a late window output is triggered - * by the late trigger condition - * - * @param onTriggerFunc the method to update the output value after the late trigger - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> onLateTrigger(Function<V, V> onTriggerFunc) { - this.lateTriggerUpdater = s -> { - s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); - return s; - }; - return this; - } - - /** - * API method to allow users to add a system timer trigger based on timeout after the last {@link MessageEnvelope} received in the window - * - * @param timeoutMs the timeout in ms after the last {@link MessageEnvelope} received in the window - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> addTimeoutSinceLastMessage(long timeoutMs) { - this.timerTrigger = this.addTimerTrigger(this.timerTrigger, - s -> 
TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + timeoutMs < System.currentTimeMillis()); - return this; - } - - /** - * API method to allow users to add a system timer trigger based on the timeout after the first {@link MessageEnvelope} received in the window - * - * @param timeoutMs the timeout in ms after the first {@link MessageEnvelope} received in the window - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> addTimeoutSinceFirstMessage(long timeoutMs) { - this.timerTrigger = this.addTimerTrigger(this.timerTrigger, s -> - TimeUnit.NANOSECONDS.toMillis(s.getFirstMessageTimeNs()) + timeoutMs < System.currentTimeMillis()); - return this; - } - - /** - * API method allow users to add a late trigger based on the window size limit - * - * @param sizeLimit limit on the number of {@link MessageEnvelope}s in window - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> addLateTriggerOnSizeLimit(long sizeLimit) { - this.lateTrigger = this.addTrigger(this.lateTrigger, (m, s) -> s.getNumberMessages() > sizeLimit); - return this; - } - - /** - * API method to allow users to define a customized late trigger function based on input {@link MessageEnvelope} and the window state - * - * @param lateTrigger the late trigger condition based on input {@link MessageEnvelope} and the current {@link WindowState} - * @return the {@link TriggerBuilder} object - */ - public TriggerBuilder<M, V> addLateTrigger(BiFunction<M, WindowState<V>, Boolean> lateTrigger) { - this.lateTrigger = this.addTrigger(this.lateTrigger, lateTrigger); - return this; - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on window size limit - * - * @param sizeLimit window size limit - * @param <M> the type of input {@link MessageEnvelope} - * @param <V> the type of {@link Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends MessageEnvelope, V> 
TriggerBuilder<M, V> earlyTriggerWhenExceedWndLen(long sizeLimit) { - return new TriggerBuilder<M, V>(sizeLimit); - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on event time window - * - * - * @param eventTimeFunc the function to get the event time from the input {@link MessageEnvelope} - * @param eventTimeWndSizeMs the event time window size in Ms - * @param <M> the type of input {@link MessageEnvelope} - * @param <V> the type of {@link Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> earlyTriggerOnEventTime(Function<M, Long> eventTimeFunc, long eventTimeWndSizeMs) { - return new TriggerBuilder<M, V>(eventTimeFunc, eventTimeWndSizeMs); - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on token {@link MessageEnvelope}s - * - * @param tokenFunc the function to determine whether an input {@link MessageEnvelope} is a window token or not - * @param <M> the type of input {@link MessageEnvelope} - * @param <V> the type of {@link Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> earlyTriggerOnTokenMsg(Function<M, Boolean> tokenFunc) { - return new TriggerBuilder<M, V>(tokenFunc); - } - - /** - * Static API method to allow customized early trigger condition based on input {@link MessageEnvelope} and the corresponding {@link WindowState} - * - * @param earlyTrigger the user defined early trigger condition - * @param <M> the input {@link MessageEnvelope} type - * @param <V> the output value from the window - * @return the {@link TriggerBuilder} object - */ - public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> earlyTrigger(BiFunction<M, WindowState<V>, Boolean> earlyTrigger) { - TriggerBuilder<M, V> newTriggers = new TriggerBuilder<M, V>(); - newTriggers.earlyTrigger = 
newTriggers.addTrigger(newTriggers.earlyTrigger, earlyTrigger); - return newTriggers; - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ system timeout after the last {@link MessageEnvelope} received in the window - * - * @param timeoutMs timeout in ms after the last {@link MessageEnvelope} received - * @param <M> the type of input {@link MessageEnvelope} - * @param <V> the type of {@link Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> timeoutSinceLastMessage(long timeoutMs) { - return new TriggerBuilder<M, V>().addTimeoutSinceLastMessage(timeoutMs); - } - - /** - * Static API method to create a {@link TriggerBuilder} w/ system timeout after the first {@link MessageEnvelope} received in the window - * - * @param timeoutMs timeout in ms after the first {@link MessageEnvelope} received - * @param <M> the type of input {@link MessageEnvelope} - * @param <V> the type of {@link Window} output value - * @return the {@link TriggerBuilder} object - */ - public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> timeoutSinceFirstMessage(long timeoutMs) { - return new TriggerBuilder<M, V>().addTimeoutSinceFirstMessage(timeoutMs); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java index 56a307d..6aae940 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java @@ -18,32 +18,94 @@ */ package org.apache.samza.operators.windows; +import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.data.MessageEnvelope; +import 
org.apache.samza.operators.triggers.Trigger; /** - * The public programming interface class for window function + * Groups incoming {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} into finite + * windows for processing. * - * @param <M> the type of input {@link MessageEnvelope} - * @param <WK> the type of key to the {@link Window} - * @param <WV> the type of output value in the {@link WindowOutput} - * @param <WM> the type of {@link MessageEnvelope} in the window output stream + * <p> A window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s + * that determine when results from the {@link Window} are emitted. + * + * <p> Each emitted result contains one or more {@link MessageEnvelope}s in the window and is called a {@link WindowPane}. + * A pane can include all {@link MessageEnvelope}s collected for the window so far or only the new {@link MessageEnvelope}s + * since the last emitted pane. (as determined by the {@link AccumulationMode}) + * + * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window + * has arrived or late triggers that allow handling of late data arrivals. + * + * <p> A {@link Window} is defined as "keyed" when the incoming {@link org.apache.samza.operators.MessageStream} is first + * partitioned based on the provided key, and windowing is applied on the partitioned stream. 
+ * + * window wk1 (with its triggers) + * +--------------------------------+ + * ------------+--------+-----------+ + * | | | | + * | pane 1 |pane2 | pane3 | + * +-----------+--------+-----------+ + * + ----------------------------------- + *incoming message stream ------+ + ----------------------------------- + * window wk2 + * +---------------------+---------+ + * | pane 1| pane 2 | pane 3 | + * | | | | + * +---------+-----------+---------+ + * + * window wk3 + * +----------+-----------+---------+ + * | | | | + * | pane 1 | pane 2 | pane 3| + * | | | | + * +----------+-----------+---------+ + * + * + * <p> Use the {@link Windows} APIs to create various windows and the {@link org.apache.samza.operators.triggers.Triggers} + * APIs to create triggers. + * + * @param <M> the type of the input {@link MessageEnvelope} + * @param <K> the type of the key in the {@link MessageEnvelope} in this {@link org.apache.samza.operators.MessageStream}. + * @param <WV> the type of the value in the {@link WindowPane}. + * @param <WM> the type of the output. */ -public interface Window<M extends MessageEnvelope, WK, WV, WM extends WindowOutput<WK, WV>> { +@InterfaceStability.Unstable +public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<K, WV>> { /** - * Set the triggers for this {@link Window} + * Set the early triggers for this {@link Window}. + * <p>Use the {@link org.apache.samza.operators.triggers.Triggers} APIs to create instances of {@link Trigger} * - * @param wndTrigger trigger conditions set by the programmers - * @return the {@link Window} function w/ the trigger {@code wndTrigger} + * @param trigger the early trigger + * @return the {@link Window} function with the early trigger */ - Window<M, WK, WV, WM> setTriggers(TriggerBuilder<M, WV> wndTrigger); + Window<M, K, WV, WM> setEarlyTrigger(Trigger<M> trigger); /** - * Internal implementation helper to get the functions associated with this Window. 
+ * Set the late triggers for this {@link Window}. + * <p>Use the {@link org.apache.samza.operators.triggers.Triggers} APIs to create instances of {@link Trigger} * - * <b>NOTE:</b> This is purely an internal API and should not be used directly by users. + * @param trigger the late trigger + * @return the {@link Window} function with the late trigger + */ + Window<M, K, WV, WM> setLateTrigger(Trigger<M> trigger); + + /** + * Specify how a {@link Window} should process its previously emitted {@link WindowPane}s. + * + * <p> There are two types of {@link AccumulationMode}s: + * <ul> + * <li> ACCUMULATING: Specifies that window panes should include all messages collected for the window (key) so far, even if they were + * included in previously emitted window panes. + * <li> DISCARDING: Specifies that window panes should only include messages collected for this window (key) since the last emitted + * window pane. + * </ul> * - * @return the functions associated with this Window. + * @param mode the accumulation mode + * @return the {@link Window} function with the specified {@link AccumulationMode}. */ - WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn(); + Window<M, K, WV, WM> setAccumulationMode(AccumulationMode mode); + } http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java deleted file mode 100644 index 8878bf9..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.windows; - -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.storage.kv.Entry; - -import java.util.function.BiFunction; - - -/** - * Defines an internal representation of a window function. - * - * @param <M> type of the input {@link MessageEnvelope} for the window - * @param <WK> type of the window key in the output {@link MessageEnvelope} - * @param <WS> type of the {@link WindowState} in the state store - * @param <WM> type of the {@link MessageEnvelope} in the output stream - */ -public interface WindowFn<M extends MessageEnvelope, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> { - - /** - * Get the transformation function of the {@link WindowFn}. - * - * @return the transformation function which takes a {@link MessageEnvelope} of type {@code M} and its window state entry, - * and transforms it to an {@link WindowOutput} - */ - BiFunction<M, Entry<WK, WS>, WM> getTransformFn(); - - /** - * Get the state store functions for this {@link WindowFn}. - * - * @return the state store functions - */ - StoreFunctions<M, WK, WS> getStoreFns(); - - /** - * Get the trigger conditions for this {@link WindowFn}. 
- * - * @return the trigger condition for this {@link WindowFn} - */ - Trigger<M, WS> getTrigger(); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java new file mode 100644 index 0000000..7edf3e1 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.windows; + +/** + * Key for a {@link WindowPane} emitted from a {@link Window}. + * + * @param <K> the type of the key in the incoming {@link org.apache.samza.operators.data.MessageEnvelope}. + * Windows that are not keyed have a {@link Void} key type. 
+ * + */ +public class WindowKey<K> { + + private final K key; + + private final String windowId; + + public WindowKey(K key, String windowId) { + this.key = key; + this.windowId = windowId; + } + + public K getKey() { + return key; + } + + public String getWindowId() { + return windowId; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java deleted file mode 100644 index 63e34c8..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.windows; - -import org.apache.samza.operators.data.MessageEnvelope; - - -/** - * The type of output {@link MessageEnvelope}s in a window operator output stream. 
- * - * @param <K> the type of key in the window output - * @param <M> the type of value in the window output - */ -public final class WindowOutput<K, M> implements MessageEnvelope<K, M> { - private final K key; - private final M value; - - WindowOutput(K key, M value) { - this.key = key; - this.value = value; - } - - @Override public M getMessage() { - return this.value; - } - - @Override public K getKey() { - return this.key; - } - - static public <K, M> WindowOutput<K, M> of(K key, M result) { - return new WindowOutput<>(key, result); - } -} - http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java new file mode 100644 index 0000000..0388048 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.windows; + +import org.apache.samza.operators.data.MessageEnvelope; + + +/** + * Specifies the result emitted from a {@link Window}. + * + * @param <K> the type of key in the window pane + * @param <V> the type of value in the window pane. + */ +public final class WindowPane<K, V> implements MessageEnvelope<WindowKey<K>, V> { + + private final WindowKey<K> key; + + private final V value; + + private final AccumulationMode mode; + + WindowPane(WindowKey<K> key, V value, AccumulationMode mode) { + this.key = key; + this.value = value; + this.mode = mode; + } + + @Override public V getMessage() { + return this.value; + } + + @Override public WindowKey<K> getKey() { + return this.key; + } + + static public <K, M> WindowPane<K, M> of(WindowKey<K> key, M result) { + return new WindowPane<>(key, result, AccumulationMode.DISCARDING); + } +} + + http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java deleted file mode 100644 index 835d749..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.windows; - -import org.apache.samza.annotation.InterfaceStability; - - -/** - * This interface defines the methods a window state class has to implement. The programmers are allowed to implement - * customized window state to be stored in window state stores by implementing this interface class. - * - * @param <WV> the type for window output value - */ -@InterfaceStability.Unstable -public interface WindowState<WV> { - /** - * Method to get the system time when the first {@link org.apache.samza.operators.data.MessageEnvelope} - * in the window is received - * - * @return nano-second of system time for the first {@link org.apache.samza.operators.data.MessageEnvelope} - * received in the window - */ - long getFirstMessageTimeNs(); - - /** - * Method to get the system time when the last {@link org.apache.samza.operators.data.MessageEnvelope} - * in the window is received - * - * @return nano-second of system time for the last {@link org.apache.samza.operators.data.MessageEnvelope} - * received in the window - */ - long getLastMessageTimeNs(); - - /** - * Method to get the earliest event time in the window - * - * @return the earliest event time in nano-second in the window - */ - long getEarliestEventTimeNs(); - - /** - * Method to get the latest event time in the window - * - * @return the latest event time in nano-second in the window - */ - long getLatestEventTimeNs(); - - /** - * Method to get the total number of {@link org.apache.samza.operators.data.MessageEnvelope}s received in the window - * - * @return 
number of {@link org.apache.samza.operators.data.MessageEnvelope}s in the window - */ - long getNumberMessages(); - - /** - * Method to get the corresponding window's output value - * - * @return the corresponding window's output value - */ - WV getOutputValue(); - - /** - * Method to set the corresponding window's output value - * - * @param value the corresponding window's output value - */ - void setOutputValue(WV value); - -}