[scala] [streaming] added initial ConnectedDataStream support (no 
windowing,batching) for the scala api


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/55ef7954
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/55ef7954
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/55ef7954

Branch: refs/heads/master
Commit: 55ef7954bfbcc33ac1367b93762918055d496495
Parents: a761cdc
Author: carbone <[email protected]>
Authored: Sat Jan 3 19:46:22 2015 +0100
Committer: carbone <[email protected]>
Committed: Sat Jan 3 20:22:10 2015 +0100

----------------------------------------------------------------------
 .../scala/streaming/ConnectedDataStream.scala   | 369 +++++++++++++++++++
 1 file changed, 369 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ef7954/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
new file mode 100644
index 0000000..1c3f5a1
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+
+import java.util
+
+import scala.collection.JavaConversions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment._
+import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => 
JavaCStream}
+import org.apache.flink.streaming.api.function.co.{CoWindowFunction, 
CoFlatMapFunction, CoMapFunction, CoReduceFunction}
+import 
org.apache.flink.streaming.api.invokable.operator.co.{CoFlatMapInvokable, 
CoMapInvokable, CoReduceInvokable}
+import org.apache.flink.util.Collector
+
+
+import scala.reflect.ClassTag
+
+class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
+
+  /**
+   * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
+   * the output to a common type. The transformation calls a
+   * @param fun1 for each element of the first input and
+   * @param fun2 for each element of the second input. Each
+   * CoMapFunction call returns exactly one element.
+   *
+        * The CoMapFunction used to jointly transform the two input
+   * DataStreams
+   * @return The transformed { @link DataStream}
+   */
+  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): 
DataStream[R] = {
+    if (fun1 == null || fun2 == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+    val comapper = new CoMapFunction[IN1,IN2,R] {
+       def map1(in1: IN1): R = clean(fun1)(in1)
+       def map2(in2: IN2): R = clean(fun2)(in2)
+    }
+
+    new 
DataStream(javaStream.addCoFunction("map",implicitly[TypeInformation[R]],
+    new CoMapInvokable[IN1,IN2,R](comapper)))
+  }
+
+  /**
+   * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
+   * the output to a common type. The transformation calls a
+   * {@link CoMapFunction#map1} for each element of the first input and
+   * {@link CoMapFunction#map2} for each element of the second input. Each
+   * CoMapFunction call returns exactly one element. The user can also extend
+   * {@link RichCoMapFunction} to gain access to other features provided by
+   * the {@link RichFuntion} interface.
+   *
+   * @param coMapper
+        * The CoMapFunction used to jointly transform the two input
+   * DataStreams
+   * @return The transformed { @link DataStream}
+   */
+  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1,IN2,R]): 
DataStream[R] = {
+    if (coMapper == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+
+    new 
DataStream(javaStream.addCoFunction("map",implicitly[TypeInformation[R]],
+      new CoMapInvokable[IN1,IN2,R](coMapper)))
+  }
+
+  /**
+   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
+   * maps the output to a common type. The transformation calls a
+   * {@link CoFlatMapFunction#flatMap1} for each element of the first input
+   * and {@link CoFlatMapFunction#flatMap2} for each element of the second
+   * input. Each CoFlatMapFunction call returns any number of elements
+   * including none. The user can also extend {@link RichFlatMapFunction} to
+   * gain access to other features provided by the {@link RichFuntion}
+   * interface.
+   *
+   * @param coFlatMapper
+        * The CoFlatMapFunction used to jointly transform the two input
+   * DataStreams
+   * @return The transformed { @link DataStream}
+   */
+  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: 
CoFlatMapFunction[IN1,IN2,R]): DataStream[R] = {
+    if (coFlatMapper == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    new DataStream[R](javaStream.addCoFunction("flatMap", 
implicitly[TypeInformation[R]],
+      new CoFlatMapInvokable[IN1,IN2, R](coFlatMapper)))
+  }
+
+  /**
+   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
+   * maps the output to a common type. The transformation calls a
+   * @param fun1 for each element of the first input
+   * and @param fun2 for each element of the second
+   * input. Each CoFlatMapFunction call returns any number of elements
+   * including none.
+
+   * @return The transformed { @link DataStream}
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, 
fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
+    if (fun1 == null || fun2 == null) {
+      throw new NullPointerException("FlatMap functions must not be null.")
+    }
+    val flatMapper = new CoFlatMapFunction[IN1,IN2, R] {
+       def flatMap1(value: IN1, out: Collector[R]): Unit = 
clean(fun1)(value,out)
+       def flatMap2(value: IN2, out: Collector[R]): Unit = 
clean(fun2)(value,out)
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * GroupBy operation for connected data stream. Groups the elements of
+   * input1 and input2 according to keyPosition1 and keyPosition2. Used for
+   * applying function on grouped data streams for example
+   * {@link ConnectedDataStream#reduce}
+   *
+   * @param keyPosition1
+        * The field used to compute the hashcode of the elements in the
+   * first input stream.
+   * @param keyPosition2
+        * The field used to compute the hashcode of the elements in the
+   * second input stream.
+   * @return @return The transformed { @link ConnectedDataStream}
+   */
+  def groupBy(keyPosition1 : Int,keyPosition2: Int) : 
ConnectedDataStream[IN1,IN2] = {
+    new 
ConnectedDataStream[IN1,IN2](javaStream.groupBy(keyPosition1,keyPosition2))
+  }
+
+  /**
+   * GroupBy operation for connected data stream. Groups the elements of
+   * input1 and input2 according to keyPositions1 and keyPositions2. Used for
+   * applying function on grouped data streams for example
+   * {@link ConnectedDataStream#reduce}
+   *
+   * @param keyPositions1
+        * The fields used to group the first input stream.
+   * @param keyPositions2
+        * The fields used to group the second input stream.
+   * @return @return The transformed { @link ConnectedDataStream}
+   */
+  def groupBy(keyPositions1 : Array[Int],keyPositions2: Array[Int]) : 
ConnectedDataStream[IN1,IN2] = {
+    new 
ConnectedDataStream[IN1,IN2](javaStream.groupBy(keyPositions1,keyPositions2))
+  }
+
+  /**
+   * GroupBy operation for connected data stream using key expressions. Groups
+   * the elements of input1 and input2 according to field1 and field2. A field
+   * expression is either the name of a public field or a getter method with
+   * parentheses of the {@link DataStream}S underlying type. A dot can be used
+   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+   *
+   * @param field1
+        * The grouping expression for the first input
+   * @param field2
+        * The grouping expression for the second input
+   * @return The grouped { @link ConnectedDataStream}
+   */
+  def groupBy(field1 : String, field2: String) : ConnectedDataStream[IN1,IN2] 
= {
+    new ConnectedDataStream[IN1,IN2](javaStream.groupBy(field1,field2))
+  }
+
+  /**
+   * GroupBy operation for connected data stream using key expressions. Groups
+   * the elements of input1 and input2 according to fields1 and fields2. A
+   * field expression is either the name of a public field or a getter method
+   * with parentheses of the {@link DataStream}S underlying type. A dot can be
+   * used to drill down into objects, as in {@code "field1.getInnerField2()" }
+   * .
+   *
+   * @param fields1
+        * The grouping expressions for the first input
+   * @param fields2
+        * The grouping expressions for the second input
+   * @return The grouped { @link ConnectedDataStream}
+   */
+  def groupBy(fields1 : Array[String],fields2: Array[String]) : 
ConnectedDataStream[IN1,IN2] = {
+    new ConnectedDataStream[IN1,IN2](javaStream.groupBy(fields1,fields2))
+  }
+
+  /**
+   * GroupBy operation for connected data stream. Groups the elements of
+   * input1 and input2 using fun1 and fun2. Used for applying
+   * function on grouped data streams for example
+   * {@link ConnectedDataStream#reduce}
+   *
+   * @param fun1
+        * The function used for grouping the first input
+   * @param fun2
+        * The function used for grouping the second input
+   * @return @return The transformed { @link ConnectedDataStream}
+   */
+  def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _): 
ConnectedDataStream[IN1,IN2] = {
+
+    val keyExtractor1 = new KeySelector[IN1, Any] {
+      def getKey(in: IN1) = clean(fun1)(in)
+    }
+    val keyExtractor2 = new KeySelector[IN2, Any] {
+      def getKey(in: IN2) = clean(fun2)(in)
+    }
+
+    new 
ConnectedDataStream[IN1,IN2](javaStream.groupBy(keyExtractor1,keyExtractor2))
+  }
+
+  /**
+   * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
+   * the outputs to a common type. If the {@link ConnectedDataStream} is
+   * batched or windowed then the reduce transformation is applied on every
+   * sliding batch/window of the data stream. If the connected data stream is
+   * grouped then the reducer is applied on every group of elements sharing
+   * the same key. This type of reduce is much faster than reduceGroup since
+   * the reduce function can be applied incrementally.
+   *
+   * @param coReducer
+        * The { @link CoReduceFunction} that will be called for every
+   *             element of the inputs.
+   * @return The transformed { @link DataStream}.
+   */
+  def reduce[R: TypeInformation: ClassTag](coReducer: 
CoReduceFunction[IN1,IN2,R]): DataStream[R] = {
+    if (coReducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+
+    new DataStream[R](javaStream.addCoFunction("coReduce", 
implicitly[TypeInformation[R]],
+      new CoReduceInvokable[IN1,IN2,R](coReducer)))
+  }
+
+  /**
+   * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
+   * the outputs to a common type. If the {@link ConnectedDataStream} is
+   * batched or windowed then the reduce transformation is applied on every
+   * sliding batch/window of the data stream. If the connected data stream is
+   * grouped then the reducer is applied on every group of elements sharing
+   * the same key. This type of reduce is much faster than reduceGroup since
+   * the reduce function can be applied incrementally.
+   *
+   * @param reducer1
+   * @param reducer2
+   * @param mapper1
+   * @param mapper2
+   *
+   * @return The transformed { @link DataStream}.
+   */
+  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1,IN1) => IN1, 
reducer2: (IN2,IN2) => IN2,
+                                            mapper1: IN1 => R, mapper2: IN2 => 
R): DataStream[R] = {
+    if (mapper1 == null || mapper2 == null) {
+      throw new NullPointerException("Map functions must not be null.")
+    }
+    if (reducer1 == null || reducer2 == null) {
+      throw new NullPointerException("Reduce functions must not be null.")
+    }
+    
+    val reducer = new CoReduceFunction[IN1,IN2,R] {
+       def reduce1(value1: IN1, value2: IN1): IN1 = 
clean(reducer1)(value1,value2)
+       def map2(value: IN2): R = clean(mapper2)(value)
+       def reduce2(value1: IN2, value2: IN2): IN2 = 
clean(reducer2)(value1,value2)
+       def map1(value: IN1): R = clean(mapper1)(value)
+    }
+    reduce(reducer)
+  }
+
+  /**
+   * Applies a CoWindow transformation on the connected DataStreams. The
+   * transformation calls the {@link CoWindowFunction#coWindow} method for for
+   * time aligned windows of the two data streams. System time is used as
+   * default to compute windows.
+   *
+   * @param coWindowFunction
+        * The { @link CoWindowFunction} that will be applied for the time
+   *             windows.
+   * @param windowSize
+        * Size of the windows that will be aligned for both streams in
+   * milliseconds.
+   * @param slideInterval
+        * After every function call the windows will be slid by this
+   * interval.
+   *
+   * @return The transformed { @link DataStream}.
+   */
+  def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: 
CoWindowFunction[IN1,IN2,R], windowSize:Long, slideInterval: Long) = {
+    if(coWindowFunction == null){
+      throw new NullPointerException("CoWindow function must no be null")
+    }
+
+    new DataStream[R](javaStream.windowReduce(coWindowFunction, windowSize, 
slideInterval))
+  }
+
+  /**
+   * Applies a CoWindow transformation on the connected DataStreams. The
+   * transformation calls the {@link CoWindowFunction#coWindow} method for for
+   * time aligned windows of the two data streams. System time is used as
+   * default to compute windows.
+   *
+   * @param coWindower
+        * The coWindowing function to be applied for the time windows.
+   * @param windowSize
+        * Size of the windows that will be aligned for both streams in
+   * milliseconds.
+   * @param slideInterval
+        * After every function call the windows will be slid by this
+   * interval.
+   *
+   * @return The transformed { @link DataStream}.
+   */
+  def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], 
Seq[IN2], Collector[R]) => Unit , windowSize:Long, slideInterval: Long) = {
+    if(coWindower == null){
+      throw new NullPointerException("CoWindow function must no be null")
+    }
+
+    val coWindowFun = new CoWindowFunction[IN1,IN2,R] {
+       def coWindow(first: util.List[IN1], second: util.List[IN2], out: 
Collector[R]): Unit = clean(coWindower)(first,second,out)
+    }
+
+    new DataStream[R](javaStream.windowReduce(coWindowFun, windowSize, 
slideInterval))
+  }
+
+  /**
+   * Returns the first {@link DataStream}.
+   *
+   * @return The first DataStream.
+   */
+  def getFirst(): DataStream[IN1] = {
+     new DataStream[IN1](javaStream.getFirst)
+  }
+
+  /**
+   * Returns the second {@link DataStream}.
+   *
+   * @return The second DataStream.
+   */
+  def getSecond(): DataStream[IN2] = {
+    new DataStream[IN2](javaStream.getSecond)
+  }
+
+  /**
+   * Gets the type of the first input
+   *
+   * @return The type of the first input
+   */
+  def getInputType1(): TypeInformation[IN1] = {
+    javaStream.getInputType1
+  }
+
+  /**
+   * Gets the type of the second input
+   *
+   * @return The type of the second input
+   */
+  def getInputType2(): TypeInformation[IN2] = {
+    javaStream.getInputType2
+  }
+
+
+}

Reply via email to