[scala] [streaming] Extended Scala data stream functionality to include simple operators
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c123e11a Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c123e11a Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c123e11a Branch: refs/heads/master Commit: c123e11a01a801419f2ed53814b22d0ad638f98c Parents: 34353f6 Author: Gyula Fora <[email protected]> Authored: Fri Dec 12 00:12:49 2014 +0100 Committer: Gyula Fora <[email protected]> Committed: Fri Jan 2 18:34:38 2015 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/DataStream.java | 28 +++ .../api/datastream/GroupedDataStream.java | 4 + .../flink/api/scala/streaming/DataStream.scala | 233 ++++++++++++++++++- 3 files changed, 257 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c123e11a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 04929c1..1cf8d72 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -677,6 +677,20 @@ public class DataStream<OUT> { public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) { return this.minBy(positionToMinBy, true); } + + /** + * Applies an aggregation that that gives the current element with the + * minimum value at the 
given position, if more elements have the minimum + * value at the given position, the operator returns the first one by + * default. + * + * @param positionToMinBy + * The position in the data point to minimize + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) { + return this.minBy(positionToMinBy, true); + } /** * Applies an aggregation that that gives the current element with the @@ -710,6 +724,20 @@ public class DataStream<OUT> { public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } + + /** + * Applies an aggregation that that gives the current element with the + * maximum value at the given position, if more elements have the maximum + * value at the given position, the operator returns the first one by + * default. + * + * @param positionToMaxBy + * The position in the data point to maximize + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) { + return this.maxBy(positionToMaxBy, true); + } /** * Applies an aggregation that that gives the current element with the http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c123e11a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java index a2c0f89..18b4b75 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java +++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java @@ -47,6 +47,10 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> { this.keySelector = dataStream.keySelector; } + public KeySelector<OUT, ?> getKeySelector() { + return this.keySelector; + } + /** * Applies a reduce transformation on the grouped data stream grouped on by * the given key position. The {@link ReduceFunction} will receive input http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c123e11a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala index 711ce7c..b10bdc6 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala @@ -17,15 +17,28 @@ */ package org.apache.flink.api.scala.streaming +import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } -import org.apache.flink.api.scala.ClosureCleaner import org.apache.flink.api.common.typeinfo.TypeInformation import scala.reflect.ClassTag import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.streaming.api.invokable.operator.MapInvokable +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator +import org.apache.flink.util.Collector +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.streaming.api.invokable.StreamInvokable +import 
org.apache.flink.streaming.api.datastream.GroupedDataStream +import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable +import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable +import org.apache.flink.streaming.api.datastream.GroupedDataStream +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.common.functions.FilterFunction -class DataStream[OUT](javaStream: JavaStream[OUT]) { +class DataStream[T](javaStream: JavaStream[T]) { /* This code is originally from the Apache Spark project. */ /** @@ -46,29 +59,233 @@ class DataStream[OUT](javaStream: JavaStream[OUT]) { } /** + * Gets the underlying java DataStream object. + */ + private[flink] def getJavaStream: JavaStream[T] = javaStream + + /** + * Sets the degree of parallelism of this operation. This must be greater than 1. + */ + def setParallelism(dop: Int) = { + javaStream match { + case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop) + case _ => + throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot have " + + "parallelism.") + } + this + } + + /** + * Returns the degree of parallelism of this operation. 
+ */ + def getParallelism: Int = javaStream match { + case op: SingleOutputStreamOperator[_, _] => op.getParallelism + case _ => + throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have " + + "parallelism.") + } + + def merge(dataStreams: DataStream[T]*): DataStream[T] = + new DataStream[T](javaStream.merge(dataStreams.map(_.getJavaStream): _*)) + + def groupBy(fields: Int*): DataStream[T] = + new DataStream[T](javaStream.groupBy(fields: _*)) + + def groupBy(firstField: String, otherFields: String*): DataStream[T] = + new DataStream[T](javaStream.groupBy(firstField +: otherFields.toArray: _*)) + + def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = { + + val keyExtractor = new KeySelector[T, K] { + val cleanFun = clean(fun) + def getKey(in: T) = cleanFun(in) + } + new DataStream[T](javaStream.groupBy(keyExtractor)) + } + + def partitionBy(fields: Int*): DataStream[T] = + new DataStream[T](javaStream.partitionBy(fields: _*)) + + def partitionBy(firstField: String, otherFields: String*): DataStream[T] = + new DataStream[T](javaStream.partitionBy(firstField +: otherFields.toArray: _*)) + + def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = { + + val keyExtractor = new KeySelector[T, K] { + val cleanFun = clean(fun) + def getKey(in: T) = cleanFun(in) + } + new DataStream[T](javaStream.partitionBy(keyExtractor)) + } + + def broadcast: DataStream[T] = new DataStream[T](javaStream.broadcast()) + + def shuffle: DataStream[T] = new DataStream[T](javaStream.shuffle()) + + def forward: DataStream[T] = new DataStream[T](javaStream.forward()) + + def distribute: DataStream[T] = new DataStream[T](javaStream.distribute()) + + def max(field: Any): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.max(field)) + case field: String => return new DataStream[T](javaStream.max(field)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or 
field expression (String)") + } + + def min(field: Any): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.min(field)) + case field: String => return new DataStream[T](javaStream.min(field)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + + def sum(field: Any): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.sum(field)) + case field: String => return new DataStream[T](javaStream.sum(field)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + + def maxBy(field: Any): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.maxBy(field)) + case field: String => return new DataStream[T](javaStream.maxBy(field)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + + def minBy(field: Any): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.minBy(field)) + case field: String => return new DataStream[T](javaStream.minBy(field)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + + def minBy(field: Any, first: Boolean): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.minBy(field, first)) + case field: String => return new DataStream[T](javaStream.minBy(field, first)) + case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + + def maxBy(field: Any, first: Boolean): DataStream[T] = field match { + case field: Int => return new DataStream[T](javaStream.maxBy(field, first)) + case field: String => return new DataStream[T](javaStream.maxBy(field, first)) + case _ => 
throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)") + } + + def count: DataStream[java.lang.Long] = new DataStream[java.lang.Long](javaStream.count()) + + /** * Creates a new DataStream by applying the given function to every element of this DataStream. */ - def map[R: TypeInformation: ClassTag](fun: OUT => R): DataStream[R] = { + def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = { if (fun == null) { throw new NullPointerException("Map function must not be null.") } - val mapper = new MapFunction[OUT, R] { + val mapper = new MapFunction[T, R] { val cleanFun = clean(fun) - def map(in: OUT): R = cleanFun(in) + def map(in: T): R = cleanFun(in) } - new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[OUT, R](mapper))) + new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))) } /** * Creates a new DataStream by applying the given function to every element of this DataStream. */ - def map[R: TypeInformation: ClassTag](mapper: MapFunction[OUT, R]): DataStream[R] = { + def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = { if (mapper == null) { throw new NullPointerException("Map function must not be null.") } - new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[OUT, R](mapper))) + new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))) + } + + /** + * Creates a new DataStream by applying the given function to every element and flattening + * the results. 
+ */ + def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = { + if (flatMapper == null) { + throw new NullPointerException("FlatMap function must not be null.") + } + new DataStream[R](javaStream.transform("flatMap", implicitly[TypeInformation[R]], new FlatMapInvokable[T, R](flatMapper))) + } + + /** + * Creates a new DataStream by applying the given function to every element and flattening + * the results. + */ + def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = { + if (fun == null) { + throw new NullPointerException("FlatMap function must not be null.") + } + val flatMapper = new FlatMapFunction[T, R] { + val cleanFun = clean(fun) + def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) } + } + flatMap(flatMapper) + } + + /** + * Creates a new DataStream by applying the given function to every element and flattening + * the results. + */ + def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = { + if (fun == null) { + throw new NullPointerException("FlatMap function must not be null.") + } + val flatMapper = new FlatMapFunction[T, R] { + val cleanFun = clean(fun) + def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect } + } + flatMap(flatMapper) + } + + /** + * Creates a new [[DataStream]] by merging the elements of this DataStream using an associative reduce + * function. 
+ */ + def reduce(reducer: ReduceFunction[T]): DataStream[T] = { + if (reducer == null) { + throw new NullPointerException("Reduce function must not be null.") + } + javaStream match { + case ds: GroupedDataStream[_] => new DataStream[T](javaStream.transform("reduce", javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector()))) + case _ => new DataStream[T](javaStream.transform("reduce", javaStream.getType(), new StreamReduceInvokable[T](reducer))) + } + } + + /** + * Creates a new [[DataStream]] by merging the elements of this DataStream using an associative reduce + * function. + */ + def reduce(fun: (T, T) => T): DataStream[T] = { + if (fun == null) { + throw new NullPointerException("Reduce function must not be null.") + } + val reducer = new ReduceFunction[T] { + val cleanFun = clean(fun) + def reduce(v1: T, v2: T) = { cleanFun(v1, v2) } + } + reduce(reducer) + } + + /** + * Creates a new DataSet that contains only the elements satisfying the given filter predicate. + */ + def filter(filter: FilterFunction[T]): DataStream[T] = { + if (filter == null) { + throw new NullPointerException("Filter function must not be null.") + } + new DataStream[T](javaStream.filter(filter)) + } + + def filter(fun: T => Boolean): DataStream[T] = { + if (fun == null) { + throw new NullPointerException("Filter function must not be null.") + } + val filter = new FilterFunction[T] { + val cleanFun = clean(fun) + def filter(in: T) = cleanFun(in) + } + this.filter(filter) } def print() = javaStream.print()
