Repository: flink Updated Branches: refs/heads/master 85ac6d3d4 -> 086acf681
[streaming] [scala] Exposed environment from DataStream This is needed for streaming library features, is identical to the batch API. Closes #1480 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/086acf68 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/086acf68 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/086acf68 Branch: refs/heads/master Commit: 086acf681f01f2da530c04289e0682c56f98a378 Parents: 85ac6d3 Author: Márton Balassi <mbala...@apache.org> Authored: Wed Dec 30 17:31:37 2015 +0100 Committer: Márton Balassi <mbala...@apache.org> Committed: Sat Jan 30 08:25:15 2016 +0100 ---------------------------------------------------------------------- .../scala/table/ScalaStreamingTranslator.scala | 2 +- .../apache/flink/api/scala/table/package.scala | 2 +- .../streaming/api/scala/CoGroupedStreams.scala | 8 +- .../flink/streaming/api/scala/DataStream.scala | 135 ++++++++++--------- .../streaming/api/scala/JoinedStreams.scala | 10 +- .../api/scala/AllWindowTranslationTest.scala | 16 +-- .../streaming/api/scala/DataStreamTest.scala | 2 +- .../api/scala/StreamingOperatorsITCase.scala | 4 +- .../StreamingScalaAPICompletenessTest.scala | 1 - .../api/scala/WindowTranslationTest.scala | 16 +-- 10 files changed, 101 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala index 88f1b83..86b9044 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala @@ -51,7 +51,7 @@ class ScalaStreamingTranslator extends PlanTranslator { resultFields: Seq[(String, TypeInformation[_])]): Table = { val result = - javaTranslator.createTable(repr.getJavaStream, inputType, expressions, resultFields) + javaTranslator.createTable(repr.javaStream, inputType, expressions, resultFields) new Table(result.operation) } http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala index e74651b..31373a3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala @@ -90,7 +90,7 @@ package object table extends ImplicitExpressionConversions { stream: DataStream[T]): DataStreamConversions[T] = { new DataStreamConversions[T]( stream, - stream.getJavaStream.getType.asInstanceOf[CompositeType[T]]) + stream.javaStream.getType.asInstanceOf[CompositeType[T]]) } implicit def table2RowDataStream( http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala index e676f81..3c54e7e 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala @@ -97,7 +97,7 @@ object CoGroupedStreams { * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. */ private[flink] def clean[F <: AnyRef](f: F): F = { - new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f) + new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f) } } @@ -170,7 +170,7 @@ object CoGroupedStreams { * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. */ private[flink] def clean[F <: AnyRef](f: F): F = { - new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f) + new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f) } } @@ -270,7 +270,7 @@ object CoGroupedStreams { */ def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): DataStream[T] = { - val coGroup = new JavaCoGroupedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream) + val coGroup = new JavaCoGroupedStreams[T1, T2](input1.javaStream, input2.javaStream) coGroup .where(keySelector1) @@ -286,7 +286,7 @@ object CoGroupedStreams { * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. */ private[flink] def clean[F <: AnyRef](f: F): F = { - new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f) + new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f) } } http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 28edc2d..69a8dc5 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -39,33 +39,40 @@ import org.apache.flink.util.Collector import scala.collection.JavaConverters._ import scala.reflect.ClassTag -class DataStream[T](javaStream: JavaStream[T]) { +class DataStream[T](stream: JavaStream[T]) { /** * Gets the underlying java DataStream object. */ - def getJavaStream: JavaStream[T] = javaStream + private[flink] def javaStream: JavaStream[T] = stream + + /** + * Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]]. + * @return associated execution environment + */ + def getExecutionEnvironment: StreamExecutionEnvironment = + new StreamExecutionEnvironment(stream.getExecutionEnvironment) /** * Returns the ID of the DataStream. * * @return ID of the DataStream */ - def getId = javaStream.getId + def getId = stream.getId /** * Returns the TypeInformation for the elements of this DataStream. */ - def getType(): TypeInformation[T] = javaStream.getType() + def getType(): TypeInformation[T] = stream.getType() /** * Sets the parallelism of this operation. This must be at least 1. */ def setParallelism(parallelism: Int): DataStream[T] = { - javaStream match { + stream match { case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(parallelism) case _ => - throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot " + + throw new UnsupportedOperationException("Operator " + stream.toString + " cannot " + "have " + "parallelism.") } @@ -75,12 +82,12 @@ class DataStream[T](javaStream: JavaStream[T]) { /** * Returns the parallelism of this operation. */ - def getParallelism = javaStream.getParallelism + def getParallelism = stream.getParallelism /** * Returns the execution config. */ - def getExecutionConfig = javaStream.getExecutionConfig + def getExecutionConfig = stream.getExecutionConfig /** * Gets the name of the current data stream. This name is @@ -88,7 +95,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * * @return Name of the stream. */ - def getName : String = javaStream match { + def getName : String = stream match { case stream : SingleOutputStreamOperator[T,_] => stream.getName case _ => throw new UnsupportedOperationException("Only supported for operators.") @@ -100,7 +107,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * * @return The named operator */ - def name(name: String) : DataStream[T] = javaStream match { + def name(name: String) : DataStream[T] = stream match { case stream : SingleOutputStreamOperator[T,_] => stream.name(name) case _ => throw new UnsupportedOperationException("Only supported for operators.") this @@ -132,7 +139,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def disableChaining(): DataStream[T] = { - javaStream match { + stream match { case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining(); case _ => throw new UnsupportedOperationException("Only supported for operators.") @@ -147,7 +154,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def startNewChain(): DataStream[T] = { - javaStream match { + stream match { case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain(); case _ => throw new UnsupportedOperationException("Only supported for operators.") @@ -163,7 +170,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def isolateResources(): DataStream[T] = { - javaStream match { + stream match { case ds: SingleOutputStreamOperator[_, _] => ds.isolateResources(); case _ => throw new UnsupportedOperationException("Only supported for operators.") @@ -183,7 +190,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * default. */ def startNewResourceGroup(): DataStream[T] = { - javaStream match { + stream match { case ds: SingleOutputStreamOperator[_, _] => ds.startNewResourceGroup(); case _ => throw new UnsupportedOperationException("Only supported for operators.") @@ -200,7 +207,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * @return The operator with buffer timeout set. */ def setBufferTimeout(timeoutMillis: Long): DataStream[T] = { - javaStream match { + stream match { case ds: SingleOutputStreamOperator[_, _] => ds.setBufferTimeout(timeoutMillis); case _ => throw new UnsupportedOperationException("Only supported for operators.") @@ -215,7 +222,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def union(dataStreams: DataStream[T]*): DataStream[T] = - javaStream.union(dataStreams.map(_.getJavaStream): _*) + stream.union(dataStreams.map(_.javaStream): _*) /** * Creates a new ConnectedStreams by connecting @@ -223,21 +230,21 @@ class DataStream[T](javaStream: JavaStream[T]) { * DataStreams connected using this operators can be used with CoFunctions. */ def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] = - javaStream.connect(dataStream.getJavaStream) - + stream.connect(dataStream.javaStream) + /** * Groups the elements of a DataStream by the given key positions (for tuple/array types) to * be used with grouped operators like grouped reduce or grouped aggregations. */ - def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = javaStream.keyBy(fields: _*) + def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = stream.keyBy(fields: _*) /** * Groups the elements of a DataStream by the given field expressions to * be used with grouped operators like grouped reduce or grouped aggregations. */ def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple] = - javaStream.keyBy(firstField +: otherFields.toArray: _*) - + stream.keyBy(firstField +: otherFields.toArray: _*) + /** * Groups the elements of a DataStream by the given K key to * be used with grouped operators like grouped reduce or grouped aggregations. @@ -246,26 +253,26 @@ class DataStream[T](javaStream: JavaStream[T]) { val cleanFun = clean(fun) val keyType: TypeInformation[K] = implicitly[TypeInformation[K]] - + val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] { def getKey(in: T) = cleanFun(in) override def getProducedType: TypeInformation[K] = keyType } - new JavaKeyedStream(javaStream, keyExtractor, keyType) + new JavaKeyedStream(stream, keyExtractor, keyType) } /** * Partitions the elements of a DataStream by the given key positions (for tuple/array types) to * be used with grouped operators like grouped reduce or grouped aggregations. */ - def partitionByHash(fields: Int*): DataStream[T] = javaStream.partitionByHash(fields: _*) + def partitionByHash(fields: Int*): DataStream[T] = stream.partitionByHash(fields: _*) /** * Groups the elements of a DataStream by the given field expressions to * be used with grouped operators like grouped reduce or grouped aggregations. */ def partitionByHash(firstField: String, otherFields: String*): DataStream[T] = - javaStream.partitionByHash(firstField +: otherFields.toArray: _*) + stream.partitionByHash(firstField +: otherFields.toArray: _*) /** * Groups the elements of a DataStream by the given K key to @@ -278,7 +285,7 @@ class DataStream[T](javaStream: JavaStream[T]) { def getKey(in: T) = cleanFun(in) override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]] } - javaStream.partitionByHash(keyExtractor) + stream.partitionByHash(keyExtractor) } /** @@ -289,7 +296,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * Note: This method works only on single field keys. */ def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] = - javaStream.partitionCustom(partitioner, field) + stream.partitionCustom(partitioner, field) /** * Partitions a POJO DataStream on the specified key fields using a custom partitioner. @@ -299,7 +306,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * Note: This method works only on single field keys. */ def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String) - : DataStream[T] = javaStream.partitionCustom(partitioner, field) + : DataStream[T] = stream.partitionCustom(partitioner, field) /** * Partitions a DataStream on the key returned by the selector, using a custom partitioner. @@ -316,7 +323,7 @@ class DataStream[T](javaStream: JavaStream[T]) { def getKey(in: T) = cleanFun(in) override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]] } - javaStream.partitionCustom(partitioner, keyExtractor) + stream.partitionCustom(partitioner, keyExtractor) } /** @@ -326,14 +333,14 @@ class DataStream[T](javaStream: JavaStream[T]) { * parallel instances of the next processing operator. * */ - def broadcast: DataStream[T] = javaStream.broadcast() + def broadcast: DataStream[T] = stream.broadcast() /** - * Sets the partitioning of the DataStream so that the output values all go to + * Sets the partitioning of the DataStream so that the output values all go to * the first instance of the next processing operator. Use this setting with care * since it might cause a serious performance bottleneck in the application. */ - def global: DataStream[T] = javaStream.global() + def global: DataStream[T] = stream.global() /** * Sets the partitioning of the DataStream so that the output tuples @@ -342,7 +349,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * processing operator. * */ - def shuffle: DataStream[T] = javaStream.shuffle() + def shuffle: DataStream[T] = stream.shuffle() /** * Sets the partitioning of the DataStream so that the output tuples @@ -352,7 +359,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * instances of the next processing operator. * */ - def forward: DataStream[T] = javaStream.forward() + def forward: DataStream[T] = stream.forward() /** * Sets the partitioning of the DataStream so that the output tuples @@ -361,7 +368,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * the next processing operator. * */ - def rebalance: DataStream[T] = javaStream.rebalance() + def rebalance: DataStream[T] = stream.rebalance() /** * Initiates an iterative part of the program that creates a loop by feeding @@ -379,17 +386,17 @@ class DataStream[T](javaStream: JavaStream[T]) { * can use the maxWaitTime parameter to set a max waiting time for the iteration head. * If no data received in the set time the stream terminates. * <p> - * By default the feedback partitioning is set to match the input, to override this set + * By default the feedback partitioning is set to match the input, to override this set * the keepPartitioning flag to true * */ def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), maxWaitTimeMillis:Long = 0, keepPartitioning: Boolean = false) : DataStream[R] = { - val iterativeStream = javaStream.iterate(maxWaitTimeMillis) + val iterativeStream = stream.iterate(maxWaitTimeMillis) val (feedback, output) = stepFunction(new DataStream[T](iterativeStream)) - iterativeStream.closeWith(feedback.getJavaStream) + iterativeStream.closeWith(feedback.javaStream) output } @@ -416,11 +423,11 @@ class DataStream[T](javaStream: JavaStream[T]) { def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = { val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]] - val connectedIterativeStream = javaStream.iterate(maxWaitTimeMillis). + val connectedIterativeStream = stream.iterate(maxWaitTimeMillis). withFeedbackType(feedbackType) val (feedback, output) = stepFunction(connectedIterativeStream) - connectedIterativeStream.closeWith(feedback.getJavaStream) + connectedIterativeStream.closeWith(feedback.javaStream) output } @@ -448,9 +455,9 @@ class DataStream[T](javaStream: JavaStream[T]) { } val outType : TypeInformation[R] = implicitly[TypeInformation[R]] - javaStream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]] + stream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]] } - + /** * Creates a new DataStream by applying the given function to every element and flattening * the results. @@ -461,7 +468,7 @@ class DataStream[T](javaStream: JavaStream[T]) { } val outType : TypeInformation[R] = implicitly[TypeInformation[R]] - javaStream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]] + stream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]] } /** @@ -501,7 +508,7 @@ class DataStream[T](javaStream: JavaStream[T]) { if (filter == null) { throw new NullPointerException("Filter function must not be null.") } - javaStream.filter(filter) + stream.filter(filter) } /** @@ -567,7 +574,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * @param slide The slide interval in number of elements. */ def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow] = { - new AllWindowedStream(javaStream.countWindowAll(size, slide)) + new AllWindowedStream(stream.countWindowAll(size, slide)) } /** @@ -580,7 +587,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * @param size The size of the windows in number of elements. */ def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow] = { - new AllWindowedStream(javaStream.countWindowAll(size)) + new AllWindowedStream(stream.countWindowAll(size)) } /** @@ -600,7 +607,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * @return The trigger windows data stream. */ def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = { - new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](javaStream, assigner)) + new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](stream, assigner)) } /** * Extracts a timestamp from an element and assigns it as the internal timestamp of that element. @@ -614,7 +621,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * @see org.apache.flink.streaming.api.watermark.Watermark */ def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = { - javaStream.assignTimestamps(clean(extractor)) + stream.assignTimestamps(clean(extractor)) } /** @@ -635,7 +642,7 @@ class DataStream[T](javaStream: JavaStream[T]) { cleanExtractor(element) } } - javaStream.assignTimestamps(extractorFunction) + stream.assignTimestamps(extractorFunction) } /** @@ -644,7 +651,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * OutputSelector. Calling this method on an operator creates a new * [[SplitStream]]. */ - def split(selector: OutputSelector[T]): SplitStream[T] = javaStream.split(selector) + def split(selector: OutputSelector[T]): SplitStream[T] = stream.split(selector) /** * Creates a new [[SplitStream]] that contains only the elements satisfying the @@ -685,7 +692,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * written. * */ - def print(): DataStreamSink[T] = javaStream.print() + def print(): DataStreamSink[T] = stream.print() /** * Writes a DataStream to the standard output stream (stderr). @@ -695,7 +702,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * * @return The closed DataStream. */ - def printToErr() = javaStream.printToErr() + def printToErr() = stream.printToErr() /** * Writes a DataStream to the file specified by path in text format. For @@ -705,7 +712,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * @return The closed DataStream */ def writeAsText(path: String): DataStreamSink[T] = - javaStream.writeAsText(path, 0L) + stream.writeAsText(path, 0L) /** * Writes a DataStream to the file specified by path in text format. The @@ -718,7 +725,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * @return The closed DataStream */ def writeAsText(path: String, millis: Long): DataStreamSink[T] = - javaStream.writeAsText(path, millis) + stream.writeAsText(path, millis) /** * Writes a DataStream to the file specified by path in text format. For @@ -731,9 +738,9 @@ class DataStream[T](javaStream: JavaStream[T]) { */ def writeAsText(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T] = { if (writeMode != null) { - javaStream.writeAsText(path, writeMode) + stream.writeAsText(path, writeMode) } else { - javaStream.writeAsText(path) + stream.writeAsText(path) } } @@ -754,9 +761,9 @@ class DataStream[T](javaStream: JavaStream[T]) { millis: Long) : DataStreamSink[T] = { if (writeMode != null) { - javaStream.writeAsText(path, writeMode, millis) + stream.writeAsText(path, writeMode, millis) } else { - javaStream.writeAsText(path, millis) + stream.writeAsText(path, millis) } } @@ -846,12 +853,12 @@ class DataStream[T](javaStream: JavaStream[T]) { rowDelimiter: String, fieldDelimiter: String) : DataStreamSink[T] = { - require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.") + require(stream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.") val of = new ScalaCsvOutputFormat[Product](new Path(path), rowDelimiter, fieldDelimiter) if (writeMode != null) { of.setWriteMode(writeMode) } - javaStream.write(of.asInstanceOf[OutputFormat[T]], millis) + stream.write(of.asInstanceOf[OutputFormat[T]], millis) } /** @@ -859,7 +866,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * writing is performed periodically, in every millis milliseconds. */ def write(format: OutputFormat[T], millis: Long): DataStreamSink[T] = { - javaStream.write(format, millis) + stream.write(format, millis) } /** @@ -870,7 +877,7 @@ class DataStream[T](javaStream: JavaStream[T]) { hostname: String, port: Integer, schema: SerializationSchema[T]): DataStreamSink[T] = { - javaStream.writeToSocket(hostname, port, schema) + stream.writeToSocket(hostname, port, schema) } /** @@ -880,7 +887,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T] = - javaStream.addSink(sinkFunction) + stream.addSink(sinkFunction) /** * Adds the given sink to this DataStream. Only streams with sinks added @@ -904,7 +911,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. */ private[flink] def clean[F <: AnyRef](f: F): F = { - new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f) + new StreamExecutionEnvironment(stream.getExecutionEnvironment).scalaClean(f) } } http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala index c259724..f7bc570 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala @@ -95,7 +95,7 @@ object JoinedStreams { * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. */ private[flink] def clean[F <: AnyRef](f: F): F = { - new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f) + new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f) } } @@ -168,7 +168,7 @@ object JoinedStreams { * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. */ private[flink] def clean[F <: AnyRef](f: F): F = { - new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f) + new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f) } } @@ -263,7 +263,7 @@ object JoinedStreams { */ def apply[T: TypeInformation](function: JoinFunction[T1, T2, T]): DataStream[T] = { - val join = new JavaJoinedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream) + val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream) join .where(keySelector1) @@ -280,7 +280,7 @@ object JoinedStreams { */ def apply[T: TypeInformation](function: FlatJoinFunction[T1, T2, T]): DataStream[T] = { - val join = new JavaJoinedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream) + val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream) join .where(keySelector1) @@ -296,7 +296,7 @@ object JoinedStreams { * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. */ private[flink] def clean[F <: AnyRef](f: F): F = { - new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f) + new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f) } } http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index 7da7bc3..4ec8f81 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -63,7 +63,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { Time.of(100, TimeUnit.MILLISECONDS))) .reduce(reducer) - val transform1 = window1.getJavaStream.getTransformation + val transform1 = window1.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator1 = transform1.getOperator @@ -82,7 +82,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { out: Collector[(String, Int)]) { } }) - val transform2 = window2.getJavaStream.getTransformation + val transform2 = window2.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator2 = transform2.getOperator @@ -105,7 +105,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { .trigger(CountTrigger.of(100)) .reduce(reducer) - val transform1 = window1.getJavaStream.getTransformation + val transform1 = window1.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator1 = transform1.getOperator @@ -128,7 +128,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { out: Collector[(String, Int)]) { } }) - val transform2 = window2.getJavaStream.getTransformation + val transform2 = window2.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator2 = transform2.getOperator @@ -155,7 +155,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))) .reduce(reducer) - val transform1 = window1.getJavaStream.getTransformation + val transform1 = window1.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator1 = transform1.getOperator @@ -179,7 +179,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { out: Collector[(String, Int)]) { } }) - val transform2 = window2.getJavaStream.getTransformation + val transform2 = window2.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator2 = transform2.getOperator @@ -214,7 +214,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { out: Collector[(String, Int)]) { } }) - val transform1 = window1.getJavaStream.getTransformation + val transform1 = window1.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator1 = transform1.getOperator @@ -239,7 +239,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { out: Collector[(String, Int)]) { } }) - val transform2 = window2.getJavaStream.getTransformation + val transform2 = window2.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator2 = transform2.getOperator http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index 66e10ed..0b4eb86 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -516,7 +516,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { private def getOperatorForDataStream(dataStream: DataStream[_]): StreamOperator[_] = { dataStream.print() - val env = dataStream.getJavaStream.getExecutionEnvironment + val env = dataStream.javaStream.getExecutionEnvironment val streamGraph: StreamGraph = env.getStreamGraph streamGraph.getStreamNode(dataStream.getId).getOperator } http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala index 2131026..60a02e7 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala @@ -98,12 +98,12 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase { splittedResult .select("0") .map(_._2) - .getJavaStream + .javaStream .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE) splittedResult .select("1") .map(_._2) - .getJavaStream + .javaStream .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE) val groupedSequence = 0 until numElements groupBy( _ % numKeys) http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala index d584f07..7ba3194 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala @@ -37,7 +37,6 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { val excludedNames = Seq( // These are only used internally. Should be internal API but Java doesn't have // private[flink]. - "org.apache.flink.streaming.api.datastream.DataStream.getExecutionEnvironment", "org.apache.flink.streaming.api.datastream.DataStream.getType", "org.apache.flink.streaming.api.datastream.DataStream.copy", "org.apache.flink.streaming.api.datastream.DataStream.transform", http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 46981ab..90cce66 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -57,7 +57,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Time.of(100, TimeUnit.MILLISECONDS))) .reduce(reducer) - val transform1 = window1.getJavaStream.getTransformation + val transform1 = window1.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator1 = transform1.getOperator @@ -77,7 +77,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { out: Collector[(String, Int)]) { } }) - val transform2 = window2.getJavaStream.getTransformation + val transform2 = window2.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator2 = transform2.getOperator @@ -101,7 +101,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .trigger(CountTrigger.of(100)) .reduce(reducer) - val transform1 = window1.getJavaStream.getTransformation + val transform1 = window1.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator1 = transform1.getOperator @@ -126,7 +126,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { out: Collector[(String, Int)]) { } }) - val transform2 = window2.getJavaStream.getTransformation + val transform2 = window2.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator2 = transform2.getOperator @@ -154,7 +154,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))) .reduce(reducer) - val transform1 = window1.getJavaStream.getTransformation + val transform1 = window1.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator1 = transform1.getOperator @@ -180,7 +180,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { out: Collector[(String, Int)]) { } }) - val transform2 = window2.getJavaStream.getTransformation + val transform2 = window2.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator2 = transform2.getOperator @@ -215,7 +215,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { out: Collector[(String, Int)]) { } }) - val transform1 = window1.getJavaStream.getTransformation + val transform1 = window1.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator1 = transform1.getOperator @@ -240,7 +240,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { out: Collector[(String, Int)]) { } }) - val transform2 = window2.getJavaStream.getTransformation + val transform2 = window2.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] val operator2 = transform2.getOperator