[scala] [streaming] Added implicit conversions from Java to Scala streams

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d4ec0095
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d4ec0095
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d4ec0095

Branch: refs/heads/master
Commit: d4ec0095c253d7b8b2b99226f42dab4ce3555aef
Parents: fac7734
Author: Gyula Fora <[email protected]>
Authored: Sat Jan 3 19:41:37 2015 +0100
Committer: Gyula Fora <[email protected]>
Committed: Sat Jan 3 19:41:37 2015 +0100

----------------------------------------------------------------------
 .../flink/api/scala/streaming/DataStream.scala  | 63 ++++++++++----------
 .../api/scala/streaming/SplitDataStream.scala   |  6 +-
 .../scala/streaming/StreamCrossOperator.scala   |  3 +-
 .../streaming/StreamExecutionEnvironment.scala  | 15 ++---
 .../scala/streaming/StreamJoinOperator.scala    |  3 +-
 .../scala/streaming/StreamingConversions.scala  | 36 +++++++++++
 .../scala/streaming/WindowedDataStream.scala    | 15 +++--
 7 files changed, 88 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
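
The change replaces the explicit "new DataStream[T](...)" / "new WindowedDataStream[T](...)" wrapping that every Scala streaming wrapper method previously performed with implicit conversions collected in a new StreamingConversions object (full diff below). As a minimal, self-contained sketch of the pattern, using hypothetical stand-in types rather than the real Flink classes:

import scala.language.implicitConversions

object ConversionSketch {

  // Stand-in for the Java-API stream type.
  class JStream[T](val name: String) {
    def shuffle(): JStream[T] = new JStream[T](name + ".shuffle")
  }
  // Stand-in for the Scala wrapper around it.
  class SStream[T](val javaStream: JStream[T])

  // The conversion the wrapper methods rely on, analogous to
  // StreamingConversions.javaToScalaStream in this commit.
  implicit def javaToScala[T](js: JStream[T]): SStream[T] = new SStream[T](js)

  // Before: def shuffle(s: SStream[T]) = new SStream[T](s.javaStream.shuffle())
  // After:  the declared return type lets the compiler insert the wrapping.
  def shuffle[T](s: SStream[T]): SStream[T] = s.javaStream.shuffle()

  def main(args: Array[String]): Unit = {
    val out = shuffle(new SStream(new JStream[String]("source")))
    println(out.javaStream.name) // prints "source.shuffle"
  }
}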


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index dc1e5b3..546d8a9 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -49,8 +49,8 @@ import org.apache.flink.streaming.api.function.aggregation.SumFunction
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
-import com.amazonaws.services.cloudfront_2012_03_15.model.InvalidArgumentException
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.scala.streaming.StreamingConversions._
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
@@ -62,7 +62,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
   * Sets the degree of parallelism of this operation. This must be greater than 1.
    */
-  def setParallelism(dop: Int) = {
+  def setParallelism(dop: Int): DataStream[T] = {
     javaStream match {
       case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
       case _ =>
@@ -91,15 +91,14 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def merge(dataStreams: DataStream[T]*): DataStream[T] =
-    new DataStream[T](javaStream.merge(dataStreams.map(_.getJavaStream): _*))
+    javaStream.merge(dataStreams.map(_.getJavaStream): _*)
 
   /**
   * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped aggregations
    *
    */
-  def groupBy(fields: Int*): DataStream[T] =
-    new DataStream[T](javaStream.groupBy(fields: _*))
+  def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*)
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
@@ -107,7 +106,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def groupBy(firstField: String, otherFields: String*): DataStream[T] = 
-    new DataStream[T](javaStream.groupBy(firstField +: otherFields.toArray: _*))
+   javaStream.groupBy(firstField +: otherFields.toArray: _*)   
   
   /**
    * Groups the elements of a DataStream by the given K key to
@@ -120,7 +119,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
       val cleanFun = clean(fun)
       def getKey(in: T) = cleanFun(in)
     }
-    new DataStream[T](javaStream.groupBy(keyExtractor))
+    javaStream.groupBy(keyExtractor)
   }
 
   /**
@@ -130,7 +129,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def partitionBy(fields: Int*): DataStream[T] =
-    new DataStream[T](javaStream.partitionBy(fields: _*));
+    javaStream.partitionBy(fields: _*)
 
   /**
    * Sets the partitioning of the DataStream so that the output is
@@ -139,7 +138,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
-    new DataStream[T](javaStream.partitionBy(firstField +: otherFields.toArray: _*))
+   javaStream.partitionBy(firstField +: otherFields.toArray: _*)
 
   /**
    * Sets the partitioning of the DataStream so that the output is
@@ -153,7 +152,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
       val cleanFun = clean(fun)
       def getKey(in: T) = cleanFun(in)
     }
-    new DataStream[T](javaStream.partitionBy(keyExtractor))
+    javaStream.partitionBy(keyExtractor)
   }
 
   /**
@@ -163,7 +162,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * parallel instances of the next processing operator.
    *
    */
-  def broadcast: DataStream[T] = new DataStream[T](javaStream.broadcast())
+  def broadcast: DataStream[T] = javaStream.broadcast()
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
@@ -172,7 +171,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * processing operator.
    *
    */
-  def shuffle: DataStream[T] = new DataStream[T](javaStream.shuffle())
+  def shuffle: DataStream[T] = javaStream.shuffle()
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
@@ -182,7 +181,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * instances of the next processing operator.
    *
    */
-  def forward: DataStream[T] = new DataStream[T](javaStream.forward())
+  def forward: DataStream[T] = javaStream.forward()
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
@@ -191,7 +190,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * the next processing operator.
    *
    */
-  def distribute: DataStream[T] = new DataStream[T](javaStream.distribute())
+  def distribute: DataStream[T] = javaStream.distribute()
 
   /**
    * Initiates an iterative part of the program that creates a loop by feeding
@@ -217,7 +216,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
     val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
     iterativeStream.closeWith(feedback.getJavaStream)
-    new DataStream[T](output.getJavaStream)
+    output
   }
 
   /**
@@ -301,8 +300,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
       def map(in: T): R = cleanFun(in)
     }
 
-    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]],
-      new MapInvokable[T, R](mapper)))
+    javaStream.transform("map", implicitly[TypeInformation[R]], new 
MapInvokable[T, R](mapper))
   }
 
   /**
@@ -313,8 +311,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
       throw new NullPointerException("Map function must not be null.")
     }
 
-    new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]],
-      new MapInvokable[T, R](mapper)))
+    javaStream.transform("map", implicitly[TypeInformation[R]], new 
MapInvokable[T, R](mapper))
   }
 
   /**
@@ -325,8 +322,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (flatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
-    new DataStream[R](javaStream.transform("flatMap", 
implicitly[TypeInformation[R]],
-      new FlatMapInvokable[T, R](flatMapper)))
+   javaStream.transform("flatMap", implicitly[TypeInformation[R]], 
+       new FlatMapInvokable[T, R](flatMapper))
   }
 
   /**
@@ -368,10 +365,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
       throw new NullPointerException("Reduce function must not be null.")
     }
     javaStream match {
-      case ds: GroupedDataStream[_] => new DataStream[T](javaStream.transform("reduce",
-        javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector())))
-      case _ => new DataStream[T](javaStream.transform("reduce", javaStream.getType(),
-        new StreamReduceInvokable[T](reducer)))
+      case ds: GroupedDataStream[_] => javaStream.transform("reduce",
+        javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector()))
+      case _ => javaStream.transform("reduce", javaStream.getType(),
+        new StreamReduceInvokable[T](reducer))
     }
   }
 
@@ -397,7 +394,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (filter == null) {
       throw new NullPointerException("Filter function must not be null.")
     }
-    new DataStream[T](javaStream.filter(filter))
+    javaStream.filter(filter)
   }
 
   /**
@@ -426,7 +423,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * window(List(triggers), List(evicters))
    */
   def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
-    new WindowedDataStream[T](javaStream.window(windowingHelper: _*))
+    javaStream.window(windowingHelper: _*)
 
   /**
   * Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
@@ -436,7 +433,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
  def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]):
-    WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(triggers, evicters))
+    WindowedDataStream[T] = javaStream.window(triggers, evicters)
 
   /**
    *
@@ -445,7 +442,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * SplitDataStream.
    */
  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream match {
-    case op: SingleOutputStreamOperator[_, _] => new SplitDataStream[T](op.split(selector))
+    case op: SingleOutputStreamOperator[_, _] => op.split(selector)
     case _ =>
       throw new UnsupportedOperationException("Operator " + 
javaStream.toString + " can not be " +
         "split.")
@@ -503,7 +500,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * written.
    *
    */
-  def print(): DataStream[T] = new DataStream[T](javaStream.print())
+  def print(): DataStream[T] = javaStream.print()
 
   /**
    * Writes a DataStream to the file specified by path in text format. The
@@ -513,7 +510,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def writeAsText(path: String, millis: Long = 0): DataStream[T] =
-    new DataStream[T](javaStream.writeAsText(path, millis))
+    javaStream.writeAsText(path, millis)
 
   /**
    * Writes a DataStream to the file specified by path in text format. The
@@ -523,7 +520,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
-    new DataStream[T](javaStream.writeAsCsv(path, millis))
+    javaStream.writeAsCsv(path, millis)
 
   /**
    * Adds the given sink to this DataStream. Only streams with sinks added
@@ -532,7 +529,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] =
-    new DataStream[T](javaStream.addSink(sinkFuntion))
+    javaStream.addSink(sinkFuntion)
 
   /**
    * Adds the given sink to this DataStream. Only streams with sinks added

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
index 82a5c70..f61e34b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.scala.streaming
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
+import org.apache.flink.api.scala.streaming.StreamingConversions._
 
 /**
  * The SplitDataStream represents an operator that has been split using an
@@ -39,12 +40,11 @@ class SplitDataStream[T](javaStream: SplitJavaStream[T]) {
   /**
    *  Sets the output names for which the next operator will receive values.
    */
-  def select(outputNames: String*): DataStream[T] =
-    new DataStream[T](javaStream.select(outputNames: _*))
+  def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)
 
   /**
    * Selects all output names from a split data stream.
    */
-  def selectAll(): DataStream[T] = new DataStream[T](javaStream.selectAll())
+  def selectAll(): DataStream[T] = javaStream.selectAll()
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
index e9010c8..cac2927 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.functions.CrossFunction
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
+import org.apache.flink.api.scala.streaming.StreamingConversions._
 
 class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
   TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
@@ -88,7 +89,7 @@ object StreamCrossOperator {
       javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
         invokable)
 
-      new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
+      javaStream.setType(implicitly[TypeInformation[R]])
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
index a7a471f..9c66b24 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.function.source.FromElementsFunction
 import org.apache.flink.streaming.api.function.source.SourceFunction
 import scala.collection.JavaConversions._
 import org.apache.flink.util.Collector
+import org.apache.flink.api.scala.streaming.StreamingConversions._
 
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
@@ -82,7 +83,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    */
   def readTextFile(filePath: String): DataStream[String] =
-    new DataStream[String](javaEnv.readTextFile(filePath))
+    javaEnv.readTextFile(filePath)
 
   /**
    * Creates a DataStream that represents the Strings produced by reading the
@@ -91,8 +92,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * testing a topology.
    *
    */
-  def readTextStream(StreamPath: String): DataStream[String] =
-    new DataStream[String](javaEnv.readTextStream(StreamPath))
+  def readTextStream(StreamPath: String): DataStream[String] = 
+    javaEnv.readTextStream(StreamPath)
 
   /**
    * Creates a new DataStream that contains the strings received infinitely
@@ -101,7 +102,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    */
  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
-    new DataStream[String](javaEnv.socketTextStream(hostname, port, delimiter))
+    javaEnv.socketTextStream(hostname, port, delimiter)
 
   /**
    * Creates a new DataStream that contains the strings received infinitely
@@ -110,7 +111,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    */
   def socketTextStream(hostname: String, port: Int): DataStream[String] =
-    new DataStream[String](javaEnv.socketTextStream(hostname, port))
+    javaEnv.socketTextStream(hostname, port)
 
   /**
    * Creates a new DataStream that contains a sequence of numbers.
@@ -151,7 +152,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
       new SourceInvokable[T](new FromElementsFunction[T](scala.collection.JavaConversions
         .asJavaCollection(data))), null, typeInfo,
       "source", 1);
-    new DataStream(returnStream)
+    returnStream
   }
 
   /**
@@ -163,7 +164,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     Validate.notNull(function, "Function must not be null.")
     val cleanFun = StreamExecutionEnvironment.clean(function)
     val typeInfo = implicitly[TypeInformation[T]]
-    new DataStream[T](javaEnv.addSource(cleanFun, typeInfo))
+    javaEnv.addSource(cleanFun, typeInfo)
   }
   
    /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
index 4ed5082..8d8a0b0 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
 import org.apache.flink.api.java.operators.Keys
 import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
+import org.apache.flink.api.scala.streaming.StreamingConversions._
 
 class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
 TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
@@ -178,7 +179,7 @@ object StreamJoinOperator {
       javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
         invokable)
 
-      new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
+      javaStream.setType(implicitly[TypeInformation[R]])
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
new file mode 100644
index 0000000..a34d0dc
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
+import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
+
+object StreamingConversions {
+  
+  implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
+    new DataStream[R](javaStream)
+
+  implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
+    new WindowedDataStream[R](javaWStream)
+
+  implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] =
+    new SplitDataStream[R](javaStream)
+
+}
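
The object above bundles one conversion per wrapper type (DataStream, WindowedDataStream, SplitDataStream). With StreamingConversions._ imported, the compiler selects whichever conversion matches the declared return type at the call site, which is what lets the wrapper methods in the files above and below return the Java-side result directly. A small self-contained illustration of that selection, again with hypothetical stand-in types rather than the real Flink classes:

import scala.language.implicitConversions

// Hypothetical stand-ins for three Java-side stream types and their Scala wrappers.
class JPlain[T]
class JWindowed[T]
class JSplit[T]
class SPlain[T](val j: JPlain[T])
class SWindowed[T](val j: JWindowed[T])
class SSplit[T](val j: JSplit[T])

object Conversions {
  // One conversion per wrapper, mirroring the structure of StreamingConversions.
  implicit def toPlain[T](j: JPlain[T]): SPlain[T] = new SPlain(j)
  implicit def toWindowed[T](j: JWindowed[T]): SWindowed[T] = new SWindowed(j)
  implicit def toSplit[T](j: JSplit[T]): SSplit[T] = new SSplit(j)
}

object Demo {
  import Conversions._
  // The declared return types decide which conversion the compiler applies.
  def plain: SPlain[Int] = new JPlain[Int]
  def windowed: SWindowed[Int] = new JWindowed[Int]
  def split: SSplit[Int] = new JSplit[Int]

  def main(args: Array[String]): Unit =
    println(Seq(plain.j, windowed.j, split.j).size) // 3: each conversion fired
}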

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
index e33368c..2f9c792 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
@@ -41,6 +41,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.streaming.api.function.aggregation.SumFunction
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
+import org.apache.flink.api.scala.streaming.StreamingConversions._
 
 class WindowedDataStream[T](javaStream: JavaWStream[T]) {
 
@@ -50,7 +51,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * the window.
    */
   def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
-    new WindowedDataStream[T](javaStream.every(windowingHelper: _*))
+    javaStream.every(windowingHelper: _*)
 
   /**
    * Groups the elements of the WindowedDataStream using the given
@@ -61,8 +62,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * DataStream.window(...) operator on an already grouped data stream.
    *
    */
-  def groupBy(fields: Int*): WindowedDataStream[T] =
-    new WindowedDataStream[T](javaStream.groupBy(fields: _*))
+  def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*)
 
   /**
    * Groups the elements of the WindowedDataStream using the given
@@ -74,8 +74,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    *
    */
  def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
-    new WindowedDataStream[T](javaStream.groupBy(
-          firstField +: otherFields.toArray: _*))    
+   javaStream.groupBy(firstField +: otherFields.toArray: _*)   
     
   /**
    * Groups the elements of the WindowedDataStream using the given
@@ -92,7 +91,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
       val cleanFun = clean(fun)
       def getKey(in: T) = cleanFun(in)
     }
-    new WindowedDataStream[T](javaStream.groupBy(keyExtractor))
+    javaStream.groupBy(keyExtractor)
   }
 
   /**
@@ -104,7 +103,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
     if (reducer == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
-    new DataStream[T](javaStream.reduce(reducer))
+    javaStream.reduce(reducer)
   }
 
   /**
@@ -136,7 +135,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
     if (reducer == null) {
       throw new NullPointerException("GroupReduce function must not be null.")
     }
-    new DataStream[R](javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]]))
+    javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]])
   }
 
   /**
