[scala] [streaming] Windowing functionality added to scala api
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/80393c4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/80393c4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/80393c4a Branch: refs/heads/master Commit: 80393c4a1901e2672349b76cb7b3476dcb674edb Parents: de06d95 Author: Gyula Fora <[email protected]> Authored: Mon Dec 15 16:21:00 2014 +0100 Committer: Gyula Fora <[email protected]> Committed: Fri Jan 2 18:34:38 2015 +0100 ---------------------------------------------------------------------- .../api/datastream/WindowedDataStream.java | 54 +++++ .../flink/api/scala/streaming/DataStream.scala | 27 +++ .../streaming/StreamExecutionEnvironment.scala | 14 +- .../scala/streaming/WindowedDataStream.scala | 232 +++++++++++++++++++ 4 files changed, 326 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80393c4a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index cb9cd04..287f29d 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -260,6 +260,30 @@ public class WindowedDataStream<OUT> { } /** + * Applies a reduceGroup transformation on the windowed data stream by + * reducing the current window 
at every trigger. In contrast with the + * standard binary reducer, with reduceGroup the user can access all + * elements of the window at the same time through the iterable interface. + * The user can also extend the {@link RichGroupReduceFunction} to gain + * access to other features provided by the + * {@link org.apache.flink.api.common.functions.RichFunction} interface. + * </br> </br> This version of reduceGroup uses user-supplied + * type information for serialization. Use this only when the system is unable + * to detect type information using: + * {@link #reduceGroup(GroupReduceFunction)} + * + * @param reduceFunction + * The reduce function that will be applied to the windows. + * @return The transformed DataStream + */ + public <R> SingleOutputStreamOperator<R, ?> reduceGroup( + GroupReduceFunction<OUT, R> reduceFunction, TypeInformation<R> outType) { + + return dataStream.transform("NextGenWindowReduce", outType, + getReduceGroupInvokable(reduceFunction)); + } + + /** * Applies an aggregation that sums every window of the data stream at the * given position. * @@ -335,6 +359,19 @@ public class WindowedDataStream<OUT> { /** * Applies an aggregation that gives the minimum element of every window of * the data stream by the given position. If more elements have the same + * minimum value the operator returns the first element by default. + * + * @param positionToMinBy + * The position to minimize by + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) { + return this.minBy(positionToMinBy, true); + } + + /** + * Applies an aggregation that gives the minimum element of every window of + * the data stream by the given position. If more elements have the same * minimum value the operator returns either the first or last one depending * on the parameter setting. 
* @@ -418,6 +455,19 @@ public class WindowedDataStream<OUT> { /** * Applies an aggregation that gives the maximum element of every window of * the data stream by the given position. If more elements have the same + * maximum value the operator returns the first by default. + * + * @param positionToMaxBy + * The position to maximize by + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) { + return this.maxBy(positionToMaxBy, true); + } + + /** + * Applies an aggregation that gives the maximum element of every window of + * the data stream by the given position. If more elements have the same * maximum value the operator returns either the first or last one depending * on the parameter setting. * @@ -598,6 +648,10 @@ public class WindowedDataStream<OUT> { return dataStream.getType(); } + public DataStream<OUT> getDataStream() { + return dataStream; + } + protected WindowedDataStream<OUT> copy() { return new WindowedDataStream<OUT>(this); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80393c4a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala index e96f5eb..69b8359 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala @@ -38,6 +38,10 @@ import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.common.functions.FilterFunction import org.apache.flink.streaming.api.function.sink.SinkFunction +import org.apache.flink.streaming.api.windowing.helper.WindowingHelper +import 
org.apache.flink.streaming.api.windowing.policy.EvictionPolicy +import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy +import scala.collection.JavaConversions._ class DataStream[T](javaStream: JavaStream[T]) { @@ -397,6 +401,29 @@ class DataStream[T](javaStream: JavaStream[T]) { } /** + * Create a WindowedDataStream that can be used to apply + * transformations like .reduce(...) or aggregations on + * preset chunks (windows) of the data stream. To define the windows one or + * more WindowingHelper-s such as Time, Count and + * Delta can be used.</br></br> When applied to a grouped data + * stream, the windows (evictions) and slide sizes (triggers) will be + * computed on a per group basis. </br></br> For more advanced control over + * the trigger and eviction policies please refer to + * window(List(triggers), List(evicters)) + */ + def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(windowingHelper: _*)) + + /** + * Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s. + * Windowing can be used to apply transformations like .reduce(...) or aggregations on + * preset chunks (windows) of the data stream.</br></br>For the most common + * use-cases please refer to window(WindowingHelper[_]*) + * + */ + def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(triggers, evicters)) + + /** + * * Writes a DataStream to the standard output stream (stdout). For each * element of the DataStream the result of .toString is * written. 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80393c4a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala index e4a7b48..dadfde2 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.streaming +import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.environment.{ StreamExecutionEnvironment => JavaEnv } import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.commons.lang.Validate @@ -26,6 +27,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource import org.apache.flink.streaming.api.invokable.SourceInvokable import org.apache.flink.streaming.api.function.source.FromElementsFunction import org.apache.flink.streaming.api.function.source.SourceFunction +import scala.collection.JavaConversions._ +import org.apache.flink.util.Collector class StreamExecutionEnvironment(javaEnv: JavaEnv) { @@ -113,7 +116,16 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * Creates a new DataStream that contains a sequence of numbers. * */ - def generateSequence(from: Long, to: Long): DataStream[java.lang.Long] = new DataStream(javaEnv.generateSequence(from, to)) + def generateSequence(from: Long, to: Long): DataStream[Long] = { + val source = new SourceFunction[Long] { + override def invoke(out: Collector[Long]) = { + for (i <- from.to(to)) { + out.collect(i) + } + } + } + addSource(source) + } /** * Creates a DataStream that contains the given elements. 
The elements must all be of the http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/80393c4a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala new file mode 100644 index 0000000..ff89a47 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.streaming +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } +import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream } +import org.apache.flink.api.common.typeinfo.TypeInformation +import scala.reflect.ClassTag +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.streaming.api.invokable.operator.MapInvokable +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator +import org.apache.flink.util.Collector +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.streaming.api.windowing.helper.WindowingHelper +import org.apache.flink.api.common.functions.GroupReduceFunction +import org.apache.flink.streaming.api.invokable.StreamInvokable +import scala.collection.JavaConversions._ + +class WindowedDataStream[T](javaStream: JavaWStream[T]) { + + private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { + ClosureCleaner.clean(f, checkSerializable) + f + } + + /** + * Defines the slide size (trigger frequency) for the windowed data stream. + * This controls how often the user defined function will be triggered on + * the window. + */ + def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.every(windowingHelper: _*)) + + /** + * Groups the elements of the WindowedDataStream using the given + * field positions. 
The window sizes (evictions) and slide sizes + * (triggers) will be calculated on the whole stream (in a central fashion), + * but the user defined functions will be applied on a per group basis. + * </br></br> To get windows and triggers on a per group basis apply the + * DataStream.window(...) operator on an already grouped data stream. + * + */ + def groupBy(fields: Int*): WindowedDataStream[T] = + new WindowedDataStream[T](javaStream.groupBy(fields: _*)) + + /** + * Groups the elements of the WindowedDataStream using the given + * field expressions. The window sizes (evictions) and slide sizes + * (triggers) will be calculated on the whole stream (in a central fashion), + * but the user defined functions will be applied on a per group basis. + * </br></br> To get windows and triggers on a per group basis apply the + * DataStream.window(...) operator on an already grouped data stream. + * + */ + def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] = + new WindowedDataStream[T](javaStream.groupBy(firstField +: otherFields.toArray: _*)) + + /** + * Groups the elements of the WindowedDataStream using the given + * KeySelector function. The window sizes (evictions) and slide sizes + * (triggers) will be calculated on the whole stream (in a central fashion), + * but the user defined functions will be applied on a per group basis. + * </br></br> To get windows and triggers on a per group basis apply the + * DataStream.window(...) operator on an already grouped data stream. + * + */ + def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = { + + val keyExtractor = new KeySelector[T, K] { + val cleanFun = clean(fun) + def getKey(in: T) = cleanFun(in) + } + new WindowedDataStream[T](javaStream.groupBy(keyExtractor)) + } + + /** + * Applies a reduce transformation on the windowed data stream by reducing + * the current window at every trigger. 
+ * + */ + def reduce(reducer: ReduceFunction[T]): DataStream[T] = { + if (reducer == null) { + throw new NullPointerException("Reduce function must not be null.") + } + new DataStream[T](javaStream.reduce(reducer)) + } + + /** + * Applies a reduce transformation on the windowed data stream by reducing + * the current window at every trigger. + * + */ + def reduce(fun: (T, T) => T): DataStream[T] = { + if (fun == null) { + throw new NullPointerException("Reduce function must not be null.") + } + val reducer = new ReduceFunction[T] { + val cleanFun = clean(fun) + def reduce(v1: T, v2: T) = { cleanFun(v1, v2) } + } + reduce(reducer) + } + + /** + * Applies a reduceGroup transformation on the windowed data stream by reducing + * the current window at every trigger. In contrast with the simple binary reduce operator, reduceGroup exposes the whole window through the Iterable interface. + * </br> + * </br> + * Whenever possible, try to use reduce instead of reduceGroup for increased efficiency. + */ + def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]): DataStream[R] = { + if (reducer == null) { + throw new NullPointerException("GroupReduce function must not be null.") + } + new DataStream[R](javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]])) + } + + /** + * Applies a reduceGroup transformation on the windowed data stream by reducing + * the current window at every trigger. In contrast with the simple binary reduce operator, reduceGroup exposes the whole window through the Iterable interface. 
+ * </br> + * </br> + * Whenever possible, try to use reduce instead of reduceGroup for increased efficiency. + */ + def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit): DataStream[R] = { + if (fun == null) { + throw new NullPointerException("GroupReduce function must not be null.") + } + val reducer = new GroupReduceFunction[T, R] { + val cleanFun = clean(fun) + def reduce(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) } + } + reduceGroup(reducer) + } + + /** + * Applies an aggregation that gives the maximum of the elements in the window at + * the given position. + * + */ + def max(field: Any): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.max(field)) + case field: String => return new DataStream[T](javaStream.max(field)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + + /** + * Applies an aggregation that gives the minimum of the elements in the window at + * the given position. + * + */ + def min(field: Any): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.min(field)) + case field: String => return new DataStream[T](javaStream.min(field)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + + /** + * Applies an aggregation that sums the elements in the window at the given position. + * + */ + def sum(field: Any): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.sum(field)) + case field: String => return new DataStream[T](javaStream.sum(field)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + + /** + * Applies an aggregation that gives the maximum element of the window by + * the given position. 
In case of equality, returns the first element. + * + */ + def maxBy(field: Any): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.maxBy(field)) + case field: String => return new DataStream[T](javaStream.maxBy(field)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + + /** + * Applies an aggregation that gives the minimum element of the window by + * the given position. In case of equality, returns the first element. + * + */ + def minBy(field: Any): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.minBy(field)) + case field: String => return new DataStream[T](javaStream.minBy(field)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + + /** + * Applies an aggregation that gives the minimum element of the window by + * the given position. In case of equality, the user can choose whether the first or the last element with the minimal value is returned. + * + */ + def minBy(field: Any, first: Boolean): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.minBy(field, first)) + case field: String => return new DataStream[T](javaStream.minBy(field, first)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + + /** + * Applies an aggregation that gives the maximum element of the window by + * the given position. In case of equality, the user can choose whether the first or the last element with the maximal value is returned. 
+ * + */ + def maxBy(field: Any, first: Boolean): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.maxBy(field, first)) + case field: String => return new DataStream[T](javaStream.maxBy(field, first)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + +} \ No newline at end of file
