http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 736c41b..04c1980 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner}
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -50,6 +50,7 @@ class DataStream[T](stream: JavaStream[T]) {
 
   /**
    * Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]].
+ *
     * @return associated execution environment
     */
   def getExecutionEnvironment: StreamExecutionEnvironment =
@@ -60,7 +61,7 @@ class DataStream[T](stream: JavaStream[T]) {
    *
    * @return ID of the DataStream
    */
-  @Experimental
+  @PublicEvolving
   def getId = stream.getId
 
   /**
@@ -128,7 +129,7 @@ class DataStream[T](stream: JavaStream[T]) {
     * @param uid The unique user-specified ID of this transformation.
     * @return The operator with the specified ID.
     */
-  @Experimental
+  @PublicEvolving
   def uid(uid: String) : DataStream[T] = javaStream match {
     case stream : SingleOutputStreamOperator[T,_] => stream.uid(uid)
    case _ => throw new UnsupportedOperationException("Only supported for operators.")
@@ -142,7 +143,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * however it is not advised for performance considerations.
    *
    */
-  @Experimental
+  @PublicEvolving
   def disableChaining(): DataStream[T] = {
     stream match {
       case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining();
@@ -158,7 +159,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * previous tasks even if possible.
    *
    */
-  @Experimental
+  @PublicEvolving
   def startNewChain(): DataStream[T] = {
     stream match {
       case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain();
@@ -175,7 +176,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * All subsequent operators are assigned to the default resource group.
    *
    */
-  @Experimental
+  @PublicEvolving
   def isolateResources(): DataStream[T] = {
     stream match {
       case ds: SingleOutputStreamOperator[_, _] => ds.isolateResources();
@@ -196,7 +197,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * degree of parallelism for the operators must be decreased from the
    * default.
    */
-  @Experimental
+  @PublicEvolving
   def startNewResourceGroup(): DataStream[T] = {
     stream match {
       case ds: SingleOutputStreamOperator[_, _] => ds.startNewResourceGroup();
@@ -345,14 +346,14 @@ class DataStream[T](stream: JavaStream[T]) {
   * the first instance of the next processing operator. Use this setting with care
    * since it might cause a serious performance bottleneck in the application.
    */
-  @Experimental
+  @PublicEvolving
   def global: DataStream[T] = stream.global()
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples
    * are shuffled to the next component.
    */
-  @Experimental
+  @PublicEvolving
   def shuffle: DataStream[T] = stream.shuffle()
 
   /**
@@ -385,7 +386,7 @@ class DataStream[T](stream: JavaStream[T]) {
   * In cases where the different parallelisms are not multiples of each other one or several
   * downstream operations will have a differing number of inputs from upstream operations.
    */
-  @Experimental
+  @PublicEvolving
   def rescale: DataStream[T] = stream.rescale()
 
   /**
@@ -408,7 +409,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * the keepPartitioning flag to true
    *
    */
-  @Experimental
+  @PublicEvolving
   def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
                     maxWaitTimeMillis:Long = 0,
                     keepPartitioning: Boolean = false) : DataStream[R] = {
@@ -438,7 +439,7 @@ class DataStream[T](stream: JavaStream[T]) {
   * to 0 then the iteration sources will indefinitely, so the job must be killed to stop.
    *
    */
-  @Experimental
+  @PublicEvolving
  def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedStreams[T, F] =>
     (DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = {
     val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]]
@@ -625,7 +626,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * @param assigner The `WindowAssigner` that assigns elements to windows.
    * @return The trigger windows data stream.
    */
-  @Experimental
+  @PublicEvolving
  def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = {
    new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](stream, assigner))
   }
@@ -640,7 +641,7 @@ class DataStream[T](stream: JavaStream[T]) {
    *
    * @see org.apache.flink.streaming.api.watermark.Watermark
    */
-  @Experimental
+  @PublicEvolving
   def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = {
     stream.assignTimestamps(clean(extractor))
   }
@@ -656,7 +657,7 @@ class DataStream[T](stream: JavaStream[T]) {
    *
    * @see org.apache.flink.streaming.api.watermark.Watermark
    */
-  @Experimental
+  @PublicEvolving
   def assignAscendingTimestamps(extractor: T => Long): DataStream[T] = {
     val cleanExtractor = clean(extractor)
     val extractorFunction = new AscendingTimestampExtractor[T] {
@@ -714,7 +715,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * written.
    *
    */
-  @Experimental
+  @PublicEvolving
   def print(): DataStreamSink[T] = stream.print()
 
   /**
@@ -725,7 +726,7 @@ class DataStream[T](stream: JavaStream[T]) {
    *
    * @return The closed DataStream.
    */
-  @Experimental
+  @PublicEvolving
   def printToErr() = stream.printToErr()
 
   /**
@@ -735,7 +736,7 @@ class DataStream[T](stream: JavaStream[T]) {
     * @param path The path pointing to the location the text file is written to
     * @return The closed DataStream
     */
-  @Experimental
+  @PublicEvolving
   def writeAsText(path: String): DataStreamSink[T] =
     stream.writeAsText(path, 0L)
 
@@ -749,7 +750,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * @param millis The file update frequency
    * @return The closed DataStream
    */
-  @Experimental
+  @PublicEvolving
   def writeAsText(path: String, millis: Long): DataStreamSink[T] =
     stream.writeAsText(path, millis)
 
@@ -762,7 +763,7 @@ class DataStream[T](stream: JavaStream[T]) {
     *                  OVERWRITE.
     * @return The closed DataStream
     */
-  @Experimental
+  @PublicEvolving
  def writeAsText(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
     if (writeMode != null) {
       stream.writeAsText(path, writeMode)
@@ -782,7 +783,7 @@ class DataStream[T](stream: JavaStream[T]) {
     * @param millis The file update frequency
     * @return The closed DataStream
     */
-  @Experimental
+  @PublicEvolving
   def writeAsText(
       path: String,
       writeMode: FileSystem.WriteMode,
@@ -802,7 +803,7 @@ class DataStream[T](stream: JavaStream[T]) {
     * @param path Path to the location of the CSV file
     * @return The closed DataStream
     */
-  @Experimental
+  @PublicEvolving
   def writeAsCsv(path: String): DataStreamSink[T] = {
     writeAsCsv(
       path,
@@ -820,7 +821,7 @@ class DataStream[T](stream: JavaStream[T]) {
     * @param millis File update frequency
     * @return The closed DataStream
     */
-  @Experimental
+  @PublicEvolving
   def writeAsCsv(path: String, millis: Long): DataStreamSink[T] = {
     writeAsCsv(
       path,
@@ -838,7 +839,7 @@ class DataStream[T](stream: JavaStream[T]) {
     * @param writeMode Controls whether an existing file is overwritten or not
     * @return The closed DataStream
     */
-  @Experimental
+  @PublicEvolving
  def writeAsCsv(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
     writeAsCsv(
       path,
@@ -857,7 +858,7 @@ class DataStream[T](stream: JavaStream[T]) {
     * @param millis File update frequency
     * @return The closed DataStream
     */
-  @Experimental
+  @PublicEvolving
  def writeAsCsv(path: String, writeMode: FileSystem.WriteMode, millis: Long): DataStreamSink[T] = {
     writeAsCsv(
       path,
@@ -878,7 +879,7 @@ class DataStream[T](stream: JavaStream[T]) {
     * @param fieldDelimiter Delimiter for consecutive fields
     * @return The closed DataStream
     */
-  @Experimental
+  @PublicEvolving
   def writeAsCsv(
       path: String,
       writeMode: FileSystem.WriteMode,
@@ -898,7 +899,7 @@ class DataStream[T](stream: JavaStream[T]) {
    * Writes a DataStream using the given [[OutputFormat]]. The
    * writing is performed periodically, in every millis milliseconds.
    */
-  @Experimental
+  @PublicEvolving
   def write(format: OutputFormat[T], millis: Long): DataStreamSink[T] = {
     stream.write(format, millis)
   }
@@ -907,7 +908,7 @@ class DataStream[T](stream: JavaStream[T]) {
   * Writes the DataStream to a socket as a byte array. The format of the output is
    * specified by a [[SerializationSchema]].
    */
-  @Experimental
+  @PublicEvolving
   def writeToSocket(
       hostname: String,
       port: Integer,

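For orientation: every DataStream member the hunks above touch — uid, the chaining and resource-group controls, the partitioning and iteration methods, windowAll, the timestamp assigners, and the print/write sinks — moves from @Experimental to @PublicEvolving, i.e. public API whose signature may still change between minor releases. A minimal sketch of one of these methods in use, assuming the Scala API exactly as quoted in this diff (the job name and element values are illustrative, not from the commit):

import org.apache.flink.streaming.api.scala._

object UidSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(1, 2, 3)
      .map(_ * 2)
      // uid(...) only works when the underlying stream is a
      // SingleOutputStreamOperator, per the match in the hunk above;
      // otherwise it throws UnsupportedOperationException.
      .uid("double-values")
      .print() // print() is likewise @PublicEvolving after this commit

    env.execute("uid sketch")
  }
}
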
http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index b6fbadf..21c5d84 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
@@ -148,7 +148,7 @@ object JoinedStreams {
     /**
      * Specifies the window on which the join operation works.
      */
-    @Experimental
+    @PublicEvolving
     def window[W <: Window](
        assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
         : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
@@ -197,7 +197,7 @@ object JoinedStreams {
     /**
      * Sets the [[Trigger]] that should be used to trigger window emission.
      */
-    @Experimental
+    @PublicEvolving
    def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
     : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
       new WithWindow[T1, T2, KEY, W](
@@ -216,7 +216,7 @@ object JoinedStreams {
     * Note: When using an evictor window performance will degrade significantly, since
      * pre-aggregation of window results cannot be used.
      */
-    @Experimental
+    @PublicEvolving
    def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
     : JoinedStreams.WithWindow[T1, T2, KEY, W] = {
       new WithWindow[T1, T2, KEY, W](

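The three JoinedStreams.WithWindow members reclassified above — window, trigger, and evictor — compose as in the following hedged sketch. The input streams, key fields, window size, and the assigner class name TumblingProcessingTimeWindows are assumptions for illustration (assigner class names have differed across Flink releases):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object JoinSketch {
  // Joins two streams on their String key over ten-second tumbling
  // windows and sums the paired values.
  def joinSketch(clicks: DataStream[(String, Long)],
                 views: DataStream[(String, Long)]): DataStream[(String, Long)] =
    clicks.join(views)
      .where(_._1)   // key selector for the first input
      .equalTo(_._1) // key selector for the second input
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // @PublicEvolving
      .apply((c, v) => (c._1, c._2 + v._2))
}
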
http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 271796b..923aad6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{Experimental, Internal, Public}
+import org.apache.flink.annotation.{PublicEvolving, Internal, Public}
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -112,7 +112,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    * @param assigner The `WindowAssigner` that assigns elements to windows.
    * @return The trigger windows data stream.
    */
-  @Experimental
+  @PublicEvolving
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W] = {
     new WindowedStream(new WindowedJavaStream[T, K, W](javaStream, assigner))
   }

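KeyedStream.window is the only member touched in this file; it wraps the given assigner in a Java WindowedStream. A short sketch under the same assumptions as above (the assigner class and the five-second window size are illustrative):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object KeyedWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(("a", 1), ("b", 2), ("a", 3))
      .keyBy(_._1)
      // window(...) is @PublicEvolving after this commit
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
      .print()

    env.execute("keyed window sketch")
  }
}
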
http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 4317931..58b100e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.api.scala
 
 import com.esotericsoftware.kryo.Serializer
-import org.apache.flink.annotation.{Internal, Experimental, Public}
+import org.apache.flink.annotation.{Internal, PublicEvolving, Public}
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
@@ -91,7 +91,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * avoiding serialization and de-serialization.
    *
    */
-  @Experimental
+  @PublicEvolving
   def disableOperatorChaining(): StreamExecutionEnvironment = {
     javaEnv.disableOperatorChaining()
     this
@@ -127,7 +127,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   *           If true checkpointing will be enabled for iterative jobs as well.
    */
   @deprecated
-  @Experimental
+  @PublicEvolving
   def enableCheckpointing(interval : Long,
                           mode: CheckpointingMode,
                           force: Boolean) : StreamExecutionEnvironment = {
@@ -191,7 +191,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * failure the job will be resubmitted to the cluster indefinitely.
    */
   @deprecated
-  @Experimental
+  @PublicEvolving
   def enableCheckpointing() : StreamExecutionEnvironment = {
     javaEnv.enableCheckpointing()
     this
@@ -218,7 +218,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   * program can be executed highly available and strongly consistent (assuming that Flink
    * is run in high-availability mode).
    */
-  @Experimental
+  @PublicEvolving
  def setStateBackend(backend: AbstractStateBackend): StreamExecutionEnvironment = {
     javaEnv.setStateBackend(backend)
     this
@@ -227,7 +227,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Returns the state backend that defines how to store and checkpoint state.
    */
-  @Experimental
+  @PublicEvolving
   def getStateBackend: AbstractStateBackend = javaEnv.getStateBackend()
   
   /**
@@ -235,7 +235,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   * effectively disables fault tolerance. A value of "-1" indicates that the system
    * default value (as defined in the configuration) should be used.
    */
-  @Experimental
+  @PublicEvolving
   def setNumberOfExecutionRetries(numRetries: Int): Unit = {
     javaEnv.setNumberOfExecutionRetries(numRetries)
   }
@@ -245,7 +245,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   * of "-1" indicates that the system default value (as defined in the configuration)
    * should be used.
    */
-  @Experimental
+  @PublicEvolving
   def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
 
  // --------------------------------------------------------------------------------------------
@@ -327,7 +327,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    * @param characteristic The time characteristic.
    */
-  @Experimental
+  @PublicEvolving
  def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = {
     javaEnv.setStreamTimeCharacteristic(characteristic)
   }
@@ -336,10 +336,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Gets the time characteristic/
    *
    * @see #setStreamTimeCharacteristic
-   *
    * @return The time characteristic.
    */
-  @Experimental
+  @PublicEvolving
   def getStreamTimeCharacteristic = javaEnv.getStreamTimeCharacteristic()
 
  // --------------------------------------------------------------------------------------------
@@ -474,7 +473,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * character set. The maximum retry interval is specified in seconds, in case
    * of temporary service outage reconnection is initiated every second.
    */
-  @Experimental
+  @PublicEvolving
  def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0):
     DataStream[String] =
     javaEnv.socketTextStream(hostname, port)
@@ -485,7 +484,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   * determine the type of the data produced by the input format. It will attempt to determine the
   * data type by reflection, unless the input format implements the ResultTypeQueryable interface.
    */
-  @Experimental
+  @PublicEvolving
  def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
     javaEnv.createInput(inputFormat)
 
@@ -562,6 +561,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
   * Getter of the wrapped [[org.apache.flink.streaming.api.environment.StreamExecutionEnvironment]]
+ *
    * @return The encased ExecutionEnvironment
    */
   @Internal
@@ -590,7 +590,7 @@ object StreamExecutionEnvironment {
    * @param parallelism
    * The parallelism to use as the default local parallelism.
    */
-  @Experimental
+  @PublicEvolving
   def setDefaultLocalParallelism(parallelism: Int) : Unit =
     StreamExecutionEnvironment.setDefaultLocalParallelism(parallelism)
 

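The StreamExecutionEnvironment hunks reclassify several environment-level settings at once. A sketch of how they are commonly combined — the checkpoint interval and retry count are arbitrary example values, and the single-argument enableCheckpointing(interval) is used rather than the deprecated force variant shown above:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

object EnvSketch {
  def configure(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // @PublicEvolving
    env.enableCheckpointing(10000)     // checkpoint every 10 seconds
    env.setNumberOfExecutionRetries(3) // @PublicEvolving; -1 means "use the configured default"
    env
  }
}
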
http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 15b9505..6385831 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
@@ -64,7 +64,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
   /**
    * Sets the [[Trigger]] that should be used to trigger window emission.
    */
-  @Experimental
+  @PublicEvolving
   def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W] = {
     javaStream.trigger(trigger)
     this
@@ -76,7 +76,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
   * Note: When using an evictor window performance will degrade significantly, since
    * pre-aggregation of window results cannot be used.
    */
-  @Experimental
+  @PublicEvolving
   def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W] = {
     javaStream.evictor(evictor)
     this

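WindowedStream.trigger and WindowedStream.evictor mirror the JoinedStreams members earlier in this commit. A hedged sketch pairing a count trigger with a count evictor (GlobalWindows with count-based firing is one common combination; the counts and the helper name are illustrative assumptions):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

object EvictorSketch {
  // Emits, for every 100th element per key, the sum of the 10 most recent ones.
  def lastTenOfEveryHundred(events: DataStream[(String, Int)]): DataStream[(String, Int)] =
    events.keyBy(_._1)
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of[GlobalWindow](100)) // fire after 100 elements; @PublicEvolving
      // Per the scaladoc in the hunk above, an evictor disables
      // pre-aggregation, so expect a significant performance cost.
      .evictor(CountEvictor.of[GlobalWindow](10))  // keep only the 10 most recent elements
      .sum(1)
}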