http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 5dd2988..223ebee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -183,7 +183,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
        }
 
        @Override
-       @Experimental
+       @PublicEvolving
        public void startNewSession() throws Exception {
                dispose();
                jobID = JobID.generate();

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
index 2eda077..fdd114e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
@@ -17,6 +17,7 @@
  */
 
 package org.apache.flink.api.java.functions;
+
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
index dd00c31..0ce518e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
@@ -26,7 +26,7 @@ import java.lang.annotation.Retention;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -310,7 +310,7 @@ public class FunctionAnnotation {
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
-       @Experimental
+       @PublicEvolving
        public @interface ReadFields {
                String[] value();
        }
@@ -341,7 +341,7 @@ public class FunctionAnnotation {
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
-       @Experimental
+       @PublicEvolving
        public @interface ReadFieldsFirst {
                String[] value();
        }
@@ -372,7 +372,7 @@ public class FunctionAnnotation {
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
-       @Experimental
+       @PublicEvolving
        public @interface ReadFieldsSecond {
                String[] value();
        }
@@ -389,7 +389,7 @@ public class FunctionAnnotation {
         */
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
-       @Experimental
+       @PublicEvolving
        public @interface SkipCodeAnalysis {
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 9c6621d..3d656a4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
@@ -110,7 +110,7 @@ public class CsvReader {
         * @return The CSV reader instance itself, to allow for fluent function chaining.
         */
        @Deprecated
-       @Experimental
+       @PublicEvolving
        public CsvReader fieldDelimiter(char delimiter) {
                this.fieldDelimiter = String.valueOf(delimiter);
                return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index 3b5ff2d..e69e16e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import java.util.Arrays;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -315,7 +315,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
                 */
                @SuppressWarnings({ "hiding", "unchecked" })
                @Deprecated
-               @Experimental
+               @PublicEvolving
                public <OUT extends Tuple> CrossOperator<I1, I2, OUT> types(Class<?>... types) {
                        TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>)this.getResultType();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index e11f489..37f6cc2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
@@ -112,7 +112,7 @@ public class DataSink<T> {
         * @see Order
         */
        @Deprecated
-       @Experimental
+       @PublicEvolving
        public DataSink<T> sortLocalOutput(int field, Order order) {
 
                // get flat keys
@@ -159,7 +159,7 @@ public class DataSink<T> {
         * @see Order
         */
        @Deprecated
-       @Experimental
+       @PublicEvolving
        public DataSink<T> sortLocalOutput(String fieldExpression, Order order) {
 
                int numFields;

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
index 4e6b5a4..af6f65b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.NonParallelInput;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
@@ -113,7 +113,7 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
         *
         * @return The SplitDataProperties for the InputSplits of this DataSource.
         */
-       @Experimental
+       @PublicEvolving
        public SplitDataProperties<OUT> getSplitDataProperties() {
                if(this.splitDataProperties == null) {
                        this.splitDataProperties = new SplitDataProperties<OUT>(this);

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index 85f7fe8..cc1cd66 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import java.util.Arrays;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.aggregators.Aggregator;
@@ -204,7 +204,7 @@ public class DeltaIteration<ST, WT> {
         * 
         * @return The DeltaIteration itself, to allow chaining function calls.
         */
-       @Experimental
+       @PublicEvolving
        public DeltaIteration<ST, WT> registerAggregator(String name, Aggregator<?> aggregator) {
                this.aggregators.registerAggregator(name, aggregator);
                return this;
@@ -215,7 +215,7 @@ public class DeltaIteration<ST, WT> {
         * 
         * @return The registry with all aggregators.
         */
-       @Experimental
+       @PublicEvolving
        public AggregatorRegistry getAggregators() {
                return this.aggregators;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
index 48c72bb..c7ff6ab 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
@@ -106,7 +106,7 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData
         * 
         * @return The IterativeDataSet itself, to allow chaining function calls.
         */
-       @Experimental
+       @PublicEvolving
        public IterativeDataSet<T> registerAggregator(String name, Aggregator<?> aggregator) {
                this.aggregators.registerAggregator(name, aggregator);
                return this;
@@ -126,7 +126,7 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData
         * 
         * @return The IterativeDataSet itself, to allow chaining function calls.
         */
-       @Experimental
+       @PublicEvolving
        public <X extends Value> IterativeDataSet<T> registerAggregationConvergenceCriterion(
                        String name, Aggregator<X> aggregator, ConvergenceCriterion<X> convergenceCheck)
        {
@@ -141,7 +141,7 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData
         * 
         * @return The registry for aggregators.
         */
-       @Experimental
+       @PublicEvolving
        public AggregatorRegistry getAggregators() {
                return aggregators;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 4274a4a..1d1ec27 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -24,7 +24,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
@@ -742,7 +742,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
                 */
                @SuppressWarnings({ "unchecked", "hiding" })
                @Deprecated
-               @Experimental
+               @PublicEvolving
                public <OUT extends Tuple> JoinOperator<I1, I2, OUT> types(Class<?>... types) {
                        TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>)this.getResultType();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 95fe5c8..d8a5835 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -22,7 +22,7 @@ import java.util.Arrays;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
@@ -76,7 +76,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
         */
        @SuppressWarnings("unchecked")
        @Deprecated
-       @Experimental
+       @PublicEvolving
        public <R extends Tuple> ProjectOperator<IN, R> types(Class<?>... types) {
                TupleTypeInfo<R> typeInfo = (TupleTypeInfo<R>)this.getResultType();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
index 3834df1..78e5231 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.utils;
 
 import com.google.common.collect.Lists;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
@@ -43,7 +43,7 @@ import java.util.List;
  * This class provides simple utility methods for zipping elements in a data set with an index
  * or with a unique identifier.
  */
-@Experimental
+@PublicEvolving
 public final class DataSetUtils {
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index fb049f3..bfd6d12 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.utils;
 import com.google.common.base.Preconditions;
 import org.apache.commons.cli.Option;
 import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
@@ -191,7 +191,7 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
         * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
         * @see GenericOptionsParser
         */
-       @Experimental
+       @PublicEvolving
        public static ParameterTool fromGenericOptionsParser(String[] args) throws IOException {
                Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions();
                Map<String, String> map = new HashMap<String, String>();

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index b8cbbd2..e47bc42 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator
 import org.apache.flink.api.common.aggregators.Aggregator
@@ -190,7 +190,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * @param name The name under which the aggregator is registered.
    * @param aggregator The aggregator class.
    */
-  @Experimental
+  @PublicEvolving
   def registerAggregator(name: String, aggregator: Aggregator[_]): DataSet[T] = {
     javaSet match {
       case di: DeltaIterationResultSet[_, _] =>
@@ -1632,7 +1632,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * @deprecated Use [[printOnTaskManager(String)]] instead.
    */
   @deprecated
-  @Experimental
+  @PublicEvolving
   def print(sinkIdentifier: String): DataSink[T] = {
     output(new PrintingOutputFormat[T](sinkIdentifier, false))
   }
@@ -1645,7 +1645,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * @deprecated Use [[printOnTaskManager(String)]] instead.
    */
   @deprecated
-  @Experimental
+  @PublicEvolving
   def printToErr(sinkIdentifier: String): DataSink[T] = {
       output(new PrintingOutputFormat[T](sinkIdentifier, true))
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index a92750a..b3d2430 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala
 
 import com.esotericsoftware.kryo.Serializer
 import com.google.common.base.Preconditions
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
@@ -97,7 +97,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * effectively disables fault tolerance. A value of "-1" indicates that the system
    * default value (as defined in the configuration) should be used.
    */
-  @Experimental
+  @PublicEvolving
   def setNumberOfExecutionRetries(numRetries: Int): Unit = {
     javaEnv.setNumberOfExecutionRetries(numRetries)
   }
@@ -107,14 +107,14 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * of "-1" indicates that the system default value (as defined in the configuration)
    * should be used.
    */
-  @Experimental
+  @PublicEvolving
   def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
 
   /**
    * Gets the UUID by which this environment is identified. The UUID sets the execution context
    * in the cluster or local environment.
    */
-  @Experimental
+  @PublicEvolving
   def getId: JobID = {
     javaEnv.getId
   }
@@ -127,7 +127,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Gets the UUID by which this environment is identified, as a string.
    */
-  @Experimental
+  @PublicEvolving
   def getIdString: String = {
     javaEnv.getIdString
   }
@@ -135,7 +135,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Starts a new session, discarding all intermediate results.
    */
-  @Experimental
+  @PublicEvolving
   def startNewSession() {
     javaEnv.startNewSession()
   }
@@ -143,9 +143,10 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Sets the session timeout to hold the intermediate results of a job. This only
    * applies the updated timeout in future executions.
+ *
    * @param timeout The timeout in seconds.
    */
-  @Experimental
+  @PublicEvolving
   def setSessionTimeout(timeout: Long) {
     javaEnv.setSessionTimeout(timeout)
   }
@@ -157,7 +158,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    *
    * @return The session timeout, in seconds.
    */
-  @Experimental
+  @PublicEvolving
   def getSessionTimeout: Long = {
     javaEnv.getSessionTimeout
   }
@@ -383,7 +384,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The
    * given inputName is set on the given job.
    */
-  @Experimental
+  @PublicEvolving
   def readHadoopFile[K, V](
       mapredInputFormat: MapredFileInputFormat[K, V],
       key: Class[K],
@@ -400,7 +401,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A
    * [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
    */
-  @Experimental
+  @PublicEvolving
   def readHadoopFile[K, V](
       mapredInputFormat: MapredFileInputFormat[K, V],
       key: Class[K],
@@ -414,7 +415,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * Creates a [[DataSet]] from [[org.apache.hadoop.mapred.SequenceFileInputFormat]]
    * A [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
    */
-  @Experimental
+  @PublicEvolving
   def readSequenceFile[K, V](
       key: Class[K],
       value: Class[V],
@@ -427,7 +428,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]].
    */
-  @Experimental
+  @PublicEvolving
   def createHadoopInput[K, V](
       mapredInputFormat: MapredInputFormat[K, V],
       key: Class[K],
@@ -442,7 +443,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
    * The given inputName is set on the given job.
    */
-  @Experimental
+  @PublicEvolving
   def readHadoopFile[K, V](
       mapreduceInputFormat: MapreduceFileInputFormat[K, V],
       key: Class[K],
@@ -460,7 +461,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A
    * [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created.
    */
-  @Experimental
+  @PublicEvolving
   def readHadoopFile[K, V](
       mapreduceInputFormat: MapreduceFileInputFormat[K, V],
       key: Class[K],
@@ -473,7 +474,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]].
    */
-  @Experimental
+  @PublicEvolving
   def createHadoopInput[K, V](
       mapreduceInputFormat: MapreduceInputFormat[K, V],
       key: Class[K],
@@ -686,9 +687,10 @@ object ExecutionEnvironment {
    * Creates an execution environment that uses Java Collections underneath. This will execute in a
    * single thread in the current JVM. It is very fast but will fail if the data does not fit into
    * memory. This is useful during implementation and for debugging.
+ *
    * @return
    */
-  @Experimental
+  @PublicEvolving
   def createCollectionsEnvironment: ExecutionEnvironment = {
     new ExecutionEnvironment(new CollectionEnvironment)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index eb41b4b..d658fde 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.typeutils
 import java.util
 import java.util.regex.{Pattern, Matcher}
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.operators.Keys
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -46,7 +46,7 @@ abstract class CaseClassTypeInfo[T <: Product](
     val fieldNames: Seq[String])
   extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) {
 
-  @Experimental
+  @PublicEvolving
   override def getGenericParameters: java.util.List[TypeInformation[_]] = {
     typeParamTypeInfos.toList.asJava
   }
@@ -63,12 +63,12 @@ abstract class CaseClassTypeInfo[T <: Product](
     Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD)
   private val PATTERN_INT_FIELD: Pattern = Pattern.compile(REGEX_INT_FIELD)
 
-  @Experimental
+  @PublicEvolving
   def getFieldIndices(fields: Array[String]): Array[Int] = {
     fields map { x => fieldNames.indexOf(x) }
   }
 
-  @Experimental
+  @PublicEvolving
   override def getFlatFields(
       fieldExpression: String,
       offset: Int,
@@ -150,7 +150,7 @@ abstract class CaseClassTypeInfo[T <: Product](
     }
   }
 
-  @Experimental
+  @PublicEvolving
   override def getTypeAt[X](fieldExpression: String) : TypeInformation[X] = {
 
     val matcher: Matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression)
@@ -193,10 +193,10 @@ abstract class CaseClassTypeInfo[T <: Product](
       "\" in type " + this + ".")
   }
 
-  @Experimental
+  @PublicEvolving
   override def getFieldNames: Array[String] = fieldNames.toArray
 
-  @Experimental
+  @PublicEvolving
   override def getFieldIndex(fieldName: String): Int = {
     val result = fieldNames.indexOf(fieldName)
     if (result != fieldNames.lastIndexOf(fieldName)) {
@@ -206,7 +206,7 @@ abstract class CaseClassTypeInfo[T <: Product](
     }
   }
 
-  @Experimental
+  @PublicEvolving
   override def createTypeComparatorBuilder(): TypeComparatorBuilder[T] = {
     new CaseClassTypeComparatorBuilder
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
index cb39e7b..406f073 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -34,22 +34,22 @@ class EitherTypeInfo[A, B, T <: Either[A, B]](
     val rightTypeInfo: TypeInformation[B])
   extends TypeInformation[T] {
 
-  @Experimental
+  @PublicEvolving
   override def isBasicType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def isTupleType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def isKeyType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def getTotalFields: Int = 1
-  @Experimental
+  @PublicEvolving
   override def getArity: Int = 1
-  @Experimental
+  @PublicEvolving
   override def getTypeClass = clazz
-  @Experimental
+  @PublicEvolving
   override def getGenericParameters = List[TypeInformation[_]](leftTypeInfo, rightTypeInfo).asJava
 
-  @Experimental
+  @PublicEvolving
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     val leftSerializer = if (leftTypeInfo != null) {
       leftTypeInfo.createSerializer(executionConfig)

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
index 79a2866..92d2704 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
@@ -33,28 +33,28 @@ class EnumValueTypeInfo[E <: Enumeration](val enum: E, val clazz: Class[E#Value]
 
   type T = E#Value
 
-  @Experimental
+  @PublicEvolving
   override def isBasicType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def isTupleType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def isKeyType: Boolean = true
-  @Experimental
+  @PublicEvolving
   override def getTotalFields: Int = 1
-  @Experimental
+  @PublicEvolving
   override def getArity: Int = 1
-  @Experimental
+  @PublicEvolving
   override def getTypeClass = clazz
-  @Experimental
+  @PublicEvolving
   override def getGenericParameters = List.empty[TypeInformation[_]].asJava
 
 
-  @Experimental
+  @PublicEvolving
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     new EnumValueSerializer[E](enum)
   }
 
-  @Experimental
+  @PublicEvolving
   override def createComparator(ascOrder: Boolean, config: ExecutionConfig): TypeComparator[T] = {
     new EnumValueComparator[E](ascOrder)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index df12955..70db4fa 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -31,23 +31,23 @@ import scala.collection.JavaConverters._
 class OptionTypeInfo[A, T <: Option[A]](private val elemTypeInfo: TypeInformation[A])
   extends TypeInformation[T] {
 
-  @Experimental
+  @PublicEvolving
   override def isBasicType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def isTupleType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def isKeyType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def getTotalFields: Int = 1
-  @Experimental
+  @PublicEvolving
   override def getArity: Int = 1
-  @Experimental
+  @PublicEvolving
   override def getTypeClass = classOf[Option[_]].asInstanceOf[Class[T]]
-  @Experimental
+  @PublicEvolving
   override def getGenericParameters = List[TypeInformation[_]](elemTypeInfo).asJava
 
 
-  @Experimental
+  @PublicEvolving
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     if (elemTypeInfo == null) {
       // this happens when the type of a DataSet is None, i.e. DataSet[None]

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
index b0f760a..bc2aabf 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -25,20 +25,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
 @Public
 class ScalaNothingTypeInfo extends TypeInformation[Nothing] {
 
-  @Experimental
+  @PublicEvolving
   override def isBasicType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def isTupleType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def getArity: Int = 0
-  @Experimental
+  @PublicEvolving
   override def getTotalFields: Int = 0
-  @Experimental
+  @PublicEvolving
   override def getTypeClass: Class[Nothing] = classOf[Nothing]
-  @Experimental
+  @PublicEvolving
   override def isKeyType: Boolean = false
 
-  @Experimental
+  @PublicEvolving
   override def createSerializer(config: ExecutionConfig): TypeSerializer[Nothing] =
     (new NothingSerializer).asInstanceOf[TypeSerializer[Nothing]]
 

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index 855caa9..82fd8ae 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -33,22 +33,22 @@ abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
     val elementTypeInfo: TypeInformation[E])
   extends TypeInformation[T] {
 
-  @Experimental
+  @PublicEvolving
   override def isBasicType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def isTupleType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def isKeyType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def getTotalFields: Int = 1
-  @Experimental
+  @PublicEvolving
   override def getArity: Int = 1
-  @Experimental
+  @PublicEvolving
   override def getTypeClass: Class[T] = clazz
-  @Experimental
+  @PublicEvolving
   override def getGenericParameters = List[TypeInformation[_]](elementTypeInfo).asJava
 
-  @Experimental
+  @PublicEvolving
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T]
 
   override def equals(other: Any): Boolean = {

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
index 880c636..0a5a06d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -33,22 +33,22 @@ import scala.util.Try
 class TryTypeInfo[A, T <: Try[A]](val elemTypeInfo: TypeInformation[A])
   extends TypeInformation[T] {
 
-  @Experimental
+  @PublicEvolving
   override def isBasicType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def isTupleType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def isKeyType: Boolean = false
-  @Experimental
+  @PublicEvolving
   override def getTotalFields: Int = 1
-  @Experimental
+  @PublicEvolving
   override def getArity: Int = 1
-  @Experimental
+  @PublicEvolving
   override def getTypeClass = classOf[Try[_]].asInstanceOf[Class[T]]
-  @Experimental
+  @PublicEvolving
   override def getGenericParameters = List[TypeInformation[_]](elemTypeInfo).asJava
 
-  @Experimental
+  @PublicEvolving
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     if (elemTypeInfo == null) {
       // this happens when the type of a DataSet is None, i.e. DataSet[Failure]

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala
index fa46a8a..5d4a443 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala
@@ -17,27 +17,27 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 
 @Public
 class UnitTypeInfo extends TypeInformation[Unit] {
-  @Experimental
+  @PublicEvolving
   override def isBasicType(): Boolean = false
-  @Experimental
+  @PublicEvolving
   override def isTupleType(): Boolean = false
-  @Experimental
+  @PublicEvolving
   override def getArity(): Int = 0
-  @Experimental
+  @PublicEvolving
   override def getTotalFields(): Int = 0
-  @Experimental
+  @PublicEvolving
   override def getTypeClass(): Class[Unit] = classOf[Unit]
-  @Experimental
+  @PublicEvolving
   override def isKeyType(): Boolean = false
 
-  @Experimental
+  @PublicEvolving
   override def createSerializer(config: ExecutionConfig): TypeSerializer[Unit] =
     (new UnitSerializer).asInstanceOf[TypeSerializer[Unit]]
 

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
index 7a03053..6407093 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.scala
 
-import org.apache.flink.annotation.Experimental
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.Utils
 import org.apache.flink.api.java.Utils.ChecksumHashCode
@@ -37,7 +37,7 @@ package object utils {
    *
    * @param self Data Set
    */
-  @Experimental
+  @PublicEvolving
   implicit class DataSetUtils[T: TypeInformation : ClassTag](val self: DataSet[T]) {
 
     /**
@@ -118,7 +118,6 @@ package object utils {
       * as well as the checksum (sum over element hashes).
       *
       * @return A ChecksumHashCode with the count and checksum of elements in the data set.
-      *
       * @see [[org.apache.flink.api.java.Utils.ChecksumHashCodeHelper]]
       */
     def checksumHashCode(): ChecksumHashCode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index b7dc795..2902795 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
@@ -84,7 +84,7 @@ public class AllWindowedStream<T, W extends Window> {
        private Evictor<? super T, ? super W> evictor;
 
 
-       @Experimental
+       @PublicEvolving
        public AllWindowedStream(DataStream<T> input,
                        WindowAssigner<? super T, W> windowAssigner) {
                this.input = input;
@@ -95,7 +95,7 @@ public class AllWindowedStream<T, W extends Window> {
        /**
         * Sets the {@code Trigger} that should be used to trigger window emission.
         */
-       @Experimental
+       @PublicEvolving
        public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
                this.trigger = trigger;
                return this;
@@ -108,7 +108,7 @@ public class AllWindowedStream<T, W extends Window> {
         * Note: When using an evictor window performance will degrade significantly, since
         * pre-aggregation of window results cannot be used.
         */
-       @Experimental
+       @PublicEvolving
        public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
                this.evictor = evictor;
                return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index b552a26..9e2bc5d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -145,7 +145,7 @@ public class CoGroupedStreams<T1, T2> {
                        /**
                         * Specifies the window on which the co-group operation works.
                         */
-                       @Experimental
+                       @PublicEvolving
                        public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
                                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
                        }
@@ -202,7 +202,7 @@ public class CoGroupedStreams<T1, T2> {
                /**
                 * Sets the {@code Trigger} that should be used to trigger window emission.
                 */
-               @Experimental
+               @PublicEvolving
                public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
                        return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                                        windowAssigner, newTrigger, evictor);
@@ -215,7 +215,7 @@ public class CoGroupedStreams<T1, T2> {
                 * Note: When using an evictor window performance will degrade significantly, since
                 * pre-aggregation of window results cannot be used.
                 */
-               @Experimental
+               @PublicEvolving
                public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
                        return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                                        windowAssigner, trigger, newEvictor);

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 0d3064d..b340e6e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
@@ -308,7 +308,7 @@ public class ConnectedStreams<IN1, IN2> {
                return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
        }
 
-       @Experimental
+       @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
                        TypeInformation<OUT> outTypeInfo,
                        TwoInputStreamOperator<IN1, IN2, OUT> operator) {

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 3eae2e8..64d0821 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -21,7 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -406,7 +406,7 @@ public class DataStream<T> {
         *
         * @return The DataStream with shuffle partitioning set.
         */
-       @Experimental
+       @PublicEvolving
        public DataStream<T> shuffle() {
                return setConnectionType(new ShufflePartitioner<T>());
        }
@@ -452,7 +452,7 @@ public class DataStream<T> {
         *
         * @return The DataStream with rescale partitioning set.
         */
-       @Experimental
+       @PublicEvolving
        public DataStream<T> rescale() {
                return setConnectionType(new RescalePartitioner<T>());
        }
@@ -465,7 +465,7 @@ public class DataStream<T> {
         *
         * @return The DataStream with shuffle partitioning set.
         */
-       @Experimental
+       @PublicEvolving
        public DataStream<T> global() {
                return setConnectionType(new GlobalPartitioner<T>());
        }
@@ -497,7 +497,7 @@ public class DataStream<T> {
         *
         * @return The iterative data stream created.
         */
-       @Experimental
+       @PublicEvolving
        public IterativeStream<T> iterate() {
                return new IterativeStream<T>(this, 0);
        }
@@ -533,7 +533,7 @@ public class DataStream<T> {
         *
         * @return The iterative data stream created.
         */
-       @Experimental
+       @PublicEvolving
        public IterativeStream<T> iterate(long maxWaitTimeMillis) {
                return new IterativeStream<T>(this, maxWaitTimeMillis);
        }
@@ -621,7 +621,7 @@ public class DataStream<T> {
         * @see Tuple
         * @see DataStream
         */
-       @Experimental
+       @PublicEvolving
        public <R extends Tuple> SingleOutputStreamOperator<R, ?> project(int... fieldIndexes) {
                return new StreamProjection<T>(this, fieldIndexes).projectTupleX();
        }
@@ -733,7 +733,7 @@ public class DataStream<T> {
         * @param assigner The {@code WindowAssigner} that assigns elements to windows.
         * @return The trigger windows data stream.
         */
-       @Experimental
+       @PublicEvolving
        public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
                return new AllWindowedStream<>(this, assigner);
        }
@@ -752,7 +752,7 @@ public class DataStream<T> {
         *
         * @param extractor The TimestampExtractor that is called for each element of the DataStream.
         */
-       @Experimental
+       @PublicEvolving
        public SingleOutputStreamOperator<T, ?> assignTimestamps(TimestampExtractor<T> extractor) {
                // match parallelism to input, otherwise dop=1 sources could lead to some strange
                // behaviour: the watermark will creep along very slowly because the elements
@@ -772,7 +772,7 @@ public class DataStream<T> {
         *
         * @return The closed DataStream.
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> print() {
                PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>();
                return addSink(printFunction);
@@ -787,7 +787,7 @@ public class DataStream<T> {
         *
         * @return The closed DataStream.
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> printToErr() {
                PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>(true);
                return addSink(printFunction);
@@ -805,7 +805,7 @@ public class DataStream<T> {
         *
         * @return The closed DataStream.
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> writeAsText(String path) {
                return write(new TextOutputFormat<T>(new Path(path)), 0L);
        }
@@ -825,7 +825,7 @@ public class DataStream<T> {
         *
         * @return The closed DataStream.
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> writeAsText(String path, long millis) {
                TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
                return write(tof, millis);
@@ -846,7 +846,7 @@ public class DataStream<T> {
         *
         * @return The closed DataStream.
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
                TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
                tof.setWriteMode(writeMode);
@@ -870,7 +870,7 @@ public class DataStream<T> {
         *
         * @return The closed DataStream.
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> writeAsText(String path, WriteMode writeMode, long millis) {
                TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
                tof.setWriteMode(writeMode);
@@ -889,7 +889,7 @@ public class DataStream<T> {
         *
         * @return the closed DataStream
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> writeAsCsv(String path) {
                return writeAsCsv(path, null, 0L, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
        }
@@ -909,7 +909,7 @@ public class DataStream<T> {
         *
         * @return the closed DataStream
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> writeAsCsv(String path, long millis) {
                return writeAsCsv(path, null, millis, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
        }
@@ -929,7 +929,7 @@ public class DataStream<T> {
         *
         * @return the closed DataStream
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode) {
                return writeAsCsv(path, writeMode, 0L, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
        }
@@ -952,7 +952,7 @@ public class DataStream<T> {
         *
         * @return the closed DataStream
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode, long millis) {
                return writeAsCsv(path, writeMode, millis, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
        }
@@ -980,7 +980,7 @@ public class DataStream<T> {
         * @return the closed DataStream
         */
        @SuppressWarnings("unchecked")
-       @Experimental
+       @PublicEvolving
        public <X extends Tuple> DataStreamSink<T> writeAsCsv(
                        String path,
                        WriteMode writeMode,
@@ -1015,7 +1015,7 @@ public class DataStream<T> {
         *            schema for serialization
         * @return the closed DataStream
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
                DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema, 0));
                returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
@@ -1029,7 +1029,7 @@ public class DataStream<T> {
         * @param millis the write frequency
         * @return The closed DataStream
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> write(OutputFormat<T> format, long millis) {
                return addSink(new FileSinkFunctionByMillis<T>(format, millis));
        }
@@ -1048,7 +1048,7 @@ public class DataStream<T> {
         *            type of the return stream
         * @return the data stream constructed
         */
-       @Experimental
+       @PublicEvolving
        public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
 
                // read the output type of the input Transform to coax out errors about MissingTypeInfo

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index fcfe98d..d6cdeff 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -70,7 +70,7 @@ public class DataStreamSink<T> {
         * @param uid The unique user-specified ID of this transformation.
         * @return The operator with the specified ID.
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> uid(String uid) {
                transformation.setUid(uid);
                return this;
@@ -98,7 +98,7 @@ public class DataStreamSink<T> {
         *
         * @return The sink with chaining disabled
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSink<T> disableChaining() {
                this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
                return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
index d03e8e0..f6b54b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -34,7 +34,7 @@ import java.util.Collection;
  * 
  * @param <T> Type of the elements in this Stream
  */
-@Experimental
+@PublicEvolving
 public class IterativeStream<T> extends SingleOutputStreamOperator<T, IterativeStream<T>> {
 
        // We store these so that we can create a co-iteration if we need to

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index aa866eb..f131b6e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -137,7 +137,7 @@ public class JoinedStreams<T1, T2> {
                        /**
                         * Specifies the window on which the join operation works.
                         */
-                       @Experimental
+                       @PublicEvolving
                        public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
                                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
                        }
@@ -171,7 +171,7 @@ public class JoinedStreams<T1, T2> {
 
                private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
 
-               @Experimental
+               @PublicEvolving
                protected WithWindow(DataStream<T1> input1,
                                DataStream<T2> input2,
                                KeySelector<T1, KEY> keySelector1,
@@ -197,7 +197,7 @@ public class JoinedStreams<T1, T2> {
                /**
                 * Sets the {@code Trigger} that should be used to trigger window emission.
                 */
-               @Experimental
+               @PublicEvolving
                public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
                        return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                                        windowAssigner, newTrigger, evictor);
@@ -210,7 +210,7 @@ public class JoinedStreams<T1, T2> {
                 * Note: When using an evictor window performance will degrade significantly, since
                 * pre-aggregation of window results cannot be used.
                 */
-               @Experimental
+               @PublicEvolving
                public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
                        return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                                        windowAssigner, trigger, newEvictor);

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 6077381..9fa4545 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -132,7 +132,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
        // ------------------------------------------------------------------------
        
        @Override
-       @Experimental
+       @PublicEvolving
        public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
                        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
 
@@ -222,7 +222,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         * @param assigner The {@code WindowAssigner} that assigns elements to windows.
         * @return The trigger windows data stream.
         */
-       @Experimental
+       @PublicEvolving
        public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
                return new WindowedStream<>(this, assigner);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 95e6d36..a11d53b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -77,7 +77,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
         * @param uid The unique user-specified ID of this transformation.
         * @return The operator with the specified ID.
         */
-       @Experimental
+       @PublicEvolving
        public SingleOutputStreamOperator<T, O> uid(String uid) {
                transformation.setUid(uid);
                return this;
@@ -121,7 +121,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
 
        @SuppressWarnings("unchecked")
        @Override
-       @Experimental
+       @PublicEvolving
        public SingleOutputStreamOperator<T, O> shuffle() {
                return (SingleOutputStreamOperator<T, O>) super.shuffle();
        }
@@ -140,14 +140,14 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
 
        @SuppressWarnings("unchecked")
        @Override
-       @Experimental
+       @PublicEvolving
        public SingleOutputStreamOperator<T, O> rescale() {
                return (SingleOutputStreamOperator<T, O>) super.rescale();
        }
 
        @SuppressWarnings("unchecked")
        @Override
-       @Experimental
+       @PublicEvolving
        public SingleOutputStreamOperator<T, O> global() {
                return (SingleOutputStreamOperator<T, O>) super.global();
        }
@@ -161,7 +161,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
         *            The selected {@link ChainingStrategy}
         * @return The operator with the modified chaining strategy
         */
-       @Experimental
+       @PublicEvolving
        private SingleOutputStreamOperator<T, O> setChainingStrategy(ChainingStrategy strategy) {
                this.transformation.setChainingStrategy(strategy);
                return this;
@@ -176,7 +176,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
         * 
         * @return The operator with chaining disabled
         */
-       @Experimental
+       @PublicEvolving
        public SingleOutputStreamOperator<T, O> disableChaining() {
                return setChainingStrategy(ChainingStrategy.NEVER);
        }
@@ -188,7 +188,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
         * 
         * @return The operator with chaining set.
         */
-       @Experimental
+       @PublicEvolving
        public SingleOutputStreamOperator<T, O> startNewChain() {
                return setChainingStrategy(ChainingStrategy.HEAD);
        }
@@ -327,7 +327,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
         * 
         * @return The operator as a part of a new resource group.
         */
-       @Experimental
+       @PublicEvolving
        public SingleOutputStreamOperator<T, O> startNewResourceGroup() {
                transformation.setResourceStrategy(ResourceStrategy.NEWGROUP);
                return this;
@@ -343,7 +343,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
         * 
         * @return The operator with isolated resource group.
         */
-       @Experimental
+       @PublicEvolving
        public SingleOutputStreamOperator<T, O> isolateResources() {
                transformation.setResourceStrategy(ResourceStrategy.ISOLATE);
                return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
index 0f0f301..50c3aa9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import com.google.common.collect.Lists;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.transformations.SelectTransformation;
 import org.apache.flink.streaming.api.transformations.SplitTransformation;
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.transformations.SplitTransformation;
  * @param <OUT> The type of the elements in the Stream
  */
 
-@Experimental
+@PublicEvolving
 public class SplitStream<OUT> extends DataStream<OUT> {
 
        protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index f945399..88e619a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
@@ -98,7 +98,7 @@ public class WindowedStream<T, K, W extends Window> {
        private Evictor<? super T, ? super W> evictor;
 
 
-       @Experimental
+       @PublicEvolving
        public WindowedStream(KeyedStream<T, K> input,
                        WindowAssigner<? super T, W> windowAssigner) {
                this.input = input;
@@ -109,7 +109,7 @@ public class WindowedStream<T, K, W extends Window> {
        /**
         * Sets the {@code Trigger} that should be used to trigger window emission.
         */
-       @Experimental
+       @PublicEvolving
        public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
                this.trigger = trigger;
                return this;
@@ -122,7 +122,7 @@ public class WindowedStream<T, K, W extends Window> {
         * Note: When using an evictor window performance will degrade significantly, since
         * pre-aggregation of window results cannot be used.
         */
-       @Experimental
+       @PublicEvolving
        public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
                this.evictor = evictor;
                return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 6c2d72c..327b524 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.environment;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
@@ -206,7 +206,7 @@ public class CheckpointConfig implements java.io.Serializable {
         * @deprecated This will be removed once iterations properly participate in checkpointing.
         */
        @Deprecated
-       @Experimental
+       @PublicEvolving
        public boolean isForceCheckpointing() {
                return forceCheckpointing;
        }
@@ -219,7 +219,7 @@ public class CheckpointConfig implements java.io.Serializable {
         * @deprecated This will be removed once iterations properly participate in checkpointing.
         */
        @Deprecated
-       @Experimental
+       @PublicEvolving
        public void setForceCheckpointing(boolean forceCheckpointing) {
                this.forceCheckpointing = forceCheckpointing;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 1e29155..8d829c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.environment;
 import com.esotericsoftware.kryo.Serializer;
 import com.google.common.base.Preconditions;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -224,7 +224,7 @@ public abstract class StreamExecutionEnvironment {
         *
         * @return StreamExecutionEnvironment with chaining disabled.
         */
-       @Experimental
+       @PublicEvolving
        public StreamExecutionEnvironment disableOperatorChaining() {
                this.isChainingEnabled = false;
                return this;
@@ -235,7 +235,7 @@ public abstract class StreamExecutionEnvironment {
         *
         * @return {@code true} if chaining is enabled, false otherwise.
         */
-       @Experimental
+       @PublicEvolving
        public boolean isChainingEnabled() {
                return isChainingEnabled;
        }
@@ -321,7 +321,7 @@ public abstract class StreamExecutionEnvironment {
         */
        @Deprecated
        @SuppressWarnings("deprecation")
-       @Experimental
+       @PublicEvolving
        public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
                checkpointCfg.setCheckpointingMode(mode);
                checkpointCfg.setCheckpointInterval(interval);
@@ -346,7 +346,7 @@ public abstract class StreamExecutionEnvironment {
         * @deprecated Use {@link #enableCheckpointing(long)} instead.
         */
        @Deprecated
-       @Experimental
+       @PublicEvolving
        public StreamExecutionEnvironment enableCheckpointing() {
                checkpointCfg.setCheckpointInterval(500);
                return this;
@@ -368,7 +368,7 @@ public abstract class StreamExecutionEnvironment {
         */
        @Deprecated
        @SuppressWarnings("deprecation")
-       @Experimental
+       @PublicEvolving
        public boolean isForceCheckpointing() {
                return checkpointCfg.isForceCheckpointing();
        }
@@ -406,7 +406,7 @@ public abstract class StreamExecutionEnvironment {
         * 
         * @see #getStateBackend()
         */
-       @Experimental
+       @PublicEvolving
        public StreamExecutionEnvironment setStateBackend(AbstractStateBackend backend) {
                this.defaultStateBackend = requireNonNull(backend);
                return this;
@@ -418,7 +418,7 @@ public abstract class StreamExecutionEnvironment {
         * 
         * @see #setStateBackend(AbstractStateBackend)
         */
-       @Experimental
+       @PublicEvolving
        public AbstractStateBackend getStateBackend() {
                return defaultStateBackend;
        }
@@ -432,7 +432,7 @@ public abstract class StreamExecutionEnvironment {
         * @param numberOfExecutionRetries
         *              The number of times the system will try to re-execute failed tasks.
         */
-       @Experimental
+       @PublicEvolving
        public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
                config.setNumberOfExecutionRetries(numberOfExecutionRetries);
        }
@@ -444,7 +444,7 @@ public abstract class StreamExecutionEnvironment {
         *
         * @return The number of times the system will try to re-execute failed tasks.
         */
-       @Experimental
+       @PublicEvolving
        public int getNumberOfExecutionRetries() {
                return config.getNumberOfExecutionRetries();
        }
@@ -456,7 +456,7 @@ public abstract class StreamExecutionEnvironment {
         * @param parallelism
         *              The parallelism to use as the default local parallelism.
         */
-       @Experimental
+       @PublicEvolving
        public static void setDefaultLocalParallelism(int parallelism) {
                defaultLocalParallelism = parallelism;
        }
@@ -562,7 +562,7 @@ public abstract class StreamExecutionEnvironment {
         * 
         * @param characteristic The time characteristic.
         */
-       @Experimental
+       @PublicEvolving
        public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
                this.timeCharacteristic = requireNonNull(characteristic);
                if (characteristic == TimeCharacteristic.ProcessingTime) {
@@ -581,7 +581,7 @@ public abstract class StreamExecutionEnvironment {
         *
         * @return The time characteristic.
         */
-       @Experimental
+       @PublicEvolving
        public TimeCharacteristic getStreamTimeCharacteristic() {
                return timeCharacteristic;
        }
@@ -1013,7 +1013,7 @@ public abstract class StreamExecutionEnvironment {
         *              a       negative value ensures retrying forever.
         * @return A data stream containing the strings received from the socket
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry) {
                return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
                                "Socket Stream");
@@ -1032,7 +1032,7 @@ public abstract class StreamExecutionEnvironment {
         *              A character which splits received strings into records
         * @return A data stream containing the strings received from the socket
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
                return socketTextStream(hostname, port, delimiter, 0);
        }
@@ -1049,7 +1049,7 @@ public abstract class StreamExecutionEnvironment {
         *              allocated.
         * @return A data stream containing the strings received from the socket
         */
-       @Experimental
+       @PublicEvolving
        public DataStreamSource<String> socketTextStream(String hostname, int port) {
                return socketTextStream(hostname, port, '\n');
        }
@@ -1070,7 +1070,7 @@ public abstract class StreamExecutionEnvironment {
         *              The type of the returned data stream
         * @return The data stream that represents the data created by the input format
         */
-       @Experimental
+       @PublicEvolving
        public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) {
                return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), "Custom File source");
        }
@@ -1089,7 +1089,7 @@ public abstract class StreamExecutionEnvironment {
         *              The type of the returned data stream
         * @return The data stream that represents the data created by the input format
         */
-       @Experimental
+       @PublicEvolving
        public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
                return createInput(inputFormat, typeInfo, "Custom File source");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
index 8a516f1..9b04ad9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.functions.source;
 
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 
 /**
  * A marker interface that must be implemented by {@link SourceFunction}s that emit elements with
@@ -38,5 +38,5 @@ import org.apache.flink.annotation.Experimental;
  *
  * @param <T> Type of the elements emitted by this source.
  */
-@Experimental
+@PublicEvolving
 public interface EventTimeSourceFunction<T> extends SourceFunction<T> { }

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 7ba10fd..c4139bd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -132,7 +132,7 @@ public interface SourceFunction<T> extends Function, Serializable {
                 * @param element The element to emit
                 * @param timestamp The timestamp in milliseconds
                 */
-               @Experimental
+               @PublicEvolving
                public void collectWithTimestamp(T element, long timestamp);
 
                /**
@@ -147,7 +147,7 @@ public interface SourceFunction<T> extends Function, Serializable {
                 *
                 * @param mark The {@link Watermark} to emit
                 */
-               @Experimental
+               @PublicEvolving
                void emitWatermark(Watermark mark);
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 904bd89..8f0d785 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
@@ -61,7 +61,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
   /**
    * Sets the [[Trigger]] that should be used to trigger window emission.
    */
-  @Experimental
+  @PublicEvolving
   def trigger(trigger: Trigger[_ >: T, _ >: W]): AllWindowedStream[T, W] = {
     javaStream.trigger(trigger)
     this
@@ -73,7 +73,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * Note: When using an evictor window performance will degrade significantly, since
    * pre-aggregation of window results cannot be used.
    */
-  @Experimental
+  @PublicEvolving
   def evictor(evictor: Evictor[_ >: T, _ >: W]): AllWindowedStream[T, W] = {
     javaStream.evictor(evictor)
     this

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index 27bc497..ce96e4f 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.functions.CoGroupFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
@@ -150,7 +150,7 @@ object CoGroupedStreams {
     /**
      * Specifies the window on which the co-group operation works.
      */
-    @Experimental
+    @PublicEvolving
     def window[W <: Window](
         assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W])
         : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
@@ -186,7 +186,7 @@ object CoGroupedStreams {
    * @tparam KEY Type of the key. This must be the same for both inputs
    * @tparam W Type of { @link Window} on which the co-group operation works.
    */
-  @Experimental
+  @PublicEvolving
   class WithWindow[T1, T2, KEY, W <: Window](
       input1: DataStream[T1],
       input2: DataStream[T2],
@@ -200,7 +200,7 @@ object CoGroupedStreams {
     /**
      * Sets the [[Trigger]] that should be used to trigger window emission.
      */
-    @Experimental
+    @PublicEvolving
     def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
     : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
       new WithWindow[T1, T2, KEY, W](
@@ -219,7 +219,7 @@ object CoGroupedStreams {
      * Note: When using an evictor window performance will degrade significantly, since
      * pre-aggregation of window results cannot be used.
      */
-    @Experimental
+    @PublicEvolving
     def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
     : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = {
       new WithWindow[T1, T2, KEY, W](

Reply via email to