[scala] [streaming] Added implicit conversions from java to scala streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d4ec0095 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d4ec0095 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d4ec0095 Branch: refs/heads/master Commit: d4ec0095c253d7b8b2b99226f42dab4ce3555aef Parents: fac7734 Author: Gyula Fora <[email protected]> Authored: Sat Jan 3 19:41:37 2015 +0100 Committer: Gyula Fora <[email protected]> Committed: Sat Jan 3 19:41:37 2015 +0100 ---------------------------------------------------------------------- .../flink/api/scala/streaming/DataStream.scala | 63 ++++++++++---------- .../api/scala/streaming/SplitDataStream.scala | 6 +- .../scala/streaming/StreamCrossOperator.scala | 3 +- .../streaming/StreamExecutionEnvironment.scala | 15 ++--- .../scala/streaming/StreamJoinOperator.scala | 3 +- .../scala/streaming/StreamingConversions.scala | 36 +++++++++++ .../scala/streaming/WindowedDataStream.scala | 15 +++-- 7 files changed, 88 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala index dc1e5b3..546d8a9 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala @@ -49,8 +49,8 @@ import org.apache.flink.streaming.api.function.aggregation.SumFunction import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.streaming.api.function.aggregation.AggregationFunction import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType -import com.amazonaws.services.cloudfront_2012_03_15.model.InvalidArgumentException import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.api.scala.streaming.StreamingConversions._ class DataStream[T](javaStream: JavaStream[T]) { @@ -62,7 +62,7 @@ class DataStream[T](javaStream: JavaStream[T]) { /** * Sets the degree of parallelism of this operation. This must be greater than 1. */ - def setParallelism(dop: Int) = { + def setParallelism(dop: Int): DataStream[T] = { javaStream match { case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop) case _ => @@ -91,15 +91,14 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def merge(dataStreams: DataStream[T]*): DataStream[T] = - new DataStream[T](javaStream.merge(dataStreams.map(_.getJavaStream): _*)) + javaStream.merge(dataStreams.map(_.getJavaStream): _*) /** * Groups the elements of a DataStream by the given key positions (for tuple/array types) to * be used with grouped operators like grouped reduce or grouped aggregations * */ - def groupBy(fields: Int*): DataStream[T] = - new DataStream[T](javaStream.groupBy(fields: _*)) + def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*) /** * Groups the elements of a DataStream by the given field expressions to @@ -107,7 +106,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def groupBy(firstField: String, otherFields: String*): DataStream[T] = - new DataStream[T](javaStream.groupBy(firstField +: otherFields.toArray: _*)) + javaStream.groupBy(firstField +: otherFields.toArray: _*) /** * Groups the elements of a DataStream by the given K key to @@ -120,7 +119,7 @@ class DataStream[T](javaStream: JavaStream[T]) { val cleanFun = clean(fun) def getKey(in: T) = cleanFun(in) } - new DataStream[T](javaStream.groupBy(keyExtractor)) + javaStream.groupBy(keyExtractor) } /** @@ -130,7 +129,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def partitionBy(fields: Int*): DataStream[T] = - new DataStream[T](javaStream.partitionBy(fields: _*)); + javaStream.partitionBy(fields: _*) /** * Sets the partitioning of the DataStream so that the output is @@ -139,7 +138,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def partitionBy(firstField: String, otherFields: String*): DataStream[T] = - new DataStream[T](javaStream.partitionBy(firstField +: otherFields.toArray: _*)) + javaStream.partitionBy(firstField +: otherFields.toArray: _*) /** * Sets the partitioning of the DataStream so that the output is @@ -153,7 +152,7 @@ class DataStream[T](javaStream: JavaStream[T]) { val cleanFun = clean(fun) def getKey(in: T) = cleanFun(in) } - new DataStream[T](javaStream.partitionBy(keyExtractor)) + javaStream.partitionBy(keyExtractor) } /** @@ -163,7 +162,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * parallel instances of the next processing operator. * */ - def broadcast: DataStream[T] = new DataStream[T](javaStream.broadcast()) + def broadcast: DataStream[T] = javaStream.broadcast() /** * Sets the partitioning of the DataStream so that the output tuples @@ -172,7 +171,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * processing operator. * */ - def shuffle: DataStream[T] = new DataStream[T](javaStream.shuffle()) + def shuffle: DataStream[T] = javaStream.shuffle() /** * Sets the partitioning of the DataStream so that the output tuples @@ -182,7 +181,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * instances of the next processing operator. * */ - def forward: DataStream[T] = new DataStream[T](javaStream.forward()) + def forward: DataStream[T] = javaStream.forward() /** * Sets the partitioning of the DataStream so that the output tuples @@ -191,7 +190,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * the next processing operator. * */ - def distribute: DataStream[T] = new DataStream[T](javaStream.distribute()) + def distribute: DataStream[T] = javaStream.distribute() /** * Initiates an iterative part of the program that creates a loop by feeding @@ -217,7 +216,7 @@ class DataStream[T](javaStream: JavaStream[T]) { val (feedback, output) = stepFunction(new DataStream[T](iterativeStream)) iterativeStream.closeWith(feedback.getJavaStream) - new DataStream[T](output.getJavaStream) + output } /** @@ -301,8 +300,7 @@ class DataStream[T](javaStream: JavaStream[T]) { def map(in: T): R = cleanFun(in) } - new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], - new MapInvokable[T, R](mapper))) + javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper)) } /** @@ -313,8 +311,7 @@ class DataStream[T](javaStream: JavaStream[T]) { throw new NullPointerException("Map function must not be null.") } - new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], - new MapInvokable[T, R](mapper))) + javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper)) } /** @@ -325,8 +322,8 @@ class DataStream[T](javaStream: JavaStream[T]) { if (flatMapper == null) { throw new NullPointerException("FlatMap function must not be null.") } - new DataStream[R](javaStream.transform("flatMap", implicitly[TypeInformation[R]], - new FlatMapInvokable[T, R](flatMapper))) + javaStream.transform("flatMap", implicitly[TypeInformation[R]], + new FlatMapInvokable[T, R](flatMapper)) } /** @@ -368,10 +365,10 @@ class DataStream[T](javaStream: JavaStream[T]) { throw new NullPointerException("Reduce function must not be null.") } javaStream match { - case ds: GroupedDataStream[_] => new DataStream[T](javaStream.transform("reduce", - javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector()))) - case _ => new DataStream[T](javaStream.transform("reduce", javaStream.getType(), - new StreamReduceInvokable[T](reducer))) + case ds: GroupedDataStream[_] => javaStream.transform("reduce", + javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector())) + case _ => javaStream.transform("reduce", javaStream.getType(), + new StreamReduceInvokable[T](reducer)) } } @@ -397,7 +394,7 @@ class DataStream[T](javaStream: JavaStream[T]) { if (filter == null) { throw new NullPointerException("Filter function must not be null.") } - new DataStream[T](javaStream.filter(filter)) + javaStream.filter(filter) } /** @@ -426,7 +423,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * window(List(triggers), List(evicters)) */ def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = - new WindowedDataStream[T](javaStream.window(windowingHelper: _*)) + javaStream.window(windowingHelper: _*) /** * Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s. @@ -436,7 +433,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]): - WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(triggers, evicters)) + WindowedDataStream[T] = javaStream.window(triggers, evicters) /** * @@ -445,7 +442,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * SplitDataStream. */ def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream match { - case op: SingleOutputStreamOperator[_, _] => new SplitDataStream[T](op.split(selector)) + case op: SingleOutputStreamOperator[_, _] => op.split(selector) case _ => throw new UnsupportedOperationException("Operator " + javaStream.toString + " can not be " + "split.") @@ -503,7 +500,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * written. * */ - def print(): DataStream[T] = new DataStream[T](javaStream.print()) + def print(): DataStream[T] = javaStream.print() /** * Writes a DataStream to the file specified by path in text format. The @@ -513,7 +510,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def writeAsText(path: String, millis: Long = 0): DataStream[T] = - new DataStream[T](javaStream.writeAsText(path, millis)) + javaStream.writeAsText(path, millis) /** * Writes a DataStream to the file specified by path in text format. The @@ -523,7 +520,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def writeAsCsv(path: String, millis: Long = 0): DataStream[T] = - new DataStream[T](javaStream.writeAsCsv(path, millis)) + javaStream.writeAsCsv(path, millis) /** * Adds the given sink to this DataStream. Only streams with sinks added @@ -532,7 +529,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] = - new DataStream[T](javaStream.addSink(sinkFuntion)) + javaStream.addSink(sinkFuntion) /** * Adds the given sink to this DataStream. Only streams with sinks added http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala index 82a5c70..f61e34b 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/SplitDataStream.scala @@ -20,6 +20,7 @@ package org.apache.flink.api.scala.streaming import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } +import org.apache.flink.api.scala.streaming.StreamingConversions._ /** * The SplitDataStream represents an operator that has been split using an @@ -39,12 +40,11 @@ class SplitDataStream[T](javaStream: SplitJavaStream[T]) { /** * Sets the output names for which the next operator will receive values. */ - def select(outputNames: String*): DataStream[T] = - new DataStream[T](javaStream.select(outputNames: _*)) + def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*) /** * Selects all output names from a split data stream. */ - def selectAll(): DataStream[T] = new DataStream[T](javaStream.selectAll()) + def selectAll(): DataStream[T] = javaStream.selectAll() } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala index e9010c8..cac2927 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala @@ -33,6 +33,7 @@ import org.apache.flink.api.common.functions.CrossFunction import org.apache.flink.api.scala.typeutils.CaseClassSerializer import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean +import org.apache.flink.api.scala.streaming.StreamingConversions._ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) { @@ -88,7 +89,7 @@ object StreamCrossOperator { javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), invokable) - new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]])) + javaStream.setType(implicitly[TypeInformation[R]]) } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala index a7a471f..9c66b24 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala @@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.function.source.FromElementsFunction import org.apache.flink.streaming.api.function.source.SourceFunction import scala.collection.JavaConversions._ import org.apache.flink.util.Collector +import org.apache.flink.api.scala.streaming.StreamingConversions._ class StreamExecutionEnvironment(javaEnv: JavaEnv) { @@ -82,7 +83,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * */ def readTextFile(filePath: String): DataStream[String] = - new DataStream[String](javaEnv.readTextFile(filePath)) + javaEnv.readTextFile(filePath) /** * Creates a DataStream that represents the Strings produced by reading the @@ -91,8 +92,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * testing a topology. * */ - def readTextStream(StreamPath: String): DataStream[String] = - new DataStream[String](javaEnv.readTextStream(StreamPath)) + def readTextStream(StreamPath: String): DataStream[String] = + javaEnv.readTextStream(StreamPath) /** * Creates a new DataStream that contains the strings received infinitely @@ -101,7 +102,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * */ def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] = - new DataStream[String](javaEnv.socketTextStream(hostname, port, delimiter)) + javaEnv.socketTextStream(hostname, port, delimiter) /** * Creates a new DataStream that contains the strings received infinitely @@ -110,7 +111,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * */ def socketTextStream(hostname: String, port: Int): DataStream[String] = - new DataStream[String](javaEnv.socketTextStream(hostname, port)) + javaEnv.socketTextStream(hostname, port) /** * Creates a new DataStream that contains a sequence of numbers. @@ -151,7 +152,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { new SourceInvokable[T](new FromElementsFunction[T](scala.collection.JavaConversions .asJavaCollection(data))), null, typeInfo, "source", 1); - new DataStream(returnStream) + returnStream } /** @@ -163,7 +164,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { Validate.notNull(function, "Function must not be null.") val cleanFun = StreamExecutionEnvironment.clean(function) val typeInfo = implicitly[TypeInformation[T]] - new DataStream[T](javaEnv.addSource(cleanFun, typeInfo)) + javaEnv.addSource(cleanFun, typeInfo) } /** http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala index 4ed5082..8d8a0b0 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala @@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable import org.apache.flink.streaming.util.keys.KeySelectorUtil import org.apache.flink.api.java.operators.Keys import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean +import org.apache.flink.api.scala.streaming.StreamingConversions._ class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) { @@ -178,7 +179,7 @@ object StreamJoinOperator { javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), invokable) - new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]])) + javaStream.setType(implicitly[TypeInformation[R]]) } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala new file mode 100644 index 0000000..a34d0dc --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.streaming + +import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } +import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream } +import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } + +object StreamingConversions { + + implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] = + new DataStream[R](javaStream) + + implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] = + new WindowedDataStream[R](javaWStream) + + implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] = + new SplitDataStream[R](javaStream) + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4ec0095/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala index e33368c..2f9c792 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala @@ -41,6 +41,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.streaming.api.function.aggregation.SumFunction import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean +import org.apache.flink.api.scala.streaming.StreamingConversions._ class WindowedDataStream[T](javaStream: JavaWStream[T]) { @@ -50,7 +51,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { * the window. */ def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = - new WindowedDataStream[T](javaStream.every(windowingHelper: _*)) + javaStream.every(windowingHelper: _*) /** * Groups the elements of the WindowedDataStream using the given @@ -61,8 +62,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { * DataStream.window(...) operator on an already grouped data stream. * */ - def groupBy(fields: Int*): WindowedDataStream[T] = - new WindowedDataStream[T](javaStream.groupBy(fields: _*)) + def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*) /** * Groups the elements of the WindowedDataStream using the given @@ -74,8 +74,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { * */ def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] = - new WindowedDataStream[T](javaStream.groupBy( - firstField +: otherFields.toArray: _*)) + javaStream.groupBy(firstField +: otherFields.toArray: _*) /** * Groups the elements of the WindowedDataStream using the given @@ -92,7 +91,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { val cleanFun = clean(fun) def getKey(in: T) = cleanFun(in) } - new WindowedDataStream[T](javaStream.groupBy(keyExtractor)) + javaStream.groupBy(keyExtractor) } /** @@ -104,7 +103,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { if (reducer == null) { throw new NullPointerException("Reduce function must not be null.") } - new DataStream[T](javaStream.reduce(reducer)) + javaStream.reduce(reducer) } /** @@ -136,7 +135,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { if (reducer == null) { throw new NullPointerException("GroupReduce function must not be null.") } - new DataStream[R](javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]])) + javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]]) } /**
