[scala] [streaming] Added connect op to DataStream and implicit conversion for 
ConnectedDataStreams

[scala] [streaming] Changed return types to implicits


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7eaf726a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7eaf726a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7eaf726a

Branch: refs/heads/release-0.8
Commit: 7eaf726aeab1ac461e934a4b9d5f87404a64bc0d
Parents: b094d49
Author: carbone <[email protected]>
Authored: Sat Jan 3 20:00:13 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Mon Jan 5 18:07:21 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedDataStream.java      |   6 +-
 .../windowing/TopSpeedWindowingExample.java     | 175 ++++++++++---------
 .../streaming/windowing/TopSpeedWindowing.scala |  11 +-
 .../scala/streaming/ConnectedDataStream.scala   | 171 +++++++++---------
 .../flink/api/scala/streaming/DataStream.scala  |  15 +-
 .../scala/streaming/StreamingConversions.scala  |   6 +-
 6 files changed, 208 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7eaf726a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 287f29d..e81395d 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -231,7 +231,7 @@ public class WindowedDataStream<OUT> {
         * @return The transformed DataStream
         */
        public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> 
reduceFunction) {
-               return dataStream.transform("NextGenWindowReduce", getType(),
+               return dataStream.transform("WindowReduce", getType(),
                                getReduceInvokable(reduceFunction));
        }
 
@@ -255,7 +255,7 @@ public class WindowedDataStream<OUT> {
                TypeInformation<R> outType = TypeExtractor
                                .getGroupReduceReturnTypes(reduceFunction, 
inType);
 
-               return dataStream.transform("NextGenWindowReduce", outType,
+               return dataStream.transform("WindowReduce", outType,
                                getReduceGroupInvokable(reduceFunction));
        }
 
@@ -279,7 +279,7 @@ public class WindowedDataStream<OUT> {
        public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
                        GroupReduceFunction<OUT, R> reduceFunction, 
TypeInformation<R> outType) {
 
-               return dataStream.transform("NextGenWindowReduce", outType,
+               return dataStream.transform("WindowReduce", outType,
                                getReduceGroupInvokable(reduceFunction));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7eaf726a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
index bc3bba5..0f5d8eb 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
-
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -32,90 +31,98 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 /**
- * An example of grouped stream windowing where different eviction and trigger 
policies can be used.
- * A source fetches events from cars every 1 sec containing their id, their 
current speed (kmh),
- * overall elapsed distance (m) and a timestamp. The streaming
- * example triggers the top speed of each car every x meters elapsed for the 
last y seconds.
+ * An example of grouped stream windowing where different eviction and trigger
+ * policies can be used. A source fetches events from cars every 1 sec
+ * containing their id, their current speed (kmh), overall elapsed distance (m)
+ * and a timestamp. The streaming example triggers the top speed of each car
+ * every x meters elapsed for the last y seconds.
  */
 public class TopSpeedWindowingExample {
 
-    public static void main(String[] args) throws Exception {
-
-        if (!parseParameters(args)) {
-            return;
-        }
-
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-        @SuppressWarnings({"unchecked", "rawtypes"})
-        DataStream topSpeeds = env
-                .addSource(CarSource.create(numOfCars))
-                .groupBy(0)
-                .window(Time.of(evictionSec, TimeUnit.SECONDS))
-                .every(Delta.of(
-                        new DeltaFunction<Tuple4<Integer, Integer, Double, 
Long>>() {
-                            @Override
-                            public double getDelta(Tuple4<Integer, Integer, 
Double, Long> oldDataPoint, Tuple4<Integer, Integer, Double, Long> 
newDataPoint) {
-                                return newDataPoint.f2 - oldDataPoint.f2;
-                            }
-                        }
-                        , new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 
0l), triggerMeters))
-                .maxBy(1);
-
-        topSpeeds.print();
-        env.execute("CarTopSpeedWindowingExample");
-    }
-
-    private static class CarSource implements SourceFunction<Tuple4<Integer, 
Integer, Double, Long>> {
-        private Integer[] speeds;
-        private Double[] distances;
-
-        private Random rand = new Random();
-
-        private CarSource(int numOfCars) {
-            speeds = new Integer[numOfCars];
-            distances = new Double[numOfCars];
-            Arrays.fill(speeds, 50);
-            Arrays.fill(distances, 0d);
-        }
-
-        public static CarSource create(int cars) {
-            return new CarSource(cars);
-        }
-
-        @Override
-        public void invoke(Collector<Tuple4<Integer, Integer, Double, Long>> 
collector) throws Exception {
-
-            while (true) {
-                Thread.sleep(1000);
-                for (int carId = 0; carId < speeds.length; carId++) {
-                    if (rand.nextBoolean())
-                        speeds[carId] = Math.min(100, speeds[carId] + 5);
-                    else
-                        speeds[carId] = Math.max(0, speeds[carId] - 5);
-                    distances[carId] += speeds[carId] / 3.6d;
-                    collector.collect(new Tuple4<Integer, Integer, Double, 
Long>(carId, speeds[carId], distances[carId], System.currentTimeMillis()));
-                }
-            }
-        }
-    }
-
-    private static int numOfCars = 2;
-    private static int evictionSec = 10;
-    private static double triggerMeters = 50;
-
-    private static boolean parseParameters(String[] args) {
-
-        if (args.length > 0) {
-            if (args.length == 3) {
-                numOfCars = Integer.valueOf(args[0]);
-                evictionSec = Integer.valueOf(args[1]);
-                triggerMeters = Double.valueOf(args[2]);
-            } else {
-                System.err.println("Usage: TopSpeedWindowingExample <numCars> 
<evictSec> <triggerMeters>");
-                return false;
-            }
-        }
-        return true;
-    }
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               @SuppressWarnings({ "rawtypes", "serial" })
+               DataStream topSpeeds = env
+                               .addSource(CarSource.create(numOfCars))
+                               .groupBy(0)
+                               .window(Time.of(evictionSec, TimeUnit.SECONDS))
+                               .every(Delta.of(triggerMeters,
+                                               new 
DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
+                                                       @Override
+                                                       public double getDelta(
+                                                                       
Tuple4<Integer, Integer, Double, Long> oldDataPoint,
+                                                                       
Tuple4<Integer, Integer, Double, Long> newDataPoint) {
+                                                               return 
newDataPoint.f2 - oldDataPoint.f2;
+                                                       }
+                                               }, new Tuple4<Integer, Integer, 
Double, Long>(0, 0, 0d, 0l))).maxBy(1);
+
+               topSpeeds.print();
+               env.execute("CarTopSpeedWindowingExample");
+       }
+
+       private static class CarSource implements
+                       SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
+
+               private static final long serialVersionUID = 1L;
+               private Integer[] speeds;
+               private Double[] distances;
+
+               private Random rand = new Random();
+
+               private CarSource(int numOfCars) {
+                       speeds = new Integer[numOfCars];
+                       distances = new Double[numOfCars];
+                       Arrays.fill(speeds, 50);
+                       Arrays.fill(distances, 0d);
+               }
+
+               public static CarSource create(int cars) {
+                       return new CarSource(cars);
+               }
+
+               @Override
+               public void invoke(Collector<Tuple4<Integer, Integer, Double, 
Long>> collector)
+                               throws Exception {
+
+                       while (true) {
+                               Thread.sleep(1000);
+                               for (int carId = 0; carId < speeds.length; 
carId++) {
+                                       if (rand.nextBoolean()) {
+                                               speeds[carId] = Math.min(100, 
speeds[carId] + 5);
+                                       } else {
+                                               speeds[carId] = Math.max(0, 
speeds[carId] - 5);
+                                       }
+                                       distances[carId] += speeds[carId] / 
3.6d;
+                                       collector.collect(new Tuple4<Integer, 
Integer, Double, Long>(carId,
+                                                       speeds[carId], 
distances[carId], System.currentTimeMillis()));
+                               }
+                       }
+               }
+       }
+
+       private static int numOfCars = 2;
+       private static int evictionSec = 10;
+       private static double triggerMeters = 50;
+
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length > 0) {
+                       if (args.length == 3) {
+                               numOfCars = Integer.valueOf(args[0]);
+                               evictionSec = Integer.valueOf(args[1]);
+                               triggerMeters = Double.valueOf(args[2]);
+                       } else {
+                               System.err
+                                               .println("Usage: 
TopSpeedWindowingExample <numCars> <evictSec> <triggerMeters>");
+                               return false;
+                       }
+               }
+               return true;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7eaf726a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
index 10ddfd8..e39fb11 100644
--- 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala
@@ -30,10 +30,12 @@ import scala.math.{max, min}
 import scala.util.Random
 
 /**
- * An example of grouped stream windowing where different eviction and trigger 
policies can be used.
- * A source fetches events from cars every 1 sec containing their id, their 
current speed (kmh),
+ * An example of grouped stream windowing where different eviction and 
+ * trigger policies can be used.A source fetches events from cars 
+ * every 1 sec containing their id, their current speed (kmh),
  * overall elapsed distance (m) and a timestamp. The streaming
- * example triggers the top speed of each car every x meters elapsed for the 
last y seconds.
+ * example triggers the top speed of each car every x meters elapsed 
+ * for the last y seconds.
  */
 object TopSpeedWindowing {
 
@@ -47,7 +49,8 @@ object TopSpeedWindowing {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val cars = env.addSource(carSource _).groupBy("carId")
       .window(Time.of(evictionSec, SECONDS))
-      .every(Delta.of[CarSpeed](triggerMeters, (oldSp,newSp) => 
newSp.distance-oldSp.distance, CarSpeed(0,0,0,0)))
+      .every(Delta.of[CarSpeed](triggerMeters, 
+          (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0)))
       .reduce((x, y) => if (x.speed > y.speed) x else y)
 
     cars print

http://git-wip-us.apache.org/repos/asf/flink/blob/7eaf726a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
index 1c3f5a1..985e512 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +20,19 @@ package org.apache.flink.api.scala.streaming
 
 import java.util
 
-import scala.collection.JavaConversions._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment._
-import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => 
JavaCStream}
-import org.apache.flink.streaming.api.function.co.{CoWindowFunction, 
CoFlatMapFunction, CoMapFunction, CoReduceFunction}
-import 
org.apache.flink.streaming.api.invokable.operator.co.{CoFlatMapInvokable, 
CoMapInvokable, CoReduceInvokable}
+import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => 
JavaCStream }
+import org.apache.flink.streaming.api.function.co.{ CoFlatMapFunction, 
CoMapFunction, CoReduceFunction, CoWindowFunction }
+import org.apache.flink.streaming.api.invokable.operator.co.{ 
CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable }
 import org.apache.flink.util.Collector
+import org.apache.flink.api.scala.streaming.StreamingConversions._
 
-
+import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) {
+class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
 
   /**
    * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
@@ -40,21 +41,22 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * @param fun2 for each element of the second input. Each
    * CoMapFunction call returns exactly one element.
    *
-        * The CoMapFunction used to jointly transform the two input
+   * The CoMapFunction used to jointly transform the two input
    * DataStreams
    * @return The transformed { @link DataStream}
    */
-  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): 
DataStream[R] = {
+  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): 
+  DataStream[R] = {
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("Map function must not be null.")
     }
-    val comapper = new CoMapFunction[IN1,IN2,R] {
-       def map1(in1: IN1): R = clean(fun1)(in1)
-       def map2(in2: IN2): R = clean(fun2)(in2)
+    val comapper = new CoMapFunction[IN1, IN2, R] {
+      def map1(in1: IN1): R = clean(fun1)(in1)
+      def map2(in2: IN2): R = clean(fun2)(in2)
     }
 
-    new 
DataStream(javaStream.addCoFunction("map",implicitly[TypeInformation[R]],
-    new CoMapInvokable[IN1,IN2,R](comapper)))
+    new DataStream(javaStream.addCoFunction("map", 
implicitly[TypeInformation[R]],
+      new CoMapInvokable[IN1, IN2, R](comapper)))
   }
 
   /**
@@ -67,17 +69,18 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * the {@link RichFuntion} interface.
    *
    * @param coMapper
-        * The CoMapFunction used to jointly transform the two input
+   * The CoMapFunction used to jointly transform the two input
    * DataStreams
    * @return The transformed { @link DataStream}
    */
-  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1,IN2,R]): 
DataStream[R] = {
+  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): 
+  DataStream[R] = {
     if (coMapper == null) {
       throw new NullPointerException("Map function must not be null.")
     }
 
-    new 
DataStream(javaStream.addCoFunction("map",implicitly[TypeInformation[R]],
-      new CoMapInvokable[IN1,IN2,R](coMapper)))
+    new DataStream(javaStream.addCoFunction("map", 
implicitly[TypeInformation[R]],
+      new CoMapInvokable[IN1, IN2, R](coMapper)))
   }
 
   /**
@@ -91,16 +94,17 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * interface.
    *
    * @param coFlatMapper
-        * The CoFlatMapFunction used to jointly transform the two input
+   * The CoFlatMapFunction used to jointly transform the two input
    * DataStreams
    * @return The transformed { @link DataStream}
    */
-  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: 
CoFlatMapFunction[IN1,IN2,R]): DataStream[R] = {
+  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: 
CoFlatMapFunction[IN1, IN2, R]): 
+  DataStream[R] = {
     if (coFlatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
     new DataStream[R](javaStream.addCoFunction("flatMap", 
implicitly[TypeInformation[R]],
-      new CoFlatMapInvokable[IN1,IN2, R](coFlatMapper)))
+      new CoFlatMapInvokable[IN1, IN2, R](coFlatMapper)))
   }
 
   /**
@@ -110,16 +114,17 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * and @param fun2 for each element of the second
    * input. Each CoFlatMapFunction call returns any number of elements
    * including none.
-
+   *
    * @return The transformed { @link DataStream}
    */
-  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, 
fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
+  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, 
+      fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("FlatMap functions must not be null.")
     }
-    val flatMapper = new CoFlatMapFunction[IN1,IN2, R] {
-       def flatMap1(value: IN1, out: Collector[R]): Unit = 
clean(fun1)(value,out)
-       def flatMap2(value: IN2, out: Collector[R]): Unit = 
clean(fun2)(value,out)
+    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
+      def flatMap1(value: IN1, out: Collector[R]): Unit = clean(fun1)(value, 
out)
+      def flatMap2(value: IN2, out: Collector[R]): Unit = clean(fun2)(value, 
out)
     }
     flatMap(flatMapper)
   }
@@ -131,15 +136,15 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * {@link ConnectedDataStream#reduce}
    *
    * @param keyPosition1
-        * The field used to compute the hashcode of the elements in the
+   * The field used to compute the hashcode of the elements in the
    * first input stream.
    * @param keyPosition2
-        * The field used to compute the hashcode of the elements in the
+   * The field used to compute the hashcode of the elements in the
    * second input stream.
    * @return @return The transformed { @link ConnectedDataStream}
    */
-  def groupBy(keyPosition1 : Int,keyPosition2: Int) : 
ConnectedDataStream[IN1,IN2] = {
-    new 
ConnectedDataStream[IN1,IN2](javaStream.groupBy(keyPosition1,keyPosition2))
+  def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, 
IN2] = {
+    javaStream.groupBy(keyPosition1, keyPosition2)
   }
 
   /**
@@ -149,13 +154,14 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * {@link ConnectedDataStream#reduce}
    *
    * @param keyPositions1
-        * The fields used to group the first input stream.
+   * The fields used to group the first input stream.
    * @param keyPositions2
-        * The fields used to group the second input stream.
+   * The fields used to group the second input stream.
    * @return @return The transformed { @link ConnectedDataStream}
    */
-  def groupBy(keyPositions1 : Array[Int],keyPositions2: Array[Int]) : 
ConnectedDataStream[IN1,IN2] = {
-    new 
ConnectedDataStream[IN1,IN2](javaStream.groupBy(keyPositions1,keyPositions2))
+  def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): 
+  ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(keyPositions1, keyPositions2)
   }
 
   /**
@@ -166,13 +172,13 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * to drill down into objects, as in {@code "field1.getInnerField2()" }.
    *
    * @param field1
-        * The grouping expression for the first input
+   * The grouping expression for the first input
    * @param field2
-        * The grouping expression for the second input
+   * The grouping expression for the second input
    * @return The grouped { @link ConnectedDataStream}
    */
-  def groupBy(field1 : String, field2: String) : ConnectedDataStream[IN1,IN2] 
= {
-    new ConnectedDataStream[IN1,IN2](javaStream.groupBy(field1,field2))
+  def groupBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = 
{
+    javaStream.groupBy(field1, field2)
   }
 
   /**
@@ -184,13 +190,14 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * .
    *
    * @param fields1
-        * The grouping expressions for the first input
+   * The grouping expressions for the first input
    * @param fields2
-        * The grouping expressions for the second input
+   * The grouping expressions for the second input
    * @return The grouped { @link ConnectedDataStream}
    */
-  def groupBy(fields1 : Array[String],fields2: Array[String]) : 
ConnectedDataStream[IN1,IN2] = {
-    new ConnectedDataStream[IN1,IN2](javaStream.groupBy(fields1,fields2))
+  def groupBy(fields1: Array[String], fields2: Array[String]): 
+  ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(fields1, fields2)
   }
 
   /**
@@ -200,12 +207,13 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * {@link ConnectedDataStream#reduce}
    *
    * @param fun1
-        * The function used for grouping the first input
+   * The function used for grouping the first input
    * @param fun2
-        * The function used for grouping the second input
+   * The function used for grouping the second input
    * @return @return The transformed { @link ConnectedDataStream}
    */
-  def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _): 
ConnectedDataStream[IN1,IN2] = {
+  def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _):
+  ConnectedDataStream[IN1, IN2] = {
 
     val keyExtractor1 = new KeySelector[IN1, Any] {
       def getKey(in: IN1) = clean(fun1)(in)
@@ -214,7 +222,7 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
       def getKey(in: IN2) = clean(fun2)(in)
     }
 
-    new 
ConnectedDataStream[IN1,IN2](javaStream.groupBy(keyExtractor1,keyExtractor2))
+    javaStream.groupBy(keyExtractor1, keyExtractor2)
   }
 
   /**
@@ -227,17 +235,18 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * the reduce function can be applied incrementally.
    *
    * @param coReducer
-        * The { @link CoReduceFunction} that will be called for every
+   * The { @link CoReduceFunction} that will be called for every
    *             element of the inputs.
    * @return The transformed { @link DataStream}.
    */
-  def reduce[R: TypeInformation: ClassTag](coReducer: 
CoReduceFunction[IN1,IN2,R]): DataStream[R] = {
+  def reduce[R: TypeInformation: ClassTag](coReducer: CoReduceFunction[IN1, 
IN2, R]): 
+  DataStream[R] = {
     if (coReducer == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
 
     new DataStream[R](javaStream.addCoFunction("coReduce", 
implicitly[TypeInformation[R]],
-      new CoReduceInvokable[IN1,IN2,R](coReducer)))
+      new CoReduceInvokable[IN1, IN2, R](coReducer)))
   }
 
   /**
@@ -256,20 +265,20 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    *
    * @return The transformed { @link DataStream}.
    */
-  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1,IN1) => IN1, 
reducer2: (IN2,IN2) => IN2,
-                                            mapper1: IN1 => R, mapper2: IN2 => 
R): DataStream[R] = {
+  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1, 
+      reducer2: (IN2, IN2) => IN2,mapper1: IN1 => R, mapper2: IN2 => R): 
DataStream[R] = {
     if (mapper1 == null || mapper2 == null) {
       throw new NullPointerException("Map functions must not be null.")
     }
     if (reducer1 == null || reducer2 == null) {
       throw new NullPointerException("Reduce functions must not be null.")
     }
-    
-    val reducer = new CoReduceFunction[IN1,IN2,R] {
-       def reduce1(value1: IN1, value2: IN1): IN1 = 
clean(reducer1)(value1,value2)
-       def map2(value: IN2): R = clean(mapper2)(value)
-       def reduce2(value1: IN2, value2: IN2): IN2 = 
clean(reducer2)(value1,value2)
-       def map1(value: IN1): R = clean(mapper1)(value)
+
+    val reducer = new CoReduceFunction[IN1, IN2, R] {
+      def reduce1(value1: IN1, value2: IN1): IN1 = clean(reducer1)(value1, 
value2)
+      def map2(value: IN2): R = clean(mapper2)(value)
+      def reduce2(value1: IN2, value2: IN2): IN2 = clean(reducer2)(value1, 
value2)
+      def map1(value: IN1): R = clean(mapper1)(value)
     }
     reduce(reducer)
   }
@@ -281,23 +290,24 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * default to compute windows.
    *
    * @param coWindowFunction
-        * The { @link CoWindowFunction} that will be applied for the time
+   * The { @link CoWindowFunction} that will be applied for the time
    *             windows.
    * @param windowSize
-        * Size of the windows that will be aligned for both streams in
+   * Size of the windows that will be aligned for both streams in
    * milliseconds.
    * @param slideInterval
-        * After every function call the windows will be slid by this
+   * After every function call the windows will be slid by this
    * interval.
    *
    * @return The transformed { @link DataStream}.
    */
-  def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: 
CoWindowFunction[IN1,IN2,R], windowSize:Long, slideInterval: Long) = {
-    if(coWindowFunction == null){
+  def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: 
+      CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long) = {
+    if (coWindowFunction == null) {
       throw new NullPointerException("CoWindow function must no be null")
     }
 
-    new DataStream[R](javaStream.windowReduce(coWindowFunction, windowSize, 
slideInterval))
+    javaStream.windowReduce(coWindowFunction, windowSize, slideInterval)
   }
 
   /**
@@ -307,26 +317,28 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * default to compute windows.
    *
    * @param coWindower
-        * The coWindowing function to be applied for the time windows.
+   * The coWindowing function to be applied for the time windows.
    * @param windowSize
-        * Size of the windows that will be aligned for both streams in
+   * Size of the windows that will be aligned for both streams in
    * milliseconds.
    * @param slideInterval
-        * After every function call the windows will be slid by this
+   * After every function call the windows will be slid by this
    * interval.
    *
    * @return The transformed { @link DataStream}.
    */
-  def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], 
Seq[IN2], Collector[R]) => Unit , windowSize:Long, slideInterval: Long) = {
-    if(coWindower == null){
+  def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], 
Seq[IN2], 
+      Collector[R]) => Unit, windowSize: Long, slideInterval: Long) = {
+    if (coWindower == null) {
       throw new NullPointerException("CoWindow function must no be null")
     }
 
-    val coWindowFun = new CoWindowFunction[IN1,IN2,R] {
-       def coWindow(first: util.List[IN1], second: util.List[IN2], out: 
Collector[R]): Unit = clean(coWindower)(first,second,out)
+    val coWindowFun = new CoWindowFunction[IN1, IN2, R] {
+      def coWindow(first: util.List[IN1], second: util.List[IN2], 
+          out: Collector[R]): Unit = clean(coWindower)(first, second, out)
     }
 
-    new DataStream[R](javaStream.windowReduce(coWindowFun, windowSize, 
slideInterval))
+    javaStream.windowReduce(coWindowFun, windowSize, slideInterval)
   }
 
   /**
@@ -335,7 +347,7 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * @return The first DataStream.
    */
   def getFirst(): DataStream[IN1] = {
-     new DataStream[IN1](javaStream.getFirst)
+    javaStream.getFirst
   }
 
   /**
@@ -344,7 +356,7 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
    * @return The second DataStream.
    */
   def getSecond(): DataStream[IN2] = {
-    new DataStream[IN2](javaStream.getSecond)
+    javaStream.getSecond
   }
 
   /**
@@ -365,5 +377,4 @@ class ConnectedDataStream[IN1,IN2] (javaStream: 
JavaCStream[IN1,IN2]) {
     javaStream.getInputType2
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7eaf726a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index 546d8a9..ccfd176 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -19,22 +19,20 @@
 package org.apache.flink.api.scala.streaming
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
+  SingleOutputStreamOperator, GroupedDataStream}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
 import org.apache.flink.util.Collector
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.streaming.api.invokable.StreamInvokable
-import org.apache.flink.streaming.api.datastream.GroupedDataStream
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable
-import org.apache.flink.streaming.api.datastream.GroupedDataStream
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.common.functions.FilterFunction
@@ -94,6 +92,15 @@ class DataStream[T](javaStream: JavaStream[T]) {
     javaStream.merge(dataStreams.map(_.getJavaStream): _*)
 
   /**
+   * Creates a new ConnectedDataStream by connecting
+   * DataStream outputs of different type with each other. The
+   * DataStreams connected using this operators can be used with CoFunctions.
+   *
+   */
+  def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] = 
+    javaStream.connect(dataStream.getJavaStream)
+
+  /**
    * Groups the elements of a DataStream by the given key positions (for 
tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped aggregations
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/7eaf726a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
index a34d0dc..9aefa04 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
@@ -21,9 +21,10 @@ package org.apache.flink.api.scala.streaming
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => 
JavaWStream }
 import org.apache.flink.streaming.api.datastream.{ SplitDataStream => 
SplitJavaStream }
+import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => 
JavaConStream }
 
 object StreamingConversions {
-  
+
   implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
     new DataStream[R](javaStream)
 
@@ -33,4 +34,7 @@ object StreamingConversions {
   implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): 
SplitDataStream[R] =
     new SplitDataStream[R](javaStream)
 
+  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: 
JavaConStream[IN1, IN2]): 
+  ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
+
 }

Reply via email to