[scala] [streaming] Extended scala data stream functionality to include simple 
operators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40efecf7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40efecf7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40efecf7

Branch: refs/heads/release-0.8
Commit: 40efecf7cf79deb02f070bedc9980c91886b66ef
Parents: 6b1fd15
Author: Gyula Fora <[email protected]>
Authored: Fri Dec 12 00:12:49 2014 +0100
Committer: mbalassi <[email protected]>
Committed: Mon Jan 5 17:49:30 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  28 +++
 .../api/datastream/GroupedDataStream.java       |   4 +
 .../flink/api/scala/streaming/DataStream.scala  | 233 ++++++++++++++++++-
 3 files changed, 257 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40efecf7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 04929c1..1cf8d72 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -677,6 +677,20 @@ public class DataStream<OUT> {
        public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
                return this.minBy(positionToMinBy, true);
        }
+       
+       /**
+        * Applies an aggregation that that gives the current element with the
+        * minimum value at the given position, if more elements have the 
minimum
+        * value at the given position, the operator returns the first one by
+        * default.
+        * 
+        * @param positionToMinBy
+        *            The position in the data point to minimize
+        * @return The transformed DataStream.
+        */
+       public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) 
{
+               return this.minBy(positionToMinBy, true);
+       }
 
        /**
         * Applies an aggregation that that gives the current element with the
@@ -710,6 +724,20 @@ public class DataStream<OUT> {
        public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
                return this.maxBy(positionToMaxBy, true);
        }
+       
+       /**
+        * Applies an aggregation that that gives the current element with the
+        * maximum value at the given position, if more elements have the 
maximum
+        * value at the given position, the operator returns the first one by
+        * default.
+        * 
+        * @param positionToMaxBy
+        *            The position in the data point to maximize
+        * @return The transformed DataStream.
+        */
+       public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) 
{
+               return this.maxBy(positionToMaxBy, true);
+       }
 
        /**
         * Applies an aggregation that that gives the current element with the

http://git-wip-us.apache.org/repos/asf/flink/blob/40efecf7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index a2c0f89..18b4b75 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -47,6 +47,10 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
                this.keySelector = dataStream.keySelector;
        }
 
+       public KeySelector<OUT, ?> getKeySelector() {
+               return this.keySelector;
+       }
+
        /**
         * Applies a reduce transformation on the grouped data stream grouped 
on by
         * the given key position. The {@link ReduceFunction} will receive input

http://git-wip-us.apache.org/repos/asf/flink/blob/40efecf7/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index 711ce7c..b10bdc6 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -17,15 +17,28 @@
  */
 
 package org.apache.flink.api.scala.streaming
+import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.streaming.api.invokable.StreamInvokable
+import org.apache.flink.streaming.api.datastream.GroupedDataStream
+import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable
+import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable
+import org.apache.flink.streaming.api.datastream.GroupedDataStream
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.common.functions.FilterFunction
 
-class DataStream[OUT](javaStream: JavaStream[OUT]) {
+class DataStream[T](javaStream: JavaStream[T]) {
 
   /* This code is originally from the Apache Spark project. */
   /**
@@ -46,29 +59,233 @@ class DataStream[OUT](javaStream: JavaStream[OUT]) {
   }
 
   /**
+   * Gets the underlying java DataStream object.
+   */
+  private[flink] def getJavaStream: JavaStream[T] = javaStream
+
+  /**
+   * Sets the degree of parallelism of this operation. This must be greater 
than 1.
+   */
+  def setParallelism(dop: Int) = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
+      case _ =>
+        throw new UnsupportedOperationException("Operator " + 
javaStream.toString + " cannot have " +
+          "parallelism.")
+    }
+    this
+  }
+
+  /**
+   * Returns the degree of parallelism of this operation.
+   */
+  def getParallelism: Int = javaStream match {
+    case op: SingleOutputStreamOperator[_, _] => op.getParallelism
+    case _ =>
+      throw new UnsupportedOperationException("Operator " + 
javaStream.toString + " does not have " +
+        "parallelism.")
+  }
+
+  def merge(dataStreams: DataStream[T]*): DataStream[T] =
+    new DataStream[T](javaStream.merge(dataStreams.map(_.getJavaStream): _*))
+
+  def groupBy(fields: Int*): DataStream[T] =
+    new DataStream[T](javaStream.groupBy(fields: _*))
+
+  def groupBy(firstField: String, otherFields: String*): DataStream[T] =
+    new DataStream[T](javaStream.groupBy(firstField +: otherFields.toArray: 
_*))
+
+  def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    new DataStream[T](javaStream.groupBy(keyExtractor))
+  }
+
+  def partitionBy(fields: Int*): DataStream[T] =
+    new DataStream[T](javaStream.partitionBy(fields: _*))
+
+  def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
+    new DataStream[T](javaStream.partitionBy(firstField +: 
otherFields.toArray: _*))
+
+  def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    new DataStream[T](javaStream.partitionBy(keyExtractor))
+  }
+
+  def broadcast: DataStream[T] = new DataStream[T](javaStream.broadcast())
+
+  def shuffle: DataStream[T] = new DataStream[T](javaStream.shuffle())
+
+  def forward: DataStream[T] = new DataStream[T](javaStream.forward())
+
+  def distribute: DataStream[T] = new DataStream[T](javaStream.distribute())
+
+  def max(field: Any): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.max(field))
+    case field: String => return new DataStream[T](javaStream.max(field))
+    case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
+  }
+
+  def min(field: Any): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.min(field))
+    case field: String => return new DataStream[T](javaStream.min(field))
+    case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
+  }
+
+  def sum(field: Any): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.sum(field))
+    case field: String => return new DataStream[T](javaStream.sum(field))
+    case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
+  }
+
+  def maxBy(field: Any): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.maxBy(field))
+    case field: String => return new DataStream[T](javaStream.maxBy(field))
+    case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
+  }
+
+  def minBy(field: Any): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.minBy(field))
+    case field: String => return new DataStream[T](javaStream.minBy(field))
+    case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
+  }
+
+  def minBy(field: Any, first: Boolean): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.minBy(field, first))
+    case field: String => return new DataStream[T](javaStream.minBy(field, 
first))
+    case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
+  }
+
+  def maxBy(field: Any, first: Boolean): DataStream[T] = field match {
+    case field: Int => return new DataStream[T](javaStream.maxBy(field, first))
+    case field: String => return new DataStream[T](javaStream.maxBy(field, 
first))
+    case _ => throw new IllegalArgumentException("Aggregations are only 
supported by field position (Int) or field expression (String)")
+  }
+
+  def count: DataStream[java.lang.Long] = new 
DataStream[java.lang.Long](javaStream.count())
+
+  /**
    * Creates a new DataStream by applying the given function to every element 
of this DataStream.
    */
-  def map[R: TypeInformation: ClassTag](fun: OUT => R): DataStream[R] = {
+  def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Map function must not be null.")
     }
-    val mapper = new MapFunction[OUT, R] {
+    val mapper = new MapFunction[T, R] {
       val cleanFun = clean(fun)
-      def map(in: OUT): R = cleanFun(in)
+      def map(in: T): R = cleanFun(in)
     }
 
-    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], 
new MapInvokable[OUT, R](mapper)))
+    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], 
new MapInvokable[T, R](mapper)))
   }
 
   /**
    * Creates a new DataStream by applying the given function to every element 
of this DataStream.
    */
-  def map[R: TypeInformation: ClassTag](mapper: MapFunction[OUT, R]): 
DataStream[R] = {
+  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): 
DataStream[R] = {
     if (mapper == null) {
       throw new NullPointerException("Map function must not be null.")
     }
 
-    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], 
new MapInvokable[OUT, R](mapper)))
+    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], 
new MapInvokable[T, R](mapper)))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element 
and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, 
R]): DataStream[R] = {
+    if (flatMapper == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    new DataStream[R](javaStream.transform("flatMap", 
implicitly[TypeInformation[R]], new FlatMapInvokable[T, R](flatMapper)))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element 
and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): 
DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    val flatMapper = new FlatMapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) }
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element 
and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): 
DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    val flatMapper = new FlatMapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect 
}
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * Creates a new [[DataStream]] by merging the elements of this DataStream 
using an associative reduce
+   * function.
+   */
+  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
+    if (reducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    javaStream match {
+      case ds: GroupedDataStream[_] => new 
DataStream[T](javaStream.transform("reduce", javaStream.getType(), new 
GroupedReduceInvokable[T](reducer, ds.getKeySelector())))
+      case _ => new DataStream[T](javaStream.transform("reduce", 
javaStream.getType(), new StreamReduceInvokable[T](reducer)))
+    }
+  }
+
+  /**
+   * Creates a new [[DataStream]] by merging the elements of this DataStream 
using an associative reduce
+   * function.
+   */
+  def reduce(fun: (T, T) => T): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val reducer = new ReduceFunction[T] {
+      val cleanFun = clean(fun)
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
+  }
+
+  /**
+   * Creates a new DataSet that contains only the elements satisfying the 
given filter predicate.
+   */
+  def filter(filter: FilterFunction[T]): DataStream[T] = {
+    if (filter == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+    new DataStream[T](javaStream.filter(filter))
+  }
+
+  def filter(fun: T => Boolean): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+    val filter = new FilterFunction[T] {
+      val cleanFun = clean(fun)
+      def filter(in: T) = cleanFun(in)
+    }
+    this.filter(filter)
   }
 
   def print() = javaStream.print()

Reply via email to