Repository: flink
Updated Branches:
  refs/heads/master 85ac6d3d4 -> 086acf681


[streaming] [scala] Exposed environment from DataStream

This is needed for streaming library features, is identical to the batch API.

Closes #1480


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/086acf68
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/086acf68
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/086acf68

Branch: refs/heads/master
Commit: 086acf681f01f2da530c04289e0682c56f98a378
Parents: 85ac6d3
Author: Márton Balassi <mbala...@apache.org>
Authored: Wed Dec 30 17:31:37 2015 +0100
Committer: Márton Balassi <mbala...@apache.org>
Committed: Sat Jan 30 08:25:15 2016 +0100

----------------------------------------------------------------------
 .../scala/table/ScalaStreamingTranslator.scala  |   2 +-
 .../apache/flink/api/scala/table/package.scala  |   2 +-
 .../streaming/api/scala/CoGroupedStreams.scala  |   8 +-
 .../flink/streaming/api/scala/DataStream.scala  | 135 ++++++++++---------
 .../streaming/api/scala/JoinedStreams.scala     |  10 +-
 .../api/scala/AllWindowTranslationTest.scala    |  16 +--
 .../streaming/api/scala/DataStreamTest.scala    |   2 +-
 .../api/scala/StreamingOperatorsITCase.scala    |   4 +-
 .../StreamingScalaAPICompletenessTest.scala     |   1 -
 .../api/scala/WindowTranslationTest.scala       |  16 +--
 10 files changed, 101 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
index 88f1b83..86b9044 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
@@ -51,7 +51,7 @@ class ScalaStreamingTranslator extends PlanTranslator {
       resultFields: Seq[(String, TypeInformation[_])]): Table = {
 
     val result =
-      javaTranslator.createTable(repr.getJavaStream, inputType, expressions, 
resultFields)
+      javaTranslator.createTable(repr.javaStream, inputType, expressions, 
resultFields)
 
     new Table(result.operation)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
index e74651b..31373a3 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -90,7 +90,7 @@ package object table extends ImplicitExpressionConversions {
       stream: DataStream[T]): DataStreamConversions[T] = {
     new DataStreamConversions[T](
       stream,
-      stream.getJavaStream.getType.asInstanceOf[CompositeType[T]])
+      stream.javaStream.getType.asInstanceOf[CompositeType[T]])
   }
 
   implicit def table2RowDataStream(

http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index e676f81..3c54e7e 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -97,7 +97,7 @@ object CoGroupedStreams {
      * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
      */
     private[flink] def clean[F <: AnyRef](f: F): F = {
-      new 
StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+      new 
StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
     }
   }
 
@@ -170,7 +170,7 @@ object CoGroupedStreams {
      * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
      */
     private[flink] def clean[F <: AnyRef](f: F): F = {
-      new 
StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+      new 
StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
     }
   }
 
@@ -270,7 +270,7 @@ object CoGroupedStreams {
      */
     def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): 
DataStream[T] = {
 
-      val coGroup = new JavaCoGroupedStreams[T1, T2](input1.getJavaStream, 
input2.getJavaStream)
+      val coGroup = new JavaCoGroupedStreams[T1, T2](input1.javaStream, 
input2.javaStream)
 
       coGroup
         .where(keySelector1)
@@ -286,7 +286,7 @@ object CoGroupedStreams {
      * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
      */
     private[flink] def clean[F <: AnyRef](f: F): F = {
-      new 
StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+      new 
StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 28edc2d..69a8dc5 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -39,33 +39,40 @@ import org.apache.flink.util.Collector
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
-class DataStream[T](javaStream: JavaStream[T]) {
+class DataStream[T](stream: JavaStream[T]) {
 
   /**
    * Gets the underlying java DataStream object.
    */
-  def getJavaStream: JavaStream[T] = javaStream
+  private[flink] def javaStream: JavaStream[T] = stream
+
+  /**
+    * Returns the [[StreamExecutionEnvironment]] associated with the current 
[[DataStream]].
+    * @return associated execution environment
+    */
+  def getExecutionEnvironment: StreamExecutionEnvironment =
+    new StreamExecutionEnvironment(stream.getExecutionEnvironment)
 
   /**
    * Returns the ID of the DataStream.
    *
    * @return ID of the DataStream
    */
-  def getId = javaStream.getId
+  def getId = stream.getId
 
   /**
    * Returns the TypeInformation for the elements of this DataStream.
    */
-  def getType(): TypeInformation[T] = javaStream.getType()
+  def getType(): TypeInformation[T] = stream.getType()
 
   /**
    * Sets the parallelism of this operation. This must be at least 1.
    */
   def setParallelism(parallelism: Int): DataStream[T] = {
-    javaStream match {
+    stream match {
       case ds: SingleOutputStreamOperator[_, _] => 
ds.setParallelism(parallelism)
       case _ =>
-        throw new UnsupportedOperationException("Operator " + 
javaStream.toString +  " cannot " +
+        throw new UnsupportedOperationException("Operator " + stream.toString 
+  " cannot " +
           "have " +
           "parallelism.")
     }
@@ -75,12 +82,12 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
    * Returns the parallelism of this operation.
    */
-  def getParallelism = javaStream.getParallelism
+  def getParallelism = stream.getParallelism
 
   /**
    * Returns the execution config.
    */
-  def getExecutionConfig = javaStream.getExecutionConfig
+  def getExecutionConfig = stream.getExecutionConfig
 
   /**
    * Gets the name of the current data stream. This name is
@@ -88,7 +95,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    * @return Name of the stream.
    */
-  def getName : String = javaStream match {
+  def getName : String = stream match {
     case stream : SingleOutputStreamOperator[T,_] => stream.getName
     case _ => throw new
         UnsupportedOperationException("Only supported for operators.")
@@ -100,7 +107,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    * @return The named operator
    */
-  def name(name: String) : DataStream[T] = javaStream match {
+  def name(name: String) : DataStream[T] = stream match {
     case stream : SingleOutputStreamOperator[T,_] => stream.name(name)
     case _ => throw new UnsupportedOperationException("Only supported for 
operators.")
     this
@@ -132,7 +139,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def disableChaining(): DataStream[T] = {
-    javaStream match {
+    stream match {
       case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining();
       case _ =>
         throw new UnsupportedOperationException("Only supported for 
operators.")
@@ -147,7 +154,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def startNewChain(): DataStream[T] = {
-    javaStream match {
+    stream match {
       case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain();
       case _ =>
         throw new UnsupportedOperationException("Only supported for 
operators.")
@@ -163,7 +170,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def isolateResources(): DataStream[T] = {
-    javaStream match {
+    stream match {
       case ds: SingleOutputStreamOperator[_, _] => ds.isolateResources();
       case _ =>
         throw new UnsupportedOperationException("Only supported for 
operators.")
@@ -183,7 +190,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * default.
    */
   def startNewResourceGroup(): DataStream[T] = {
-    javaStream match {
+    stream match {
       case ds: SingleOutputStreamOperator[_, _] => ds.startNewResourceGroup();
       case _ =>
         throw new UnsupportedOperationException("Only supported for 
operators.")
@@ -200,7 +207,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * @return The operator with buffer timeout set.
    */
   def setBufferTimeout(timeoutMillis: Long): DataStream[T] = {
-    javaStream match {
+    stream match {
       case ds: SingleOutputStreamOperator[_, _] => 
ds.setBufferTimeout(timeoutMillis);
       case _ =>
         throw new UnsupportedOperationException("Only supported for 
operators.")
@@ -215,7 +222,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def union(dataStreams: DataStream[T]*): DataStream[T] =
-    javaStream.union(dataStreams.map(_.getJavaStream): _*)
+    stream.union(dataStreams.map(_.javaStream): _*)
 
   /**
    * Creates a new ConnectedStreams by connecting
@@ -223,21 +230,21 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * DataStreams connected using this operators can be used with CoFunctions.
    */
   def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] =
-    javaStream.connect(dataStream.getJavaStream)
-  
+    stream.connect(dataStream.javaStream)
+
   /**
    * Groups the elements of a DataStream by the given key positions (for 
tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped 
aggregations.
    */
-  def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = 
javaStream.keyBy(fields: _*)
+  def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = stream.keyBy(fields: _*)
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
    * be used with grouped operators like grouped reduce or grouped 
aggregations.
    */
   def keyBy(firstField: String, otherFields: String*): KeyedStream[T, 
JavaTuple] =
-   javaStream.keyBy(firstField +: otherFields.toArray: _*)   
-  
+   stream.keyBy(firstField +: otherFields.toArray: _*)
+
   /**
    * Groups the elements of a DataStream by the given K key to
    * be used with grouped operators like grouped reduce or grouped 
aggregations.
@@ -246,26 +253,26 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
     val cleanFun = clean(fun)
     val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
-    
+
     val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
       def getKey(in: T) = cleanFun(in)
       override def getProducedType: TypeInformation[K] = keyType
     }
-    new JavaKeyedStream(javaStream, keyExtractor, keyType)
+    new JavaKeyedStream(stream, keyExtractor, keyType)
   }
 
   /**
    * Partitions the elements of a DataStream by the given key positions (for 
tuple/array types) to
    * be used with grouped operators like grouped reduce or grouped 
aggregations.
    */
-  def partitionByHash(fields: Int*): DataStream[T] = 
javaStream.partitionByHash(fields: _*)
+  def partitionByHash(fields: Int*): DataStream[T] = 
stream.partitionByHash(fields: _*)
 
   /**
    * Groups the elements of a DataStream by the given field expressions to
    * be used with grouped operators like grouped reduce or grouped 
aggregations.
    */
   def partitionByHash(firstField: String, otherFields: String*): DataStream[T] 
=
-    javaStream.partitionByHash(firstField +: otherFields.toArray: _*)
+    stream.partitionByHash(firstField +: otherFields.toArray: _*)
 
   /**
    * Groups the elements of a DataStream by the given K key to
@@ -278,7 +285,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
       def getKey(in: T) = cleanFun(in)
       override def getProducedType: TypeInformation[K] = 
implicitly[TypeInformation[K]]
     }
-    javaStream.partitionByHash(keyExtractor)
+    stream.partitionByHash(keyExtractor)
   }
 
   /**
@@ -289,7 +296,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Note: This method works only on single field keys.
    */
   def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: 
Int) : DataStream[T] =
-    javaStream.partitionCustom(partitioner, field)
+    stream.partitionCustom(partitioner, field)
 
   /**
    * Partitions a POJO DataStream on the specified key fields using a custom 
partitioner.
@@ -299,7 +306,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Note: This method works only on single field keys.
    */
   def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: 
String)
-  : DataStream[T] = javaStream.partitionCustom(partitioner, field)
+  : DataStream[T] = stream.partitionCustom(partitioner, field)
 
   /**
    * Partitions a DataStream on the key returned by the selector, using a 
custom partitioner.
@@ -316,7 +323,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
       def getKey(in: T) = cleanFun(in)
       override def getProducedType: TypeInformation[K] = 
implicitly[TypeInformation[K]]
     }
-    javaStream.partitionCustom(partitioner, keyExtractor)
+    stream.partitionCustom(partitioner, keyExtractor)
   }
 
   /**
@@ -326,14 +333,14 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * parallel instances of the next processing operator.
    *
    */
-  def broadcast: DataStream[T] = javaStream.broadcast()
+  def broadcast: DataStream[T] = stream.broadcast()
 
   /**
-   * Sets the partitioning of the DataStream so that the output values all go 
to 
+   * Sets the partitioning of the DataStream so that the output values all go 
to
    * the first instance of the next processing operator. Use this setting with 
care
    * since it might cause a serious performance bottleneck in the application.
    */
-  def global: DataStream[T] = javaStream.global()
+  def global: DataStream[T] = stream.global()
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
@@ -342,7 +349,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * processing operator.
    *
    */
-  def shuffle: DataStream[T] = javaStream.shuffle()
+  def shuffle: DataStream[T] = stream.shuffle()
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
@@ -352,7 +359,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * instances of the next processing operator.
    *
    */
-  def forward: DataStream[T] = javaStream.forward()
+  def forward: DataStream[T] = stream.forward()
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
@@ -361,7 +368,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * the next processing operator.
    *
    */
-  def rebalance: DataStream[T] = javaStream.rebalance()
+  def rebalance: DataStream[T] = stream.rebalance()
 
   /**
    * Initiates an iterative part of the program that creates a loop by feeding
@@ -379,17 +386,17 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * can use the maxWaitTime parameter to set a max waiting time for the 
iteration head.
    * If no data received in the set time the stream terminates.
    * <p>
-   * By default the feedback partitioning is set to match the input, to 
override this set 
+   * By default the feedback partitioning is set to match the input, to 
override this set
    * the keepPartitioning flag to true
    *
    */
   def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
                     maxWaitTimeMillis:Long = 0,
                     keepPartitioning: Boolean = false) : DataStream[R] = {
-    val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
+    val iterativeStream = stream.iterate(maxWaitTimeMillis)
 
     val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
-    iterativeStream.closeWith(feedback.getJavaStream)
+    iterativeStream.closeWith(feedback.javaStream)
     output
   }
 
@@ -416,11 +423,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def iterate[R, F: TypeInformation: ClassTag](stepFunction: 
ConnectedStreams[T, F] =>
     (DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = {
     val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]]
-    val connectedIterativeStream = javaStream.iterate(maxWaitTimeMillis).
+    val connectedIterativeStream = stream.iterate(maxWaitTimeMillis).
                                    withFeedbackType(feedbackType)
 
     val (feedback, output) = stepFunction(connectedIterativeStream)
-    connectedIterativeStream.closeWith(feedback.getJavaStream)
+    connectedIterativeStream.closeWith(feedback.javaStream)
     output
   }
 
@@ -448,9 +455,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
     }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    javaStream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]]
+    stream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]]
   }
-  
+
   /**
    * Creates a new DataStream by applying the given function to every element 
and flattening
    * the results.
@@ -461,7 +468,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
     }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    javaStream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]]
+    stream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]]
   }
 
   /**
@@ -501,7 +508,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
     if (filter == null) {
       throw new NullPointerException("Filter function must not be null.")
     }
-    javaStream.filter(filter)
+    stream.filter(filter)
   }
 
   /**
@@ -567,7 +574,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * @param slide The slide interval in number of elements.
    */
   def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, 
GlobalWindow] = {
-    new AllWindowedStream(javaStream.countWindowAll(size, slide))
+    new AllWindowedStream(stream.countWindowAll(size, slide))
   }
 
   /**
@@ -580,7 +587,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * @param size The size of the windows in number of elements.
    */
   def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow] = {
-    new AllWindowedStream(javaStream.countWindowAll(size))
+    new AllWindowedStream(stream.countWindowAll(size))
   }
 
   /**
@@ -600,7 +607,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * @return The trigger windows data stream.
    */
   def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): 
AllWindowedStream[T, W] = {
-    new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](javaStream, 
assigner))
+    new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](stream, 
assigner))
   }
   /**
    * Extracts a timestamp from an element and assigns it as the internal 
timestamp of that element.
@@ -614,7 +621,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * @see org.apache.flink.streaming.api.watermark.Watermark
    */
   def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = {
-    javaStream.assignTimestamps(clean(extractor))
+    stream.assignTimestamps(clean(extractor))
   }
 
   /**
@@ -635,7 +642,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
         cleanExtractor(element)
       }
     }
-    javaStream.assignTimestamps(extractorFunction)
+    stream.assignTimestamps(extractorFunction)
   }
 
   /**
@@ -644,7 +651,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * OutputSelector. Calling this method on an operator creates a new
    * [[SplitStream]].
    */
-  def split(selector: OutputSelector[T]): SplitStream[T] = 
javaStream.split(selector)
+  def split(selector: OutputSelector[T]): SplitStream[T] = 
stream.split(selector)
 
   /**
    * Creates a new [[SplitStream]] that contains only the elements satisfying 
the
@@ -685,7 +692,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * written.
    *
    */
-  def print(): DataStreamSink[T] = javaStream.print()
+  def print(): DataStreamSink[T] = stream.print()
 
   /**
    * Writes a DataStream to the standard output stream (stderr).
@@ -695,7 +702,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    * @return The closed DataStream.
    */
-  def printToErr() = javaStream.printToErr()
+  def printToErr() = stream.printToErr()
 
   /**
     * Writes a DataStream to the file specified by path in text format. For
@@ -705,7 +712,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
     * @return The closed DataStream
     */
   def writeAsText(path: String): DataStreamSink[T] =
-    javaStream.writeAsText(path, 0L)
+    stream.writeAsText(path, 0L)
 
   /**
    * Writes a DataStream to the file specified by path in text format. The
@@ -718,7 +725,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * @return The closed DataStream
    */
   def writeAsText(path: String, millis: Long): DataStreamSink[T] =
-    javaStream.writeAsText(path, millis)
+    stream.writeAsText(path, millis)
 
   /**
     * Writes a DataStream to the file specified by path in text format. For
@@ -731,9 +738,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
     */
   def writeAsText(path: String, writeMode: FileSystem.WriteMode): 
DataStreamSink[T] = {
     if (writeMode != null) {
-      javaStream.writeAsText(path, writeMode)
+      stream.writeAsText(path, writeMode)
     } else {
-      javaStream.writeAsText(path)
+      stream.writeAsText(path)
     }
   }
 
@@ -754,9 +761,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
       millis: Long)
     : DataStreamSink[T] = {
     if (writeMode != null) {
-      javaStream.writeAsText(path, writeMode, millis)
+      stream.writeAsText(path, writeMode, millis)
     } else {
-      javaStream.writeAsText(path, millis)
+      stream.writeAsText(path, millis)
     }
   }
 
@@ -846,12 +853,12 @@ class DataStream[T](javaStream: JavaStream[T]) {
       rowDelimiter: String,
       fieldDelimiter: String)
     : DataStreamSink[T] = {
-    require(javaStream.getType.isTupleType, "CSV output can only be used with 
Tuple DataSets.")
+    require(stream.getType.isTupleType, "CSV output can only be used with 
Tuple DataSets.")
     val of = new ScalaCsvOutputFormat[Product](new Path(path), rowDelimiter, 
fieldDelimiter)
     if (writeMode != null) {
       of.setWriteMode(writeMode)
     }
-    javaStream.write(of.asInstanceOf[OutputFormat[T]], millis)
+    stream.write(of.asInstanceOf[OutputFormat[T]], millis)
   }
 
   /**
@@ -859,7 +866,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * writing is performed periodically, in every millis milliseconds.
    */
   def write(format: OutputFormat[T], millis: Long): DataStreamSink[T] = {
-    javaStream.write(format, millis)
+    stream.write(format, millis)
   }
 
   /**
@@ -870,7 +877,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
       hostname: String,
       port: Integer,
       schema: SerializationSchema[T]): DataStreamSink[T] = {
-    javaStream.writeToSocket(hostname, port, schema)
+    stream.writeToSocket(hostname, port, schema)
   }
 
   /**
@@ -880,7 +887,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T] =
-    javaStream.addSink(sinkFunction)
+    stream.addSink(sinkFunction)
 
   /**
    * Adds the given sink to this DataStream. Only streams with sinks added
@@ -904,7 +911,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
    */
   private[flink] def clean[F <: AnyRef](f: F): F = {
-    new 
StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
+    new 
StreamExecutionEnvironment(stream.getExecutionEnvironment).scalaClean(f)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index c259724..f7bc570 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -95,7 +95,7 @@ object JoinedStreams {
      * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
      */
     private[flink] def clean[F <: AnyRef](f: F): F = {
-      new 
StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+      new 
StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
     }
   }
 
@@ -168,7 +168,7 @@ object JoinedStreams {
      * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
      */
     private[flink] def clean[F <: AnyRef](f: F): F = {
-      new 
StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+      new 
StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
     }
   }
 
@@ -263,7 +263,7 @@ object JoinedStreams {
      */
     def apply[T: TypeInformation](function: JoinFunction[T1, T2, T]): 
DataStream[T] = {
 
-      val join = new JavaJoinedStreams[T1, T2](input1.getJavaStream, 
input2.getJavaStream)
+      val join = new JavaJoinedStreams[T1, T2](input1.javaStream, 
input2.javaStream)
 
       join
         .where(keySelector1)
@@ -280,7 +280,7 @@ object JoinedStreams {
      */
     def apply[T: TypeInformation](function: FlatJoinFunction[T1, T2, T]): 
DataStream[T] = {
 
-      val join = new JavaJoinedStreams[T1, T2](input1.getJavaStream, 
input2.getJavaStream)
+      val join = new JavaJoinedStreams[T1, T2](input1.javaStream, 
input2.javaStream)
 
       join
         .where(keySelector1)
@@ -296,7 +296,7 @@ object JoinedStreams {
      * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
      */
     private[flink] def clean[F <: AnyRef](f: F): F = {
-      new 
StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+      new 
StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 7da7bc3..4ec8f81 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -63,7 +63,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
         Time.of(100, TimeUnit.MILLISECONDS)))
       .reduce(reducer)
 
-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator1 = transform1.getOperator
@@ -82,7 +82,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                     out: Collector[(String, Int)]) { }
     })
 
-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator2 = transform2.getOperator
@@ -105,7 +105,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
       .trigger(CountTrigger.of(100))
       .reduce(reducer)
 
-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator1 = transform1.getOperator
@@ -128,7 +128,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                     out: Collector[(String, Int)]) { }
     })
 
-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator2 = transform2.getOperator
@@ -155,7 +155,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
       .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
       .reduce(reducer)
 
-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator1 = transform1.getOperator
@@ -179,7 +179,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                     out: Collector[(String, Int)]) { }
     })
 
-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator2 = transform2.getOperator
@@ -214,7 +214,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                    out: Collector[(String, Int)]) { }
       })
 
-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator1 = transform1.getOperator
@@ -239,7 +239,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                    out: Collector[(String, Int)]) { }
       })
 
-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator2 = transform2.getOperator

http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 66e10ed..0b4eb86 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -516,7 +516,7 @@ class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
 
   private def getOperatorForDataStream(dataStream: DataStream[_]): 
StreamOperator[_] = {
     dataStream.print()
-    val env = dataStream.getJavaStream.getExecutionEnvironment
+    val env = dataStream.javaStream.getExecutionEnvironment
     val streamGraph: StreamGraph = env.getStreamGraph
     streamGraph.getStreamNode(dataStream.getId).getOperator
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index 2131026..60a02e7 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -98,12 +98,12 @@ class StreamingOperatorsITCase extends 
ScalaStreamingMultipleProgramsTestBase {
     splittedResult
       .select("0")
       .map(_._2)
-      .getJavaStream
+      .javaStream
       .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE)
     splittedResult
       .select("1")
       .map(_._2)
-      .getJavaStream
+      .javaStream
       .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE)
 
     val groupedSequence = 0 until numElements groupBy( _ % numKeys)

http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index d584f07..7ba3194 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -37,7 +37,6 @@ class StreamingScalaAPICompletenessTest extends 
ScalaAPICompletenessTestBase {
     val excludedNames = Seq(
       // These are only used internally. Should be internal API but Java 
doesn't have
       // private[flink].
-      
"org.apache.flink.streaming.api.datastream.DataStream.getExecutionEnvironment",
       "org.apache.flink.streaming.api.datastream.DataStream.getType",
       "org.apache.flink.streaming.api.datastream.DataStream.copy",
       "org.apache.flink.streaming.api.datastream.DataStream.transform",

http://git-wip-us.apache.org/repos/asf/flink/blob/086acf68/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 46981ab..90cce66 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -57,7 +57,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
         Time.of(100, TimeUnit.MILLISECONDS)))
       .reduce(reducer)
 
-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
         .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
     
     val operator1 = transform1.getOperator
@@ -77,7 +77,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
             out: Collector[(String, Int)]) { }
       })
 
-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator2 = transform2.getOperator
@@ -101,7 +101,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
       .trigger(CountTrigger.of(100))
       .reduce(reducer)
 
-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator1 = transform1.getOperator
@@ -126,7 +126,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                     out: Collector[(String, Int)]) { }
     })
 
-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator2 = transform2.getOperator
@@ -154,7 +154,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
       .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
       .reduce(reducer)
 
-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator1 = transform1.getOperator
@@ -180,7 +180,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                     out: Collector[(String, Int)]) { }
     })
 
-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator2 = transform2.getOperator
@@ -215,7 +215,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                    out: Collector[(String, Int)]) { }
       })
 
-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator1 = transform1.getOperator
@@ -240,7 +240,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                    out: Collector[(String, Int)]) { }
       })
 
-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
 
     val operator2 = transform2.getOperator

Reply via email to