[FLINK-2753] [streaming] [api breaking] Add first parts of new window API for key grouped windows
This follows the API design outlined in https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams This is API breaking because it adds new generic type parameters to Java and Scala classes, breaking binary compatibility. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e20299c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e20299c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e20299c Branch: refs/heads/master Commit: 7e20299c4e2d9cc78c36f90bdf0acdbaf72062b0 Parents: 501a9b0 Author: Stephan Ewen <[email protected]> Authored: Wed Sep 23 12:05:54 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Sep 28 17:04:16 2015 +0200 ---------------------------------------------------------------------- .../flink/streaming/api/TimeCharacteristic.java | 81 +++++++++++ .../api/datastream/ConnectedDataStream.java | 4 +- .../streaming/api/datastream/DataStream.java | 26 ++-- .../api/datastream/GroupedDataStream.java | 8 +- .../api/datastream/KeyedDataStream.java | 51 ++++++- .../api/datastream/KeyedWindowDataStream.java | 135 +++++++++++++++++++ .../api/datastream/WindowedDataStream.java | 4 +- .../environment/StreamExecutionEnvironment.java | 47 ++++++- .../functions/windows/KeyedWindowFunction.java | 6 +- .../windowpolicy/AbstractTimePolicy.java | 109 +++++++++++++++ .../api/windowing/windowpolicy/EventTime.java | 64 +++++++++ .../windowing/windowpolicy/ProcessingTime.java | 65 +++++++++ .../api/windowing/windowpolicy/Time.java | 68 ++++++++++ .../windowing/windowpolicy/WindowPolicy.java | 57 ++++++++ .../windows/AccumulatingKeyedTimePanes.java | 8 +- ...ccumulatingProcessingTimeWindowOperator.java | 4 +- .../operators/windows/PolicyToOperator.java | 82 +++++++++++ .../streaming/util/keys/KeySelectorUtil.java | 17 ++- .../api/state/StatefulOperatorTest.java | 8 +- .../GroupedProcessingTimeWindowExample.java | 79 +++-------- 
.../flink/streaming/api/scala/DataStream.scala | 17 +-- .../streaming/api/scala/GroupedDataStream.scala | 3 +- .../api/scala/StreamExecutionEnvironment.scala | 25 +++- .../flink/streaming/api/scala/package.scala | 4 +- 24 files changed, 848 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java new file mode 100644 index 0000000..1ad3c99 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api; + +/** + * The time characteristic defines how the system determines time for time-dependent + * operations (such as time windows). + */ +public enum TimeCharacteristic { + + /** + * Processing time for operators means that the operator uses the system clock of the machine + * to determine the current time of the data stream. Processing-time windows trigger based + * on wall-clock time and include whatever elements happen to have arrived at the operator at + * that point in time. + * <p> + * Using processing time for window operations generally results in non-deterministic results, + * because the contents of the windows depend on the speed at which elements arrive. It is, however, + * the cheapest method of forming windows and the method that introduces the least latency. + */ + ProcessingTime, + + /** + * Ingestion time means that the time of each individual element in the stream is determined + * when the element enters the Flink streaming data flow. Operations like windows group the + * elements based on that time, meaning that processing speed within the streaming dataflow + * does not affect windowing, but only the speed at which sources receive elements. + * <p> + * Ingestion time is often a good compromise between processing time and event time. + * It does not need any special manual form of watermark generation, and events are typically + * not too much out-of-order when they arrive at operators; in fact, out-of-orderness can + * only be introduced by streaming shuffles or split/join/union operations. The fact that elements + * are not very much out-of-order means that the latency increase is moderate, compared to event + * time. + */ + IngestionTime, + + /** + * Event time means that the time of each individual element in the stream (also called event) + * is determined by the event's individual custom timestamp.
These timestamps either exist in the + * elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources. + * The big implication of this is that elements arrive in the sources and in all operators generally + * out of order, meaning that elements with earlier timestamps may arrive after elements with + * later timestamps. + * <p> + * Operators that window or order data with respect to event time must buffer data until they can + * be sure that all timestamps for a certain time interval have been received. This is handled by + * the so-called "time watermarks". + * <p> + * Operations based on event time are very predictable - the result of windowing operations + * is typically identical no matter when the window is executed and how fast the streams operate. + * At the same time, the buffering and tracking of event time is also costlier than operating + * with processing time, and typically also introduces more latency. The amount of extra + * cost depends mostly on how much out of order the elements arrive, i.e., how long the time span + * between the arrival of early and late elements is. With respect to the "time watermarks", this + * means that the cost typically depends on how early or late the watermarks can be generated + * for their timestamps. + * <p> + * In relation to {@link #IngestionTime}, the event time is similar, but refers to the event's + * original time, rather than the time assigned at the data source. Practically, that means that + * event time generally has more meaning, but also that it takes longer to determine that all + * elements for a certain time have arrived.
+ */ + EventTime +} http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index 8609a30..0406e35 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -67,8 +67,8 @@ public class ConnectedDataStream<IN1, IN2> { if ((input1 instanceof GroupedDataStream) && (input2 instanceof GroupedDataStream)) { this.isGrouped = true; - this.keySelector1 = ((GroupedDataStream<IN1>) input1).keySelector; - this.keySelector2 = ((GroupedDataStream<IN2>) input2).keySelector; + this.keySelector1 = ((GroupedDataStream<IN1, ?>) input1).keySelector; + this.keySelector2 = ((GroupedDataStream<IN2, ?>) input2).keySelector; } else { this.isGrouped = false; this.keySelector1 = null; http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index d92498c..5dfb1e2 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -229,8 +229,8 @@ public class DataStream<T> { * The KeySelector to be used for extracting the key for partitioning * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream) */ - public KeyedDataStream<T> keyBy(KeySelector<T,?> key){ - return new KeyedDataStream<T>(this, clean(key)); + public <K> KeyedDataStream<T, K> keyBy(KeySelector<T, K> key){ + return new KeyedDataStream<T, K>(this, clean(key)); } /** @@ -241,7 +241,7 @@ public class DataStream<T> { * will be grouped. * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream) */ - public KeyedDataStream<T> keyBy(int... fields) { + public KeyedDataStream<T, Tuple> keyBy(int... fields) { if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) { return keyBy(new KeySelectorUtil.ArrayKeySelector<T>(fields)); } else { @@ -260,12 +260,12 @@ public class DataStream<T> { * partitioned. * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream) **/ - public KeyedDataStream<T> keyBy(String... fields) { + public KeyedDataStream<T, Tuple> keyBy(String... fields) { return keyBy(new Keys.ExpressionKeys<T>(fields, getType())); } - private KeyedDataStream<T> keyBy(Keys<T> keys) { - return new KeyedDataStream<T>(this, clean(KeySelectorUtil.getSelectorForKeys(keys, + private KeyedDataStream<T, Tuple> keyBy(Keys<T> keys) { + return new KeyedDataStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig()))); } @@ -279,7 +279,7 @@ public class DataStream<T> { * will be partitioned. * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream) */ - public GroupedDataStream<T> groupBy(int... 
fields) { + public GroupedDataStream<T, Tuple> groupBy(int... fields) { if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) { return groupBy(new KeySelectorUtil.ArrayKeySelector<T>(fields)); } else { @@ -304,7 +304,7 @@ public class DataStream<T> { * grouped. * @return The grouped {@link DataStream} **/ - public GroupedDataStream<T> groupBy(String... fields) { + public GroupedDataStream<T, Tuple> groupBy(String... fields) { return groupBy(new Keys.ExpressionKeys<T>(fields, getType())); } @@ -322,13 +322,13 @@ public class DataStream<T> { * the values * @return The grouped {@link DataStream} */ - public GroupedDataStream<T> groupBy(KeySelector<T, ?> keySelector) { - return new GroupedDataStream<T>(this, clean(keySelector)); + public <K> GroupedDataStream<T, K> groupBy(KeySelector<T, K> keySelector) { + return new GroupedDataStream<T, K>(this, clean(keySelector)); } - private GroupedDataStream<T> groupBy(Keys<T> keys) { - return new GroupedDataStream<T>(this, clean(KeySelectorUtil.getSelectorForKeys(keys, - getType(), getExecutionConfig()))); + private GroupedDataStream<T, Tuple> groupBy(Keys<T> keys) { + return new GroupedDataStream<T, Tuple>(this, + clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig()))); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java index a1106bc..50bf341 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java 
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java @@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.operators.StreamGroupedReduce; * @param <OUT> * The output type of the {@link GroupedDataStream}. */ -public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> { +public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { /** * Creates a new {@link GroupedDataStream}, group inclusion is determined using @@ -48,7 +48,7 @@ public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> { * @param dataStream Base stream of data * @param keySelector Function for determining group inclusion */ - public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, ?> keySelector) { + public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySelector) { super(dataStream, keySelector); } @@ -324,8 +324,6 @@ public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> { protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) { StreamGroupedReduce<OUT> operator = new StreamGroupedReduce<OUT>(clean(aggregate), keySelector); - SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation", - getType(), operator); - return returnStream; + return transform("Grouped Aggregation", getType(), operator); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java index 100e5de..a32cf53 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy; import org.apache.flink.streaming.runtime.partitioner.HashPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; @@ -32,11 +33,12 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; * are also possible on a KeyedDataStream, with the exception of partitioning methods such as shuffle, forward and groupBy. * * - * @param <T> The type of the elements in the Keyed Stream + * @param <T> The type of the elements in the Keyed Stream. + * @param <K> The type of the key in the Keyed Stream. 
*/ -public class KeyedDataStream<T> extends DataStream<T> { +public class KeyedDataStream<T, K> extends DataStream<T> { - protected final KeySelector<T, ?> keySelector; + protected final KeySelector<T, K> keySelector; /** * Creates a new {@link KeyedDataStream} using the given {@link KeySelector} @@ -47,35 +49,70 @@ public class KeyedDataStream<T> extends DataStream<T> { * @param keySelector * Function for determining state partitions */ - public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, ?> keySelector) { + public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, K> keySelector) { super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector))); this.keySelector = keySelector; } - public KeySelector<T, ?> getKeySelector() { + + public KeySelector<T, K> getKeySelector() { return this.keySelector; } + @Override protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) { throw new UnsupportedOperationException("Cannot override partitioning for KeyedDataStream."); } + @Override public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator); - ((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector( - keySelector); + ((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector); return returnStream; } + + @Override public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) { DataStreamSink<T> result = super.addSink(sinkFunction); result.getTransformation().setStateKeySelector(keySelector); return result; } + + // ------------------------------------------------------------------------ + // Windowing + // ------------------------------------------------------------------------ + + /** + * Windows this 
data stream to a KeyedWindowDataStream, which evaluates windows over a key + * grouped stream. The window is defined by a single policy. + * <p> + * For time windows, these single-policy windows result in tumbling time windows. + * + * @param policy The policy that defines the window. + * @return The windowed data stream. + */ + public KeyedWindowDataStream<T, K> window(WindowPolicy policy) { + return new KeyedWindowDataStream<T, K>(this, policy); + } + + /** + * Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key + * grouped stream. The window is defined by a window policy, plus a slide policy. + * <p> + * For time windows, these slide policy windows result in sliding time windows. + * + * @param window The policy that defines the window. + * @param slide The additional policy defining the slide of the window. + * @return The windowed data stream. + */ + public KeyedWindowDataStream<T, K> window(WindowPolicy window, WindowPolicy slide) { + return new KeyedWindowDataStream<T, K>(this, window, slide); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java new file mode 100644 index 0000000..2ec175a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy; +import org.apache.flink.streaming.runtime.operators.windows.PolicyToOperator; + +/** + * A KeyedWindowDataStream represents a data stream where elements are grouped by key, and + * for each key, the stream of elements is split into windows. The windows are conceptually + * evaluated for each key individually, meaning windows may trigger at different points + * for each key. + * <p> + * In many cases, however, the windows are "aligned", meaning they trigger at the + * same time for all keys. The most common example of that is the regular time window.
+ * <p> + * Note that the KeyedWindowDataStream is purely an API construct; at runtime, the + * KeyedWindowDataStream will be collapsed together with the KeyedDataStream and the operation + * over the window into a single operation. + * + * @param <Type> The type of elements in the stream. + * @param <Key> The type of the key by which elements are grouped. + */ +public class KeyedWindowDataStream<Type, Key> { + + /** The keyed data stream that is windowed by this stream */ + private final KeyedDataStream<Type, Key> input; + + /** The core window policy */ + private final WindowPolicy windowPolicy; + + /** The optional additional slide policy */ + private final WindowPolicy slidePolicy; + + + public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, WindowPolicy windowPolicy) { + this(input, windowPolicy, null); + } + + public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, + WindowPolicy windowPolicy, WindowPolicy slidePolicy) + { + TimeCharacteristic time = input.getExecutionEnvironment().getStreamTimeCharacteristic(); + + this.input = input; + this.windowPolicy = windowPolicy.makeSpecificBasedOnTimeCharacteristic(time); + this.slidePolicy = slidePolicy == null ? null : slidePolicy.makeSpecificBasedOnTimeCharacteristic(time); + } + + // ------------------------------------------------------------------------ + // Operations on the keyed windows + // ------------------------------------------------------------------------ + + /** + * Applies a reduce function to the window. The window function is called for each evaluation + * of the window for each key individually. The output of the reduce function is interpreted + * as a regular non-windowed stream. + * <p> + * This window will try to pre-aggregate data as much as the window policies permit. For example, + * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per + * key is stored.
Sliding time windows will pre-aggregate on the granularity of the slide interval, + * so a few elements are stored per key (one per slide interval). + * Custom windows may not be able to pre-aggregate, or may need to store extra values in an + * aggregation tree. + * + * @param function The reduce function. + * @return The data stream that is the result of applying the reduce function to the window. + */ + public DataStream<Type> reduceWindow(ReduceFunction<Type> function) { + String callLocation = Utils.getCallLocationName(); + return createWindowOperator(function, input.getType(), "Reduce at " + callLocation); + } + + /** + * Applies a window function to the window. The window function is called for each evaluation + * of the window for each key individually. The output of the window function is interpreted + * as a regular non-windowed stream. + * <p> + * Note that this function requires that all data in the windows is buffered until the window + * is evaluated, as the function provides no means of pre-aggregation. + * + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window.
+ */ + public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<Type, Result, Key> function) { + String callLocation = Utils.getCallLocationName(); + + TypeInformation<Type> inType = input.getType(); + TypeInformation<Result> resultType = TypeExtractor.getUnaryOperatorReturnType( + function, KeyedWindowFunction.class, true, true, inType, null, false); + + return createWindowOperator(function, resultType, "KeyedWindowFunction at " + callLocation); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private <Result> DataStream<Result> createWindowOperator( + Function function, TypeInformation<Result> resultType, String functionName) { + + String opName = windowPolicy.toString(slidePolicy) + " of " + functionName; + KeySelector<Type, Key> keySel = input.getKeySelector(); + + OneInputStreamOperator<Type, Result> operator = + PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel); + + return input.transform(opName, resultType, operator); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index bf3a11a..1226adf 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -103,7 +103,7 @@ public class 
WindowedDataStream<OUT> { this.triggerHelper = policyHelper; if (dataStream instanceof GroupedDataStream) { - this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector; + this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector; } } @@ -115,7 +115,7 @@ public class WindowedDataStream<OUT> { this.userEvicter = evicter; if (dataStream instanceof GroupedDataStream) { - this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector; + this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector; } } http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index d91afc9..a22a519 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.environment; import com.esotericsoftware.kryo.Serializer; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; + import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; @@ -49,6 +49,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.FileStateHandle; import org.apache.flink.runtime.state.StateHandleProvider; import 
org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction; @@ -72,10 +73,12 @@ import org.apache.flink.util.SplittableIterator; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Objects; /** * {@link org.apache.flink.api.java.ExecutionEnvironment} for streaming jobs. An instance of it is @@ -83,25 +86,33 @@ import java.util.List; */ public abstract class StreamExecutionEnvironment { - public final static String DEFAULT_JOB_NAME = "Flink Streaming Job"; + public static final String DEFAULT_JOB_NAME = "Flink Streaming Job"; private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors(); - + + /** The time characteristic that is used if none other is set */ + private static TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime; + + // ------------------------------------------------------------------------ + private long bufferTimeout = 100; - private ExecutionConfig config = new ExecutionConfig(); + private final ExecutionConfig config = new ExecutionConfig(); - protected List<StreamTransformation<?>> transformations = Lists.newArrayList(); + protected final List<StreamTransformation<?>> transformations = new ArrayList<>(); protected boolean isChainingEnabled = true; protected long checkpointInterval = -1; // disabled - protected CheckpointingMode checkpointingMode = null; + protected CheckpointingMode checkpointingMode; protected boolean forceCheckpointing = false; protected StateHandleProvider<?> stateHandleProvider; + + /** The time characteristic used by the data streams */ + 
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC; /** The environment of the context (local by default, cluster if invoked through command line) */ private static StreamExecutionEnvironmentFactory contextEnvironmentFactory; @@ -516,6 +527,30 @@ public abstract class StreamExecutionEnvironment { } // -------------------------------------------------------------------------------------------- + // Time characteristic + // -------------------------------------------------------------------------------------------- + + /** + * Sets the time characteristic for the stream, e.g., processing time, event time, + * or ingestion time. + * + * @param characteristic The time characteristic. + */ + public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) { + this.timeCharacteristic = Objects.requireNonNull(characteristic); + } + + /** + * Gets the time characteristic for the stream, e.g., processing time, event time, + * or ingestion time. + * + * @return The time characteristic. 
+ */ + public TimeCharacteristic getStreamTimeCharacteristic() { + return timeCharacteristic; + } + + // -------------------------------------------------------------------------------------------- // Data stream creations // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java index d7ca0a1..b4e55e4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java @@ -25,12 +25,12 @@ import java.io.Serializable; /** * Base interface for functions that are evaluated over keyed (grouped) windows. - * - * @param <KEY> The type of the key. + * * @param <IN> The type of the input value. * @param <OUT> The type of the output value. + * @param <KEY> The type of the key. 
*/ -public interface KeyedWindowFunction<KEY, IN, OUT> extends Function, Serializable { +public interface KeyedWindowFunction<IN, OUT, KEY> extends Function, Serializable { /** * http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java new file mode 100644 index 0000000..9dc0dd0 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.windowing.windowpolicy; + +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class AbstractTimePolicy extends WindowPolicy { + + private static final long serialVersionUID = 6593098375698927728L; + + /** the time unit for this policy's time interval */ + private final TimeUnit unit; + + /** the length of this policy's time interval */ + private final long num; + + + protected AbstractTimePolicy(long num, TimeUnit unit) { + this.unit = checkNotNull(unit, "time unit may not be null"); + this.num = num; + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + /** + * Gets the time unit for this policy's time interval. + * @return The time unit for this policy's time interval. + */ + public TimeUnit getUnit() { + return unit; + } + + /** + * Gets the length of this policy's time interval. + * @return The length of this policy's time interval. + */ + public long getNum() { + return num; + } + + /** + * Converts the time interval to milliseconds. + * @return The time interval in milliseconds. 
+ */ + public long toMilliseconds() { + return unit.toMillis(num); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @Override + public String toString(WindowPolicy slidePolicy) { + if (slidePolicy == null) { + return "Tumbling Window (" + getClass().getSimpleName() + ") (" + num + ' ' + unit.name() + ')'; + } + else if (slidePolicy.getClass() == getClass()) { + AbstractTimePolicy timeSlide = (AbstractTimePolicy) slidePolicy; + + return "Sliding Window (" + getClass().getSimpleName() + ") (length=" + + num + ' ' + unit.name() + ", slide=" + timeSlide.num + ' ' + timeSlide.unit.name() + ')'; + } + else { + return super.toString(slidePolicy); + } + } + + @Override + public int hashCode() { + return 31 * (int) (num ^ (num >>> 32)) + unit.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj != null && obj.getClass() == getClass()) { + AbstractTimePolicy that = (AbstractTimePolicy) obj; + return this.num == that.num && this.unit.equals(that.unit); + } + else { + return false; + } + } + + @Override + public String toString() { + return getClass().getSimpleName() + " (" + num + ' ' + unit.name() + ')'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java new file mode 100644 index 0000000..8a671fc --- /dev/null +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.windowpolicy; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.streaming.api.TimeCharacteristic; + +import java.util.concurrent.TimeUnit; + +/** + * The definition of an event time interval for windowing. See + * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition + * of event time. 
+ */ +public final class EventTime extends AbstractTimePolicy { + + private static final long serialVersionUID = 8333566691833596747L; + + /** Instantiation only via factory method */ + private EventTime(long num, TimeUnit unit) { + super(num, unit); + } + + @Override + public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) { + if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime) { + return this; + } + else { + throw new InvalidProgramException( + "Cannot use EventTime policy in a dataflow that runs on " + characteristic); + } + } + // ------------------------------------------------------------------------ + // Factory + // ------------------------------------------------------------------------ + + /** + * Creates an event time policy describing an event time interval. + * + * @param num The length of the time interval. + * @param unit The unit (seconds, milliseconds) of the time interval. + * @return The event time policy.
+ */ + public static EventTime of(long num, TimeUnit unit) { + return new EventTime(num, unit); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java new file mode 100644 index 0000000..2ff13fa --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.windowing.windowpolicy; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.streaming.api.TimeCharacteristic; + +import java.util.concurrent.TimeUnit; + +/** + * The definition of a processing time interval for windowing. See + * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition + * of processing time. + */ +public final class ProcessingTime extends AbstractTimePolicy { + + private static final long serialVersionUID = 7546166721132583007L; + + /** Instantiation only via factory method */ + private ProcessingTime(long num, TimeUnit unit) { + super(num, unit); + } + + @Override + public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) { + if (characteristic == TimeCharacteristic.ProcessingTime) { + return this; + } + else { + throw new InvalidProgramException( + "Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic); + } + } + + // ------------------------------------------------------------------------ + // Factory + // ------------------------------------------------------------------------ + + /** + * Creates a processing time policy describing a processing time interval. + * + * @param num The length of the time interval. + * @param unit The unit (seconds, milliseconds) of the time interval. + * @return The processing time policy.
+ */ + public static ProcessingTime of(long num, TimeUnit unit) { + return new ProcessingTime(num, unit); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java new file mode 100644 index 0000000..0233e96 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.windowpolicy; + +import org.apache.flink.streaming.api.TimeCharacteristic; + +import java.util.concurrent.TimeUnit; + +/** + * The definition of a time interval for windowing. 
The time characteristic referred + * to is the default time characteristic set on the execution environment. + */ +public final class Time extends AbstractTimePolicy { + + private static final long serialVersionUID = 3197290738634320211L; + + /** Instantiation only via factory method */ + private Time(long num, TimeUnit unit) { + super(num, unit); + } + + @Override + public AbstractTimePolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) { + switch (timeCharacteristic) { + case ProcessingTime: + return ProcessingTime.of(getNum(), getUnit()); + case IngestionTime: + case EventTime: + return EventTime.of(getNum(), getUnit()); + default: + throw new IllegalArgumentException("Unknown time characteristic"); + } + } + + // ------------------------------------------------------------------------ + // Factory + // ------------------------------------------------------------------------ + + /** + * Creates a time policy describing a time interval. The policy refers to the + * time characteristic that is set on the dataflow via + * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment# + * setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}. + * + * @param num The length of the time interval. + * @param unit The unit (seconds, milliseconds) of the time interval. + * @return The time policy.
+ */ + public static Time of(long num, TimeUnit unit) { + return new Time(num, unit); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java new file mode 100644 index 0000000..a82f892 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.windowpolicy; + +import org.apache.flink.streaming.api.TimeCharacteristic; + +/** + * The base class of all window policies. Window policies define how windows + * are formed over the data stream. 
+ */ +public abstract class WindowPolicy implements java.io.Serializable { + + private static final long serialVersionUID = -8696529489282723113L; + + /** + * If this window policy's concrete instantiation depends on the time characteristic of the + * dataflow (processing time, event time), then this method must be overridden to convert this + * policy to the respective specific instantiation. + * <p> + * The {@link Time} policy, for example, will convert itself to a {@link ProcessingTime} policy + * if the time characteristic is set to {@link TimeCharacteristic#ProcessingTime}. + * <p> + * By default, this method does nothing and simply returns this object itself. + * + * @param characteristic The time characteristic of the dataflow. + * @return The specific instantiation of this policy, or the policy itself. + */ + public WindowPolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) { + return this; + } + + + public String toString(WindowPolicy slidePolicy) { + if (slidePolicy != null) { + return "Window [" + toString() + ", slide=" + slidePolicy + ']'; + } + else { + return "Window [" + toString() + ']'; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java index e776106..1212123 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java +++
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java @@ -32,13 +32,13 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory(); - private final KeyedWindowFunction<Key, Type, Result> function; + private final KeyedWindowFunction<Type, Result, Key> function; private long evaluationPass; // ------------------------------------------------------------------------ - public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Key, Type, Result> function) { + public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key> function) { this.keySelector = keySelector; this.function = function; } @@ -75,7 +75,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> { - private final KeyedWindowFunction<Key, Type, Result> function; + private final KeyedWindowFunction<Type, Result, Key> function; private final UnionIterator<Type> unionIterator; @@ -83,7 +83,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed private Key currentKey; - WindowFunctionTraversal(KeyedWindowFunction<Key, Type, Result> function, Collector<Result> out) { + WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> function, Collector<Result> out) { this.function = function; this.out = out; this.unionIterator = new UnionIterator<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java index 16444c2..fb9d163 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java @@ -30,7 +30,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> public AccumulatingProcessingTimeWindowOperator( - KeyedWindowFunction<KEY, IN, OUT> function, + KeyedWindowFunction<IN, OUT, KEY> function, KeySelector<IN, KEY> keySelector, long windowLength, long windowSlide) @@ -41,7 +41,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> @Override protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) { @SuppressWarnings("unchecked") - KeyedWindowFunction<KEY, IN, OUT> windowFunction = (KeyedWindowFunction<KEY, IN, OUT>) function; + KeyedWindowFunction<IN, OUT, KEY> windowFunction = (KeyedWindowFunction<IN, OUT, KEY>) function; return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction); } http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java new file mode 100644 index 0000000..9d06ef5 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windows; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime; +import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime; +import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy; + +/** + * This class implements the conversion from window policies to concrete operator + * implementations. 
+ */ +public class PolicyToOperator { + + /** + * Entry point to create an operator for the given window policies and the window function. + */ + public static <IN, OUT, KEY> OneInputStreamOperator<IN, OUT> createOperatorForPolicies( + WindowPolicy window, WindowPolicy slide, Function function, KeySelector<IN, KEY> keySelector) + { + if (window == null || function == null) { + throw new NullPointerException(); + } + + // -- case 1: both policies are processing time policies + if (window instanceof ProcessingTime && (slide == null || slide instanceof ProcessingTime)) { + final long windowLength = ((ProcessingTime) window).toMilliseconds(); + final long windowSlide = slide == null ? windowLength : ((ProcessingTime) slide).toMilliseconds(); + + if (function instanceof ReduceFunction) { + @SuppressWarnings("unchecked") + ReduceFunction<IN> reducer = (ReduceFunction<IN>) function; + + @SuppressWarnings("unchecked") + OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>) + new AggregatingProcessingTimeWindowOperator<KEY, IN>( + reducer, keySelector, windowLength, windowSlide); + return op; + } + else if (function instanceof KeyedWindowFunction) { + @SuppressWarnings("unchecked") + KeyedWindowFunction<IN, OUT, KEY> wf = (KeyedWindowFunction<IN, OUT, KEY>) function; + + return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>( + wf, keySelector, windowLength, windowSlide); + } + } + + // -- case 2: both policies are event time policies + if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) { + // add event time implementation + } + + throw new UnsupportedOperationException("The windowing mechanism does not yet support " + window.toString(slide)); + } + + // ------------------------------------------------------------------------ + + /** Don't instantiate */ + private PolicyToOperator() {} +} 
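Note for reviewers: the policy classes added above are small value objects; the two behaviors PolicyToOperator relies on are the interval-to-milliseconds conversion in AbstractTimePolicy and the characteristic-based resolution in Time. A minimal standalone sketch of that resolution logic, using local stand-in types rather than the Flink classes themselves:

```java
import java.util.concurrent.TimeUnit;

public class PolicyResolutionSketch {

    // Local stand-in for org.apache.flink.streaming.api.TimeCharacteristic
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    // Mirrors Time.makeSpecificBasedOnTimeCharacteristic(...): the generic Time
    // policy resolves to a concrete policy based on the environment's setting.
    static String resolve(TimeCharacteristic characteristic) {
        switch (characteristic) {
            case ProcessingTime:
                return "ProcessingTime";
            case IngestionTime:  // ingestion time takes the event time path
            case EventTime:
                return "EventTime";
            default:
                throw new IllegalArgumentException("Unknown time characteristic");
        }
    }

    // Mirrors AbstractTimePolicy.toMilliseconds()
    static long toMilliseconds(long num, TimeUnit unit) {
        return unit.toMillis(num);
    }

    public static void main(String[] args) {
        System.out.println(resolve(TimeCharacteristic.IngestionTime)); // EventTime
        System.out.println(toMilliseconds(2, TimeUnit.SECONDS));       // 2000
    }
}
```

This is why EventTime.makeSpecificBasedOnTimeCharacteristic accepts both EventTime and IngestionTime, while ProcessingTime accepts only ProcessingTime.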
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java index 2e0fe66..f758147 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java @@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple; public class KeySelectorUtil { - public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) { + public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) { if (!(typeInfo instanceof CompositeType)) { throw new InvalidTypesException( "This key operation requires a composite type such as Tuples, POJOs, or Case Classes."); @@ -93,9 +93,15 @@ public class KeySelectorUtil { comparator.extractKeys(value, keyArray, 0); return (K) keyArray[0]; } - } + // ------------------------------------------------------------------------ + + /** + * A key selector for selecting key fields via a TypeComparator. + * + * @param <IN> The type from which the key is extracted. 
+ */ public static class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> { private static final long serialVersionUID = 1L; @@ -126,6 +132,13 @@ public class KeySelectorUtil { } + // ------------------------------------------------------------------------ + + /** + * A key selector that selects individual array fields as keys and returns them as a Tuple. + * + * @param <IN> The type from which the key is extracted, i.e., the array type. + */ public static final class ArrayKeySelector<IN> implements KeySelector<IN, Tuple> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java index b8b4c13..207b1b1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java @@ -113,7 +113,9 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); - KeyedDataStream<Integer> keyedStream = env.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).keyBy(new ModKey(4)); + KeyedDataStream<Integer, Integer> keyedStream = env + .fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6)) + .keyBy(new ModKey(4)); keyedStream.map(new StatefulMapper()).addSink(new SinkFunction<String>() { private static final long serialVersionUID =
1L; @@ -163,7 +165,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase { @SuppressWarnings("unchecked") private StreamMap<Integer, String> createOperatorWithContext(List<String> output, - KeySelector<Integer, Serializable> partitioner, byte[] serializedState) throws Exception { + KeySelector<Integer, ? extends Serializable> partitioner, byte[] serializedState) throws Exception { final List<String> outputList = output; StreamingRuntimeContext context = new StreamingRuntimeContext( @@ -355,7 +357,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase { } - public static class ModKey implements KeySelector<Integer, Serializable> { + public static class ModKey implements KeySelector<Integer, Integer> { private static final long serialVersionUID = 4193026742083046736L; http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java index 7387a1e..e52c2cb 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java @@ -22,15 +22,16 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import 
org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; -import org.apache.flink.streaming.runtime.operators.windows.AggregatingProcessingTimeWindowOperator; +import org.apache.flink.streaming.api.windowing.windowpolicy.Time; import org.apache.flink.util.Collector; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + @SuppressWarnings("serial") public class GroupedProcessingTimeWindowExample { @@ -75,31 +76,20 @@ public class GroupedProcessingTimeWindowExample { }); stream - .groupBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>()) -// .window(Time.of(2500, TimeUnit.MILLISECONDS)).every(Time.of(500, TimeUnit.MILLISECONDS)) -// .reduceWindow(new SummingReducer()) -// .flatten() -// .partitionByHash(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>()) -// .transform( -// "Aligned time window", -// TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>"), -// new AccumulatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>, Tuple2<Long, Long>>( -// new SummingWindowFunction<Long>(), -// new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(), -// 2500, 500)) - .transform( - "Aligned time window", - TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>"), - new AggregatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>>( - new SummingReducer(), - new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(), - 2500, 500)) + .keyBy(0) + .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS)) + .reduceWindow(new SummingReducer()) + + // alternative: use a mapWindow function which does not pre-aggregate +// .keyBy(new 
FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>()) +// .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS)) +// .mapWindow(new SummingWindowFunction()) .addSink(new SinkFunction<Tuple2<Long, Long>>() { - @Override - public void invoke(Tuple2<Long, Long> value) { - } - }); + @Override + public void invoke(Tuple2<Long, Long> value) { + } + }); env.execute(); } @@ -113,47 +103,16 @@ public class GroupedProcessingTimeWindowExample { } } - public static class IdentityKeyExtractor<T> implements KeySelector<T, T> { - - @Override - public T getKey(T value) { - return value; - } - } - - public static class IdentityWindowFunction<K, T> implements KeyedWindowFunction<K, T, T> { - - @Override - public void evaluate(K k, Iterable<T> values, Collector<T> out) throws Exception { - for (T v : values) { - out.collect(v); - } - } - } - - public static class CountingWindowFunction<K, T> implements KeyedWindowFunction<K, T, Long> { - - @Override - public void evaluate(K k, Iterable<T> values, Collector<Long> out) throws Exception { - long count = 0; - for (T ignored : values) { - count++; - } - - out.collect(count); - } - } - - public static class SummingWindowFunction<K> implements KeyedWindowFunction<K, Tuple2<K, Long>, Tuple2<K, Long>> { + public static class SummingWindowFunction implements KeyedWindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> { @Override - public void evaluate(K key, Iterable<Tuple2<K, Long>> values, Collector<Tuple2<K, Long>> out) throws Exception { + public void evaluate(Long key, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) { long sum = 0L; - for (Tuple2<K, Long> value : values) { + for (Tuple2<Long, Long> value : values) { sum += value.f1; } - out.collect(new Tuple2<K, Long>(key, sum)); + out.collect(new Tuple2<>(key, sum)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala 
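The rewritten example above replaces the manual `transform(...)` call with the new fluent `keyBy(0).window(...).reduceWindow(...)` chain, where the `ReduceFunction` pre-aggregates each key's elements as they arrive in a window instead of buffering them all. The effect of that pre-aggregation can be sketched in plain Java without any Flink dependencies; the `Pair` class and `reduceWindow` helper below are hypothetical stand-ins for Flink's `Tuple2<Long, Long>` and the window operator's internal per-key state, not actual Flink classes.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class ReduceWindowSketch {

    // Minimal stand-in for Flink's Tuple2<Long, Long>: (key, value).
    public static final class Pair {
        public final long f0;
        public final long f1;
        public Pair(long f0, long f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Pre-aggregating reduce: within one window, elements with the same
    // key are folded into a single partial result via the reducer,
    // instead of being buffered until the window fires.
    public static Map<Long, Pair> reduceWindow(List<Pair> windowContents,
                                               BinaryOperator<Pair> reducer) {
        Map<Long, Pair> perKey = new LinkedHashMap<>();
        for (Pair p : windowContents) {
            perKey.merge(p.f0, p, reducer);
        }
        return perKey;
    }

    public static void main(String[] args) {
        // SummingReducer semantics: keep the key field, sum the second field.
        BinaryOperator<Pair> summing = (a, b) -> new Pair(a.f0, a.f1 + b.f1);
        Map<Long, Pair> result = reduceWindow(
                Arrays.asList(new Pair(1, 2), new Pair(2, 7), new Pair(1, 3)),
                summing);
        System.out.println(result.get(1L).f1 + " " + result.get(2L).f1); // prints "5 7"
    }
}
```

This is what makes `reduceWindow` cheaper than the commented-out `mapWindow` alternative: the operator holds one partial aggregate per key rather than every element of the window.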
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 2bb6a6a..2f4bd23 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.api.scala

 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag

-import org.apache.flink.api.common.functions.{ReduceFunction, FlatMapFunction, MapFunction,
-  Partitioner, FoldFunction, FilterFunction}
+import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
+import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction, Partitioner, FilterFunction}
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
@@ -30,17 +30,12 @@
 import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator}
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, StreamReduce}
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
-import org.apache.flink.api.common.state.OperatorState
 import org.apache.flink.api.common.functions.{RichMapFunction, RichFlatMapFunction, RichFilterFunction}
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.KeyedDataStream
 import org.apache.flink.streaming.api.scala.function.StatefulFunction
@@ -244,20 +239,20 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
   * be used with grouped operators like grouped reduce or grouped aggregations.
   */
-  def groupBy(fields: Int*): GroupedDataStream[T] = javaStream.groupBy(fields: _*)
+  def groupBy(fields: Int*): GroupedDataStream[T, JavaTuple] = javaStream.groupBy(fields: _*)

  /**
   * Groups the elements of a DataStream by the given field expressions to
   * be used with grouped operators like grouped reduce or grouped aggregations.
   */
-  def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T] =
+  def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T, JavaTuple] =
    javaStream.groupBy(firstField +: otherFields.toArray: _*)

  /**
   * Groups the elements of a DataStream by the given K key to
   * be used with grouped operators like grouped reduce or grouped aggregations.
   */
-  def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T] = {
+  def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T, K] = {
    val cleanFun = clean(fun)
    val keyExtractor = new KeySelector[T, K] {
@@ -605,7 +600,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
  }

  private[flink] def isStatePartitioned: Boolean = {
-    javaStream.isInstanceOf[KeyedDataStream[T]]
+    javaStream.isInstanceOf[KeyedDataStream[_, _]]
  }

  /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
index 34f0807..e1a963d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
@@ -29,7 +29,8 @@
 import org.apache.flink.api.common.functions.FoldFunction
 import org.apache.flink.api.common.functions.ReduceFunction

-class GroupedDataStream[T](javaStream: GroupedJavaStream[T]) extends DataStream[T](javaStream){
+class GroupedDataStream[T, K](javaStream: GroupedJavaStream[T, K])
+  extends DataStream[T](javaStream) {

  /**
   * Creates a new [[DataStream]] by reducing the elements of this DataStream

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 5e02ec5..9d62bcb 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -18,13 +18,15 @@
 package org.apache.flink.streaming.api.scala

+import java.util.Objects
+
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.runtime.state.StateHandleProvider
-import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
@@ -294,6 +296,27 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
  }

  // --------------------------------------------------------------------------------------------
+  // Time characteristic
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Sets the time characteristic for the stream, e.g., processing time, event time,
+   * or ingestion time.
+   *
+   * @param characteristic The time characteristic.
+   */
+  def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = {
+    javaEnv.setStreamTimeCharacteristic(characteristic)
+  }
+
+  /**
+   * Gets the time characteristic for the stream, e.g., processing time, event time,
+   * or ingestion time.
+   *
+   * @return The time characteristic.
+   */
+  def getStreamTimeCharacteristic = javaEnv.getStreamTimeCharacteristic()
+
+  // --------------------------------------------------------------------------------------------
  // Data stream creations
  // --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 2eb4f9e..59843e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -38,8 +38,8 @@ package object scala {

  implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
    new DataStream[R](javaStream)

-  implicit def javaToScalaGroupedStream[R](javaStream: GroupedJavaStream[R]):
-  GroupedDataStream[R] = new GroupedDataStream[R](javaStream)
+  implicit def javaToScalaGroupedStream[R, K](javaStream: GroupedJavaStream[R, K]):
+  GroupedDataStream[R, K] = new GroupedDataStream[R, K](javaStream)

  implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]):
  WindowedDataStream[R] = new WindowedDataStream[R](javaWStream)
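Across the diff, `KeyedWindowFunction` moves to an `evaluate(key, values, out)` contract that is invoked once per key and window with all buffered elements, as in the new non-generic `SummingWindowFunction`. That contract can be sketched in plain Java without Flink on the classpath; the `KeyedWindowFn` interface and the `long[]{key, value}` pairs below are hypothetical stand-ins for Flink's `KeyedWindowFunction` and `Tuple2<Long, Long>`, and a `List` stands in for the `Collector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowFunctionSketch {

    // Hypothetical stand-in for the KeyedWindowFunction contract:
    // called once per key and window with every buffered element.
    public interface KeyedWindowFn<K, I, O> {
        void evaluate(K key, Iterable<I> values, List<O> out);
    }

    // Mirrors the SummingWindowFunction in the example: sum the second
    // field of (key, value) pairs and emit one (key, sum) result.
    public static final KeyedWindowFn<Long, long[], long[]> SUMMING =
            (key, values, out) -> {
                long sum = 0L;
                for (long[] v : values) {
                    sum += v[1];
                }
                out.add(new long[] { key, sum });
            };

    public static void main(String[] args) {
        List<long[]> out = new ArrayList<>();
        SUMMING.evaluate(7L,
                Arrays.asList(new long[]{7, 2}, new long[]{7, 3}, new long[]{7, 4}),
                out);
        System.out.println(out.get(0)[1]); // prints 9
    }
}
```

Unlike the `reduceWindow` path, a window function like this sees the whole window at once, which is why the example's comment calls it the variant that "does not pre-aggregate".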
