[FLINK-2677] Add a general-purpose keyed-window operator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd51c977 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd51c977 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd51c977 Branch: refs/heads/master Commit: dd51c97741b336d3a11e319183537eef864a84fd Parents: 3be2dc1 Author: Aljoscha Krettek <[email protected]> Authored: Fri Sep 25 12:27:35 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Sep 28 17:09:25 2015 +0200 ---------------------------------------------------------------------- .../api/datastream/KeyedDataStream.java | 19 + .../KeyedTriggerWindowDataStream.java | 255 +++++++++++ .../windowing/ReduceWindowFunction.java | 70 +++ .../ReduceWindowFunctionWithWindow.java | 71 +++ .../windowing/RichKeyedWindowFunction.java | 25 + .../flink/streaming/api/graph/StreamGraph.java | 6 + .../api/windowing/assigners/GlobalWindows.java | 55 +++ .../assigners/SlidingProcessingTimeWindows.java | 92 ++++ .../windowing/assigners/SlidingTimeWindows.java | 81 ++++ .../TumblingProcessingTimeWindows.java | 67 +++ .../assigners/TumblingTimeWindows.java | 67 +++ .../api/windowing/assigners/WindowAssigner.java | 32 ++ .../api/windowing/evictors/CountEvictor.java | 44 ++ .../api/windowing/evictors/DeltaEvictor.java | 58 +++ .../api/windowing/evictors/Evictor.java | 28 ++ .../api/windowing/evictors/TimeEvictor.java | 54 +++ .../ContinuousProcessingTimeTrigger.java | 79 ++++ .../triggers/ContinuousWatermarkTrigger.java | 65 +++ .../api/windowing/triggers/CountTrigger.java | 61 +++ .../api/windowing/triggers/DeltaTrigger.java | 66 +++ .../triggers/ProcessingTimeTrigger.java | 57 +++ .../api/windowing/triggers/PurgingTrigger.java | 76 +++ .../api/windowing/triggers/Trigger.java | 40 ++ .../windowing/triggers/WatermarkTrigger.java | 57 +++ .../api/windowing/windows/GlobalWindow.java | 65 +++ .../api/windowing/windows/TimeWindow.java | 75 +++ 
.../streaming/api/windowing/windows/Window.java | 27 ++ .../operators/BucketStreamSortOperator.java | 93 ++++ .../windowing/EvictingWindowOperator.java | 115 +++++ .../operators/windowing/PolicyToOperator.java | 166 ++++++- .../operators/windowing/WindowOperator.java | 320 +++++++++++++ .../windowing/buffers/EvictingWindowBuffer.java | 22 + .../windowing/buffers/HeapWindowBuffer.java | 88 ++++ .../buffers/PreAggregatingHeapWindowBuffer.java | 91 ++++ .../windowing/buffers/WindowBuffer.java | 34 ++ .../windowing/buffers/WindowBufferFactory.java | 30 ++ .../windowing/EvictingWindowOperatorTest.java | 179 ++++++++ .../windowing/PolicyWindowTranslationTest.java | 216 +++++++++ .../windowing/TriggerWindowTranslationTest.java | 201 ++++++++ .../operators/windowing/WindowOperatorTest.java | 459 +++++++++++++++++++ 40 files changed, 3701 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java index 611953e..ce105e5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java @@ -23,7 +23,9 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import 
org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy; +import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.partitioner.HashPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; @@ -115,4 +117,21 @@ public class KeyedDataStream<T, KEY> extends DataStream<T> { public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, WindowPolicy slide) { return new KeyedWindowDataStream<T, KEY>(this, window, slide); } + + /** + * Windows this data stream to a {@code KeyedTriggerWindowDataStream}, which evaluates windows + * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The + * grouping of elements is done both by key and by window. + * + * <p> + * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify + * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger} + * that is used if a {@code Trigger} is not specified. + * + * @param assigner The {@code WindowAssigner} that assigns elements to windows. + * @return The trigger windows data stream. + */ + public <W extends Window> KeyedTriggerWindowDataStream<T, KEY, W> window(WindowAssigner<? 
super T, W> assigner) { + return new KeyedTriggerWindowDataStream<T, KEY, W>(this, assigner); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java new file mode 100644 index 0000000..5b39775 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.commons.lang.SerializationUtils; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.evictors.Evictor; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime; +import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator; +import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator; +import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; + +import java.util.concurrent.TimeUnit; + +/** + * A {@code KeyedTriggerWindowDataStream} represents a data stream where elements are grouped by + * key, and for each key, the stream of 
elements is split into windows based on a + * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission + * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}. + * + * <p> + * The windows are conceptually evaluated for each key individually, meaning windows can trigger at + * different points for each key. + * + * <p> + * If an {@link Evictor} is specified, it will be used to evict elements from the window after + * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window. + * When using an evictor, window performance will degrade significantly, since + * pre-aggregation of window results cannot be used. + * + * <p> + * Note that the {@code KeyedTriggerWindowDataStream} is purely an API construct; during runtime + * the {@code KeyedTriggerWindowDataStream} will be collapsed together with the + * {@code KeyedDataStream} and the operation over the window into one single operation. + * + * @param <T> The type of elements in the stream. + * @param <K> The type of the key by which elements are grouped. + * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to. + */ +public class KeyedTriggerWindowDataStream<T, K, W extends Window> { + + /** The keyed data stream that is windowed by this stream */ + private final KeyedDataStream<T, K> input; + + /** The window assigner */ + private final WindowAssigner<? super T, W> windowAssigner; + + /** The trigger that is used for window evaluation/emission. */ + private Trigger<? super T, ? super W> trigger; + + /** The evictor that is used for evicting elements before window evaluation. */ + private Evictor<? super T, ? super W> evictor; + + + public KeyedTriggerWindowDataStream(KeyedDataStream<T, K> input, WindowAssigner<? 
super T, W> windowAssigner) { + this.input = input; + this.windowAssigner = windowAssigner; + this.trigger = windowAssigner.getDefaultTrigger(); + } + + /** + * Sets the {@code Trigger} that should be used to trigger window emission. + */ + public KeyedTriggerWindowDataStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) { + this.trigger = trigger; + return this; + } + + /** + * Sets the {@code Evictor} that should be used to evict elements from a window before emission. + * + * <p> + * Note: When using an evictor, window performance will degrade significantly, since + * pre-aggregation of window results cannot be used. + */ + public KeyedTriggerWindowDataStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) { + this.evictor = evictor; + return this; + } + + + // ------------------------------------------------------------------------ + // Operations on the keyed windows + // ------------------------------------------------------------------------ + + /** + * Applies a reduce function to the window. The window function is called for each evaluation + * of the window for each key individually. The output of the reduce function is interpreted + * as a regular non-windowed stream. + * <p> + * This window will try to pre-aggregate data as much as the window policies permit. For example, + * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per + * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval, + * so a few elements are stored per key (one per slide interval). + * Custom windows may not be able to pre-aggregate, or may need to store extra values in an + * aggregation tree. + * + * @param function The reduce function. + * @return The data stream that is the result of applying the reduce function to the window. 
+ */ + public DataStream<T> reduceWindow(ReduceFunction<T> function) { + String callLocation = Utils.getCallLocationName(); + String udfName = "Reduce at " + callLocation; + + DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName); + if (result != null) { + return result; + } + + String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")"; + KeySelector<T, K> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, T> operator; + + if (evictor != null) { + operator = new EvictingWindowOperator<>(windowAssigner, + keySel, + new HeapWindowBuffer.Factory<T>(), + new ReduceWindowFunction<K, W, T>(function), + trigger, + evictor); + + } else { + // we need to copy because we need our own instance of the pre aggregator + @SuppressWarnings("unchecked") + ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function); + + operator = new WindowOperator<>(windowAssigner, + keySel, + new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy), + new ReduceWindowFunction<K, W, T>(function), + trigger); + } + + return input.transform(opName, input.getType(), operator); + } + + /** + * Applies a window function to the window. The window function is called for each evaluation + * of the window for each key individually. The output of the window function is interpreted + * as a regular non-windowed stream. + * <p> + * Note that this function requires that all data in the windows is buffered until the window + * is evaluated, as the function provides no means of pre-aggregation. + * + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. 
+ */ + public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> function) { + TypeInformation<T> inType = input.getType(); + TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType( + function, KeyedWindowFunction.class, true, true, inType, null, false); + + String callLocation = Utils.getCallLocationName(); + String udfName = "MapWindow at " + callLocation; + + DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName); + if (result != null) { + return result; + } + + + String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")"; + KeySelector<T, K> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, R> operator; + + if (evictor != null) { + operator = new EvictingWindowOperator<>(windowAssigner, + keySel, + new HeapWindowBuffer.Factory<T>(), + function, + trigger, + evictor); + + } else { + operator = new WindowOperator<>(windowAssigner, + keySel, + new HeapWindowBuffer.Factory<T>(), + function, + trigger); + } + + + + return input.transform(opName, resultType, operator); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private <R> DataStream<R> createFastTimeOperatorIfValid( + Function function, + TypeInformation<R> resultType, + String functionName) { + + WindowPolicy windowPolicy = null; + WindowPolicy slidePolicy = null; + + if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) { + SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner; + windowPolicy = ProcessingTime.of(timeWindows.getSize(), TimeUnit.MILLISECONDS); + slidePolicy = ProcessingTime.of(timeWindows.getSlide(), TimeUnit.MILLISECONDS); + } else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) { + 
TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner; + windowPolicy = ProcessingTime.of(timeWindows.getSize(), TimeUnit.MILLISECONDS); + } + + if (windowPolicy == null) { + return null; + } + + String opName = windowPolicy.toString(slidePolicy) + " of " + functionName; + KeySelector<T, K> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, R> operator = + PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel); + + return input.transform(opName, resultType, operator); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java new file mode 100644 index 0000000..1c9578a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +public class ReduceWindowFunction<K, W extends Window, T> extends RichKeyedWindowFunction<T, T, K, W> { + private static final long serialVersionUID = 1L; + + private final ReduceFunction<T> reduceFunction; + + public ReduceWindowFunction(ReduceFunction<T> reduceFunction) { + this.reduceFunction = reduceFunction; + } + + @Override + public void setRuntimeContext(RuntimeContext ctx) { + super.setRuntimeContext(ctx); + FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + FunctionUtils.openFunction(reduceFunction, parameters); + } + + @Override + public void close() throws Exception { + super.close(); + FunctionUtils.closeFunction(reduceFunction); + } + + @Override + public void evaluate(K k, W window, Iterable<T> values, Collector<T> out) throws Exception { + T result = null; + + for (T v: values) { + if (result == null) { + result = v; + } else { + result = reduceFunction.reduce(result, v); + } + } + + if (result != null) { + out.collect(result); + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java new file mode 100644 index 0000000..bceff82 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends RichKeyedWindowFunction<T, Tuple2<W, T>, K, W> { + private static final long serialVersionUID = 1L; + + private final ReduceFunction<T> reduceFunction; + + public ReduceWindowFunctionWithWindow(ReduceFunction<T> reduceFunction) { + this.reduceFunction = reduceFunction; + } + + @Override + public void setRuntimeContext(RuntimeContext ctx) { + super.setRuntimeContext(ctx); + FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + FunctionUtils.openFunction(reduceFunction, parameters); + } + + @Override + public void close() throws Exception { + super.close(); + FunctionUtils.closeFunction(reduceFunction); + } + + @Override + public void evaluate(K k, W window, Iterable<T> values, Collector<Tuple2<W, T>> out) throws Exception { + T result = null; + + for (T v: values) { + if (result == null) { + result = v; + } else { + result = reduceFunction.reduce(result, v); + } + } + + if (result != null) { + out.collect(Tuple2.of(window, result)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java new file mode 100644 index 0000000..90ccb40 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; + +public abstract class RichKeyedWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction implements KeyedWindowFunction<IN, OUT, KEY, W> { + private static final long serialVersionUID = 1L; +} http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index cda5686..cfa6d93 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -38,6 +38,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.optimizer.plan.StreamingPlan; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -217,6 +218,11 @@ public class StreamGraph extends StreamingPlan { outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig); } + if (operatorObject instanceof InputTypeConfigurable) { + InputTypeConfigurable inputTypeConfigurable = (InputTypeConfigurable) operatorObject; + 
inputTypeConfigurable.setInputType(inTypeInfo, executionConfig); + } + if (LOG.isDebugEnabled()) { LOG.debug("Vertex: {}", vertexID); } http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java new file mode 100644 index 0000000..391a6a4 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.windowing.assigners; + +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; + +import java.util.Collection; +import java.util.Collections; + +public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { + private static final long serialVersionUID = 1L; + + private GlobalWindows() {} + + @Override + public Collection<GlobalWindow> assignWindows(Object element, long timestamp) { + return Collections.singletonList(GlobalWindow.get()); + } + + @Override + public Trigger<Object, GlobalWindow> getDefaultTrigger() { + return null; + } + + @Override + public String toString() { + return "GlobalWindows()"; + } + + /** + * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns + * all elements to the same {@link GlobalWindow}. + * + * @return The global window policy. + */ + public static GlobalWindows create() { + return new GlobalWindows(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java new file mode 100644 index 0000000..a2d95c2 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link WindowAssigner} that assigns each element to every sliding window of length
+ * {@code size} (a new window starting every {@code slide}) that contains the current
+ * processing time. Both values are in milliseconds, since they are combined with
+ * {@link System#currentTimeMillis()}.
+ */
+public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	private final long size;
+
+	private final long slide;
+
+	private SlidingProcessingTimeWindows(long size, long slide) {
+		if (size <= 0 || slide <= 0) {
+			throw new IllegalArgumentException("SlidingProcessingTimeWindows requires size > 0 and slide > 0, but got size="
+				+ size + ", slide=" + slide);
+		}
+		this.size = size;
+		this.slide = slide;
+	}
+
+	/**
+	 * Returns all windows that contain the current processing time; {@code element} and
+	 * {@code timestamp} are ignored.
+	 *
+	 * <p>A new list is allocated on every call. Reusing one shared list would mean that a
+	 * collection handed out by an earlier call gets cleared and refilled by the next call,
+	 * corrupting any caller that still holds it.
+	 */
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		long time = System.currentTimeMillis();
+		List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
+		// start of the latest window containing 'time'; earlier windows start one slide apart
+		long lastStart = time - time % slide;
+		for (long start = lastStart; start > time - size; start -= slide) {
+			windows.add(new TimeWindow(start, size));
+		}
+		return windows;
+	}
+
+	public long getSize() {
+		return size;
+	}
+
+	public long getSlide() {
+		return slide;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger() {
+		return ProcessingTimeTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
+	}
+
+	/**
+	 * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to sliding time windows based on the current processing time.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param slide The slide interval of the generated windows.
+	 * @return The time policy.
+	 * @throws IllegalArgumentException if {@code size} or {@code slide} is not positive.
+	 */
+	public static SlidingProcessingTimeWindows of(long size, long slide) {
+		return new SlidingProcessingTimeWindows(size, slide);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
new file mode 100644
index 0000000..cb5a7a1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link WindowAssigner} that assigns each element to every sliding window of length
+ * {@code size} (a new window starting every {@code slide}) whose start {@code s} satisfies
+ * {@code s <= timestamp < s + size}, based on the element timestamp.
+ */
+public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	private final long size;
+
+	private final long slide;
+
+	private SlidingTimeWindows(long size, long slide) {
+		if (size <= 0 || slide <= 0) {
+			throw new IllegalArgumentException("SlidingTimeWindows requires size > 0 and slide > 0, but got size="
+				+ size + ", slide=" + slide);
+		}
+		this.size = size;
+		this.slide = slide;
+	}
+
+	/**
+	 * Returns a new list with all windows containing {@code timestamp}, latest start first.
+	 */
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
+		long lastStart = alignedStart(timestamp, slide);
+		for (long start = lastStart; start > timestamp - size; start -= slide) {
+			windows.add(new TimeWindow(start, size));
+		}
+		return windows;
+	}
+
+	/**
+	 * Returns the largest multiple of {@code slide} that is {@code <= timestamp}.
+	 * Java's {@code %} keeps the sign of the dividend, so {@code timestamp - timestamp % slide}
+	 * would round negative timestamps up, past the timestamp itself; the remainder is shifted
+	 * into {@code [0, slide)} first.
+	 */
+	private static long alignedStart(long timestamp, long slide) {
+		long offset = timestamp % slide;
+		if (offset < 0) {
+			offset += slide;
+		}
+		return timestamp - offset;
+	}
+
+	public long getSize() {
+		return size;
+	}
+
+	public long getSlide() {
+		return slide;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger() {
+		return WatermarkTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "SlidingTimeWindows(" + size + ", " + slide + ")";
+	}
+
+	/**
+	 * Creates a new {@code SlidingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to sliding time windows based on the element timestamp.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param slide The slide interval of the generated windows.
+	 * @return The time policy.
+	 * @throws IllegalArgumentException if {@code size} or {@code slide} is not positive.
+	 */
+	public static SlidingTimeWindows of(long size, long slide) {
+		return new SlidingTimeWindows(size, slide);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
new file mode 100644
index 0000000..b1ef857
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that puts each element into the single tumbling window of length
+ * {@code size} (milliseconds, as it is combined with {@link System#currentTimeMillis()})
+ * that contains the current processing time.
+ */
+public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	private final long size;
+
+	private TumblingProcessingTimeWindows(long size) {
+		// fail fast instead of an ArithmeticException (size == 0) on the first element
+		if (size <= 0) {
+			throw new IllegalArgumentException("TumblingProcessingTimeWindows requires size > 0, but got size=" + size);
+		}
+		this.size = size;
+	}
+
+	/**
+	 * Returns the window containing the current processing time; {@code element} and
+	 * {@code timestamp} are ignored.
+	 */
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		long time = System.currentTimeMillis();
+		long start = time - (time % size);
+		return Collections.singletonList(new TimeWindow(start, size));
+	}
+
+	public long getSize() {
+		return size;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger() {
+		return ProcessingTimeTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "TumblingProcessingTimeWindows(" + size + ")";
+	}
+
+	/**
+	 * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the current processing time.
+	 *
+	 * @param size The size of the generated windows.
+	 * @return The time policy.
+	 * @throws IllegalArgumentException if {@code size} is not positive.
+	 */
+	public static TumblingProcessingTimeWindows of(long size) {
+		return new TumblingProcessingTimeWindows(size);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
new file mode 100644
index 0000000..d19c97d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that puts each element into the single tumbling window of length
+ * {@code size} whose start {@code s} satisfies {@code s <= timestamp < s + size}, based on
+ * the element timestamp.
+ */
+public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	private final long size;
+
+	private TumblingTimeWindows(long size) {
+		// fail fast instead of an ArithmeticException (size == 0) on the first element
+		if (size <= 0) {
+			throw new IllegalArgumentException("TumblingTimeWindows requires size > 0, but got size=" + size);
+		}
+		this.size = size;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		long offset = timestamp % size;
+		if (offset < 0) {
+			// Java's % keeps the dividend's sign; without this shift a negative timestamp would be
+			// put into the window that starts after it instead of the one containing it
+			offset += size;
+		}
+		long start = timestamp - offset;
+		return Collections.singletonList(new TimeWindow(start, size));
+	}
+
+	public long getSize() {
+		return size;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger() {
+		return WatermarkTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "TumblingTimeWindows(" + size + ")";
+	}
+
+	/**
+	 * Creates a new {@code TumblingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp.
+	 *
+	 * @param size The size of the generated windows.
+	 * @return The time policy.
+	 * @throws IllegalArgumentException if {@code size} is not positive.
+	 */
+	public static TumblingTimeWindows of(long size) {
+		return new TumblingTimeWindows(size);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
new file mode 100644
index 0000000..5996426
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * Assigns elements to a collection of {@link Window Windows}.
+ *
+ * <p>Implementations are {@link java.io.Serializable}. The plain Java interface is used
+ * (not {@code scala.Serializable}, which merely extends it) so that this Java API carries
+ * no dependency on Scala types.
+ *
+ * @param <T> The type of elements this assigner handles.
+ * @param <W> The type of window produced.
+ */
+public abstract class WindowAssigner<T, W extends Window> implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Returns the windows that {@code element} is assigned to.
+	 *
+	 * @param element The element to assign.
+	 * @param timestamp The element's timestamp. Processing-time assigners in this package
+	 *                  ignore it and read {@link System#currentTimeMillis()} instead.
+	 * @return The windows the element belongs to.
+	 */
+	public abstract Collection<W> assignWindows(T element, long timestamp);
+
+	/**
+	 * Returns the trigger associated with this assigner, presumably used when no trigger is
+	 * specified explicitly. May be {@code null} ({@code GlobalWindows} returns {@code null}).
+	 */
+	public abstract Trigger<T, W> getDefaultTrigger();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
new file mode 100644
index 0000000..04636ee
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * An {@link Evictor} that reports the surplus over {@code maxCount} elements for eviction,
+ * so that (presumably, once the operator drops them) at most {@code maxCount} remain.
+ */
+public class CountEvictor<W extends Window> implements Evictor<Object, W> {
+	private static final long serialVersionUID = 1L;
+
+	private final long maxCount;
+
+	private CountEvictor(long count) {
+		this.maxCount = count;
+	}
+
+	/**
+	 * @param elements The buffered elements (not inspected).
+	 * @param size The buffer size reported by the caller.
+	 * @param window The window being evaluated (not inspected).
+	 * @return {@code size - maxCount} if {@code size > maxCount}, otherwise {@code 0}.
+	 */
+	@Override
+	public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
+		if (size > maxCount) {
+			return (int) (size - maxCount);
+		} else {
+			return 0;
+		}
+	}
+
+	// Matches the toString() of the other evictors and triggers for readable plan/debug output.
+	@Override
+	public String toString() {
+		return "CountEvictor(" + maxCount + ")";
+	}
+
+	/**
+	 * Creates a {@code CountEvictor} that keeps at most {@code maxCount} elements.
+	 *
+	 * @param maxCount The number of elements to keep.
+	 * @return The new evictor.
+	 */
+	public static <W extends Window> CountEvictor<W> of(long maxCount) {
+		return new CountEvictor<>(maxCount);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
new file mode 100644
index 0000000..c7872ce
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import com.google.common.collect.Iterables;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * An {@link Evictor} that counts, from the start of the buffer, the records whose delta to
+ * the last buffered record (computed by the {@link DeltaFunction}) is at least
+ * {@code threshold}. Counting stops at the first record whose delta is below the threshold.
+ */
+public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
+	private static final long serialVersionUID = 1L;
+
+	final DeltaFunction<T> deltaFunction;
+	private final double threshold;
+
+	private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {
+		this.deltaFunction = deltaFunction;
+		this.threshold = threshold;
+	}
+
+	@Override
+	public int evict(Iterable<StreamRecord<T>> elements, int size, W window) {
+		if (!elements.iterator().hasNext()) {
+			// nothing buffered; Iterables.getLast would throw NoSuchElementException
+			return 0;
+		}
+		StreamRecord<T> lastElement = Iterables.getLast(elements);
+		int toEvict = 0;
+		for (StreamRecord<T> element : elements) {
+			if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) < this.threshold) {
+				break;
+			}
+			toEvict++;
+		}
+
+		return toEvict;
+	}
+
+	@Override
+	public String toString() {
+		return "DeltaEvictor(" + deltaFunction + ", " + threshold + ")";
+	}
+
+	/**
+	 * Creates a {@code DeltaEvictor}.
+	 *
+	 * @param threshold Records whose delta to the last record is at least this value are counted for eviction.
+	 * @param deltaFunction Computes the delta between two element values.
+	 * @return The new evictor.
+	 */
+	public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
+		return new DeltaEvictor<>(threshold, deltaFunction);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java new file mode 100644 index 0000000..db04ac4 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+
+/**
+ * Decides how many of the elements buffered for a window should be evicted.
+ *
+ * <p>Extends {@link java.io.Serializable} directly (not {@code scala.Serializable}, which
+ * merely extends it) so that this Java API carries no dependency on Scala types.
+ *
+ * @param <T> The type of the buffered elements.
+ * @param <W> The type of window.
+ */
+public interface Evictor<T, W extends Window> extends Serializable {
+
+	/**
+	 * Computes the number of elements to evict.
+	 *
+	 * <p>NOTE(review): the implementations in this package count from the start of
+	 * {@code elements} (e.g. {@code TimeEvictor} stops at the first record newer than its
+	 * cutoff), which suggests the operator removes that many elements from the front —
+	 * confirm against the window operator.
+	 *
+	 * @param elements The elements currently buffered for the window.
+	 * @param size The buffer size reported by the caller (presumably the number of {@code elements}).
+	 * @param window The window being evaluated.
+	 * @return The number of elements to evict.
+	 */
+	int evict(Iterable<StreamRecord<T>> elements, int size, W window);
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
new file mode 100644
index 0000000..450b132
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * An {@link Evictor} that counts the leading run of records whose timestamp is not newer than
+ * {@code System.currentTimeMillis() - windowSize}; counting stops at the first newer record.
+ *
+ * <p>NOTE(review): record timestamps are compared against the wall clock, which is only
+ * meaningful if those timestamps are processing-time based — confirm intended semantics.
+ */
+public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
+	private static final long serialVersionUID = 1L;
+
+	private final long windowSize;
+
+	public TimeEvictor(long windowSize) {
+		this.windowSize = windowSize;
+	}
+
+	/**
+	 * Creates a {@code TimeEvictor} with the given time span.
+	 *
+	 * @param windowSize How far back from the current wall-clock time records are kept.
+	 * @return The new evictor.
+	 */
+	public static <W extends Window> TimeEvictor<W> of(long windowSize) {
+		return new TimeEvictor<>(windowSize);
+	}
+
+	@Override
+	public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
+		final long cutoff = System.currentTimeMillis() - windowSize;
+		int evicted = 0;
+		for (StreamRecord<Object> rec : elements) {
+			if (rec.getTimestamp() > cutoff) {
+				// first record still inside the span: everything from here on is kept
+				return evicted;
+			}
+			evicted++;
+		}
+		return evicted;
+	}
+
+	@Override
+	public String toString() {
+		return "TimeEvictor(" + windowSize + ")";
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
new file mode 100644
index 0000000..64850a2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that fires repeatedly, roughly once per {@code granularity} of processing
+ * time ({@link System#currentTimeMillis()}).
+ *
+ * <p>The first element only schedules a processing-time timer at the next multiple of
+ * {@code granularity}. Afterwards, an element arriving after {@code nextFireTimestamp} fires
+ * and schedules the next timer. A timer callback fires only if no element has already
+ * advanced {@code nextFireTimestamp}. It does not register a new timer itself, so the next
+ * timer is only scheduled when another element arrives.
+ */
+public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
+	private static final long serialVersionUID = 1L;
+
+	private long granularity;
+
+	// Next planned firing time; 0 means "no element seen yet" (used as an uninitialized sentinel).
+	private long nextFireTimestamp = 0;
+
+	private ContinuousProcessingTimeTrigger(long granularity) {
+		this.granularity = granularity;
+	}
+
+	@Override
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
+		long currentTime = System.currentTimeMillis();
+		if (nextFireTimestamp == 0) {
+			// first element: align to the next granularity boundary and schedule a timer there
+			long start = currentTime - (currentTime % granularity);
+			nextFireTimestamp = start + granularity;
+
+			ctx.registerProcessingTimeTimer(nextFireTimestamp);
+			return TriggerResult.CONTINUE;
+		}
+		if (currentTime > nextFireTimestamp) {
+			// the planned firing time has passed: fire now and schedule the following boundary
+			long start = currentTime - (currentTime % granularity);
+			nextFireTimestamp = start + granularity;
+
+			ctx.registerProcessingTimeTimer(nextFireTimestamp);
+
+			return TriggerResult.FIRE;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	/**
+	 * Called for a registered processing-time timer; {@code time} itself is not used.
+	 *
+	 * <p>NOTE(review): the strict {@code >} means that if this callback runs in the same
+	 * millisecond as {@code nextFireTimestamp}, it returns CONTINUE. Because no new timer is
+	 * registered here, firing then stalls until the next element. Consider {@code >=} —
+	 * confirm against the timer service's delivery semantics.
+	 */
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		// only fire if an element didn't already fire (an element firing advances nextFireTimestamp)
+		long currentTime = System.currentTimeMillis();
+		if (currentTime > nextFireTimestamp) {
+			long start = currentTime - (currentTime % granularity);
+			nextFireTimestamp = start + granularity;
+			return TriggerResult.FIRE;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	// Returns a fresh instance with the same granularity and no firing state (nextFireTimestamp = 0).
+	@Override
+	public Trigger<Object, W> duplicate() {
+		return new ContinuousProcessingTimeTrigger<>(granularity);
+	}
+
+	@Override
+	public String toString() {
+		return "ContinuousProcessingTimeTrigger(" + granularity + ")";
+	}
+
+	/**
+	 * Creates a trigger that fires roughly every {@code granularity} milliseconds of processing time.
+	 *
+	 * @param granularity The firing interval.
+	 * @return The new trigger.
+	 */
+	public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(long granularity) {
+		return new ContinuousProcessingTimeTrigger<>(granularity);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
new file mode 100644
index 0000000..b7f085a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that fires every {@code granularity} of watermark time.
+ *
+ * <p>The first element registers a watermark timer at the next multiple of
+ * {@code granularity} after its timestamp. Every timer callback fires and registers the
+ * following timer {@code granularity} later.
+ */
+public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
+	private static final long serialVersionUID = 1L;
+
+	private final long granularity;
+
+	// true until the first element has registered the initial watermark timer
+	private boolean first = true;
+
+	private ContinuousWatermarkTrigger(long granularity) {
+		this.granularity = granularity;
+	}
+
+	@Override
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
+		if (first) {
+			long start = timestamp - (timestamp % granularity);
+			long nextFireTimestamp = start + granularity;
+
+			ctx.registerWatermarkTimer(nextFireTimestamp);
+			first = false;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		// keep the chain of timers going, one per granularity step
+		ctx.registerWatermarkTimer(time + granularity);
+		return TriggerResult.FIRE;
+	}
+
+	@Override
+	public Trigger<Object, W> duplicate() {
+		return new ContinuousWatermarkTrigger<>(granularity);
+	}
+
+	// Must name this class; it previously reported "ContinuousProcessingTimeTrigger" (copy-paste error).
+	@Override
+	public String toString() {
+		return "ContinuousWatermarkTrigger(" + granularity + ")";
+	}
+
+	/**
+	 * Creates a trigger that fires every {@code granularity} of watermark time.
+	 *
+	 * @param granularity The firing interval.
+	 * @return The new trigger.
+	 */
+	public static <W extends Window> ContinuousWatermarkTrigger<W> of(long granularity) {
+		return new ContinuousWatermarkTrigger<>(granularity);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java new file mode 100644 index 0000000..a51fae6 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that fires once every {@code maxCount} elements and then starts counting
+ * again from zero.
+ */
+public class CountTrigger<W extends Window> implements Trigger<Object, W> {
+	private static final long serialVersionUID = 1L;
+
+	private final long maxCount;
+
+	// elements seen since the last FIRE
+	private long count;
+
+	private CountTrigger(long maxCount) {
+		this.maxCount = maxCount;
+		this.count = 0;
+	}
+
+	@Override
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
+		count++;
+		if (count >= maxCount) {
+			count = 0;
+			return TriggerResult.FIRE;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	/**
+	 * This trigger registers no timers, so a timer callback has nothing to decide. It returns
+	 * {@link TriggerResult#CONTINUE} rather than {@code null}, so callers never receive a null
+	 * result.
+	 */
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public Trigger<Object, W> duplicate() {
+		return new CountTrigger<>(maxCount);
+	}
+
+	@Override
+	public String toString() {
+		return "CountTrigger(" + maxCount + ")";
+	}
+
+	/**
+	 * Creates a trigger that fires every {@code maxCount} elements.
+	 *
+	 * @param maxCount The number of elements after which to fire.
+	 * @return The new trigger.
+	 */
+	public static <W extends Window> CountTrigger<W> of(long maxCount) {
+		return new CountTrigger<>(maxCount);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
new file mode 100644
index 0000000..ecd7ed0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that fires when the {@link DeltaFunction} value between a remembered
+ * element and the current element exceeds a threshold.
+ *
+ * <p>The first element is only remembered. Every later element is compared with the remembered
+ * one. When {@code deltaFunction.getDelta(last, current) > threshold}, the trigger fires and the
+ * current element becomes the new reference. The remembered element is {@code transient}, so it
+ * is not part of the serialized trigger.
+ *
+ * @param <T> The type of elements on which this trigger works.
+ * @param <W> The type of {@link Window} on which this trigger can operate.
+ */
+public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
+	private static final long serialVersionUID = 1L;
+
+	final DeltaFunction<T> deltaFunction;
+	private final double threshold;
+
+	/** Element the next delta is computed against; {@code null} until the first element. */
+	private transient T lastElement;
+
+	private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction) {
+		this.deltaFunction = deltaFunction;
+		this.threshold = threshold;
+	}
+
+	@Override
+	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) {
+		if (lastElement == null) {
+			// First element: nothing to compare against yet, only remember it.
+			lastElement = element;
+			return TriggerResult.CONTINUE;
+		}
+		if (deltaFunction.getDelta(lastElement, element) > this.threshold) {
+			// The current element becomes the new reference point.
+			lastElement = element;
+			return TriggerResult.FIRE;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	/**
+	 * This trigger never registers timers, so a timer callback never makes it fire.
+	 *
+	 * <p>Returns {@link TriggerResult#CONTINUE} rather than {@code null}: callers that
+	 * {@code switch} on the result would otherwise fail with a NullPointerException.
+	 */
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		return TriggerResult.CONTINUE;
+	}
+
+	/**
+	 * Returns a new trigger with the same threshold and no remembered element. The
+	 * {@link DeltaFunction} instance is shared with the copy.
+	 */
+	@Override
+	public Trigger<T, W> duplicate() {
+		return new DeltaTrigger<>(threshold, deltaFunction);
+	}
+
+	@Override
+	public String toString() {
+		return "DeltaTrigger(" + deltaFunction + ", " + threshold + ")";
+	}
+
+	/**
+	 * Creates a delta trigger.
+	 *
+	 * @param threshold The delta value above which the trigger fires.
+	 * @param deltaFunction Computes the delta between the remembered and the current element.
+	 * @param <T> The type of elements on which this trigger works.
+	 * @param <W> The type of {@link Window} on which this trigger can operate.
+	 * @return A new {@code DeltaTrigger}.
+	 */
+	public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
+		return new DeltaTrigger<>(threshold, deltaFunction);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
new file mode 100644
index 0000000..f693a67
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+/**
+ * A {@link Trigger} for {@link TimeWindow TimeWindows} driven by processing time.
+ *
+ * <p>On the first element it sees, the trigger registers a processing-time timer at
+ * {@link TimeWindow#getEnd()}. Every timer callback makes it fire and purge.
+ */
+public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	/** {@code true} until the end-of-window timer has been registered. */
+	boolean isFirst = true;
+
+	private ProcessingTimeTrigger() {}
+
+	@Override
+	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
+		if (!isFirst) {
+			// The end-of-window timer is already registered; nothing more to do.
+			return TriggerResult.CONTINUE;
+		}
+		ctx.registerProcessingTimeTimer(window.getEnd());
+		isFirst = false;
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		return TriggerResult.FIRE_AND_PURGE;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> duplicate() {
+		// The fresh instance starts with isFirst == true.
+		return create();
+	}
+
+	@Override
+	public String toString() {
+		return "ProcessingTimeTrigger()";
+	}
+
+	/** Creates a new trigger that registers a processing-time timer at the window end. */
+	public static ProcessingTimeTrigger create() {
+		return new ProcessingTimeTrigger();
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
new file mode 100644
index 0000000..88e22cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that wraps another trigger and makes every firing also purge.
+ *
+ * <p>{@link TriggerResult#FIRE} and {@link TriggerResult#FIRE_AND_PURGE} from the nested trigger
+ * both become {@link TriggerResult#FIRE_AND_PURGE}. Any other result, including {@code null},
+ * becomes {@link TriggerResult#CONTINUE}.
+ *
+ * @param <T> The type of elements on which this trigger works.
+ * @param <W> The type of {@link Window} on which this trigger can operate.
+ */
+public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
+	private static final long serialVersionUID = 1L;
+
+	private final Trigger<T, W> nestedTrigger;
+
+	private PurgingTrigger(Trigger<T, W> nestedTrigger) {
+		this.nestedTrigger = nestedTrigger;
+	}
+
+	@Override
+	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) {
+		return toPurging(nestedTrigger.onElement(element, timestamp, window, ctx));
+	}
+
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		return toPurging(nestedTrigger.onTime(time, ctx));
+	}
+
+	/** Returns a purging wrapper around a duplicate of the nested trigger. */
+	@Override
+	public Trigger<T, W> duplicate() {
+		return new PurgingTrigger<>(nestedTrigger.duplicate());
+	}
+
+	@Override
+	public String toString() {
+		return "PurgingTrigger(" + nestedTrigger.toString() + ")";
+	}
+
+	/**
+	 * Wraps the given trigger so that each of its firings also purges the window.
+	 *
+	 * @param nestedTrigger The trigger that decides when to fire.
+	 * @param <T> The type of elements on which the trigger works.
+	 * @param <W> The type of {@link Window} on which the trigger can operate.
+	 * @return A new {@code PurgingTrigger}.
+	 */
+	public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
+		return new PurgingTrigger<>(nestedTrigger);
+	}
+
+	@VisibleForTesting
+	public Trigger<T, W> getNestedTrigger() {
+		return nestedTrigger;
+	}
+
+	/**
+	 * Maps a result of the nested trigger to the result of this trigger.
+	 *
+	 * <p>A {@code null} result is treated as {@link TriggerResult#CONTINUE}. A {@code switch} on
+	 * {@code null} would otherwise throw a NullPointerException.
+	 */
+	private static TriggerResult toPurging(TriggerResult nestedResult) {
+		if (nestedResult == null) {
+			return TriggerResult.CONTINUE;
+		}
+		switch (nestedResult) {
+			case FIRE:
+			case FIRE_AND_PURGE:
+				return TriggerResult.FIRE_AND_PURGE;
+			default:
+				return TriggerResult.CONTINUE;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
new file mode 100644
index 0000000..b04aacf
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.io.Serializable;
+
+/**
+ * A {@code Trigger} decides, on each element and on each timer callback, whether a window
+ * should be evaluated and possibly purged.
+ *
+ * <p>Implementations may keep per-instance state in fields. {@link #duplicate()} creates a fresh
+ * instance with the same configuration.
+ *
+ * <p>The interface extends {@link java.io.Serializable} rather than {@code scala.Serializable},
+ * so the Java API does not pull in the Scala library.
+ *
+ * @param <T> The type of elements on which this trigger works.
+ * @param <W> The type of {@link Window} on which this trigger can operate.
+ */
+public interface Trigger<T, W extends Window> extends Serializable {
+
+	/**
+	 * Called for an element that belongs to {@code window}.
+	 *
+	 * @param element The element.
+	 * @param timestamp The timestamp of the element.
+	 * @param window The window the element belongs to.
+	 * @param ctx Context through which timers can be registered.
+	 * @return The decision for the window; implementations should not return {@code null}.
+	 */
+	TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
+
+	/**
+	 * Called when a timer fires.
+	 *
+	 * <p>NOTE(review): the invoking operator is not part of this file. Presumably it is called for
+	 * timers registered through {@link TriggerContext}; confirm against the operator.
+	 *
+	 * @param time The time for which the timer was registered.
+	 * @param ctx Context through which further timers can be registered.
+	 * @return The decision for the window; implementations should not return {@code null}.
+	 */
+	TriggerResult onTime(long time, TriggerContext ctx);
+
+	/** Returns a new trigger with the same configuration as this one. */
+	Trigger<T, W> duplicate();
+
+	/** Result of a trigger callback. */
+	enum TriggerResult {
+		CONTINUE, FIRE_AND_PURGE, FIRE
+	}
+
+	/** Handle passed to trigger callbacks for registering timers. */
+	interface TriggerContext {
+		/** Registers a timer for the given processing time. */
+		void registerProcessingTimeTimer(long time);
+
+		/** Registers a timer for the given event time, as tracked by watermarks. */
+		void registerWatermarkTimer(long time);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
new file mode 100644
index 0000000..6ba8890
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+/**
+ * A {@link Trigger} for {@link TimeWindow TimeWindows} driven by watermarks.
+ *
+ * <p>On the first element it sees, the trigger registers a watermark timer at
+ * {@link TimeWindow#maxTimestamp()}. Every timer callback makes it fire and purge.
+ */
+public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	/** {@code true} until the end-of-window timer has been registered. */
+	boolean isFirst = true;
+
+	private WatermarkTrigger() {}
+
+	@Override
+	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
+		if (!isFirst) {
+			// The end-of-window timer is already registered; nothing more to do.
+			return TriggerResult.CONTINUE;
+		}
+		ctx.registerWatermarkTimer(window.maxTimestamp());
+		isFirst = false;
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		return TriggerResult.FIRE_AND_PURGE;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> duplicate() {
+		// The fresh instance starts with isFirst == true.
+		return create();
+	}
+
+	@Override
+	public String toString() {
+		return "WatermarkTrigger()";
+	}
+
+	/** Creates a new trigger that registers a watermark timer at the window's max timestamp. */
+	public static WatermarkTrigger create() {
+		return new WatermarkTrigger();
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
new file mode 100644
index 0000000..e0df19d
--- /dev/null
+++
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.windows;
+
+/**
+ * The single, global {@link Window}, spanning {@link Long#MIN_VALUE} to {@link Long#MAX_VALUE}.
+ *
+ * <p>Obtain the shared instance via {@link #get()}. All {@code GlobalWindow} objects are equal.
+ */
+public class GlobalWindow extends Window {
+
+	/** The shared instance; {@code final} so it cannot be reassigned. */
+	private static final GlobalWindow INSTANCE = new GlobalWindow();
+
+	private GlobalWindow() { }
+
+	/** Returns the shared {@code GlobalWindow} instance. */
+	public static GlobalWindow get() {
+		return INSTANCE;
+	}
+
+	@Override
+	public long getStart() {
+		return Long.MIN_VALUE;
+	}
+
+	@Override
+	public long getEnd() {
+		return Long.MAX_VALUE;
+	}
+
+	@Override
+	public long maxTimestamp() {
+		return Long.MAX_VALUE;
+	}
+
+	/** Any two objects of exactly this class are equal. */
+	@Override
+	public boolean equals(Object o) {
+		return this == o || (o != null && getClass() == o.getClass());
+	}
+
+	@Override
+	public int hashCode() {
+		// Constant, which is consistent with equals(): all instances are equal.
+		return 0;
+	}
+
+	@Override
+	public String toString() {
+		return "GlobalWindow";
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java new file mode 100644 index 0000000..20080c0 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.streaming.api.windowing.windows;
+
+/**
+ * A {@link Window} covering a range of timestamps.
+ *
+ * <p>Constructed from a start and a size. The stored end is {@code start + size - 1}, and
+ * {@link #maxTimestamp()} returns that same value.
+ */
+public class TimeWindow extends Window {
+	long start;
+	long end;
+
+	public TimeWindow() {
+	}
+
+	/**
+	 * @param start The first timestamp of the window.
+	 * @param size The length of the window; the end becomes {@code start + size - 1}.
+	 */
+	public TimeWindow(long start, long size) {
+		this.start = start;
+		this.end = start + (size - 1);
+	}
+
+	@Override
+	public long getStart() {
+		return start;
+	}
+
+	@Override
+	public long getEnd() {
+		return end;
+	}
+
+	@Override
+	public long maxTimestamp() {
+		return end;
+	}
+
+	/** Two time windows are equal when they have the same class, start and end. */
+	@Override
+	public boolean equals(Object o) {
+		if (o == this) {
+			return true;
+		}
+		if (o == null || o.getClass() != getClass()) {
+			return false;
+		}
+		TimeWindow other = (TimeWindow) o;
+		return start == other.start && end == other.end;
+	}
+
+	@Override
+	public int hashCode() {
+		int startHash = (int) (start ^ (start >>> 32));
+		int endHash = (int) (end ^ (end >>> 32));
+		return 31 * startHash + endHash;
+	}
+
+	@Override
+	public String toString() {
+		return new StringBuilder("TimeWindow{")
+				.append("start=").append(start)
+				.append(", end=").append(end)
+				.append('}')
+				.toString();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
new file mode 100644
index 0000000..4e22c32
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.windows;
+
+/**
+ * Base class for windows, i.e. groupings of elements that are described by a time range.
+ *
+ * <p>Subclasses define whether the end is inclusive. NOTE(review): confirm the intended contract
+ * for all subclasses.
+ */
+public abstract class Window {
+
+	/** Returns the first timestamp covered by this window. */
+	public abstract long getStart();
+
+	/** Returns the end of this window. */
+	public abstract long getEnd();
+
+	/** Returns the largest timestamp that still belongs to this window. */
+	public abstract long maxTimestamp();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
new file mode 100644
index 0000000..145ad25
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+
+/**
+ * Buffers records in buckets of {@code granularity} time units, keyed by record timestamp.
+ * On each watermark, it emits the records of every bucket that starts before the watermark's
+ * bucket, sorted by timestamp.
+ *
+ * <p>Ready buckets are emitted in ascending bucket order, and records within a bucket in
+ * ascending timestamp order. Records in the watermark's own bucket or later buckets stay
+ * buffered. The watermark is forwarded after the emitted records.
+ *
+ * @param <T> The type of the records.
+ */
+public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
+	private static final long serialVersionUID = 1L;
+
+	/** Orders records by ascending timestamp; avoids the overflow of subtracting timestamps. */
+	private static final Comparator<StreamRecord<?>> TIMESTAMP_ORDER = new Comparator<StreamRecord<?>>() {
+		@Override
+		public int compare(StreamRecord<?> o1, StreamRecord<?> o2) {
+			return Long.compare(o1.getTimestamp(), o2.getTimestamp());
+		}
+	};
+
+	private final long granularity;
+
+	/** Buffered records keyed by bucket start; sorted so buckets can be emitted in order. */
+	private transient NavigableMap<Long, List<StreamRecord<T>>> buckets;
+
+	public BucketStreamSortOperator(long granularity) {
+		this.granularity = granularity;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		buckets = Maps.newTreeMap();
+	}
+
+	@Override
+	public void processElement(StreamRecord<T> record) throws Exception {
+		long bucketId = bucketStart(record.getTimestamp());
+		List<StreamRecord<T>> bucket = buckets.get(bucketId);
+		if (bucket == null) {
+			bucket = Lists.newArrayList();
+			buckets.put(bucketId, bucket);
+		}
+		bucket.add(record);
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		long maxBucketId = bucketStart(mark.getTimestamp());
+		// Live view over all buckets strictly before the watermark's bucket, in ascending order.
+		// Clearing the view removes those buckets from the buffer.
+		NavigableMap<Long, List<StreamRecord<T>>> ready = buckets.headMap(maxBucketId, false);
+		for (List<StreamRecord<T>> bucket : ready.values()) {
+			Collections.sort(bucket, TIMESTAMP_ORDER);
+			for (StreamRecord<T> r : bucket) {
+				output.collect(r);
+			}
+		}
+		ready.clear();
+
+		output.emitWatermark(mark);
+	}
+
+	/**
+	 * Returns the start of the bucket containing {@code timestamp}. This is the largest
+	 * multiple of {@code granularity} that is not greater than {@code timestamp}, assuming a
+	 * positive granularity.
+	 *
+	 * <p>Plain {@code timestamp - timestamp % granularity} rounds negative timestamps toward
+	 * zero. The correction below rounds them down instead.
+	 */
+	private long bucketStart(long timestamp) {
+		long remainder = timestamp % granularity;
+		if (remainder < 0) {
+			remainder += granularity;
+		}
+		return timestamp - remainder;
+	}
+
+}
