http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 736c41b..04c1980 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.annotation.{Experimental, Public} +import org.apache.flink.annotation.{PublicEvolving, Public} import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner} import org.apache.flink.api.common.io.OutputFormat import org.apache.flink.api.common.typeinfo.TypeInformation @@ -50,6 +50,7 @@ class DataStream[T](stream: JavaStream[T]) { /** * Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]]. + * * @return associated execution environment */ def getExecutionEnvironment: StreamExecutionEnvironment = @@ -60,7 +61,7 @@ class DataStream[T](stream: JavaStream[T]) { * * @return ID of the DataStream */ - @Experimental + @PublicEvolving def getId = stream.getId /** @@ -128,7 +129,7 @@ class DataStream[T](stream: JavaStream[T]) { * @param uid The unique user-specified ID of this transformation. * @return The operator with the specified ID. 
*/ - @Experimental + @PublicEvolving def uid(uid: String) : DataStream[T] = javaStream match { case stream : SingleOutputStreamOperator[T,_] => stream.uid(uid) case _ => throw new UnsupportedOperationException("Only supported for operators.") @@ -142,7 +143,7 @@ class DataStream[T](stream: JavaStream[T]) { * however it is not advised for performance considerations. * */ - @Experimental + @PublicEvolving def disableChaining(): DataStream[T] = { stream match { case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining(); @@ -158,7 +159,7 @@ class DataStream[T](stream: JavaStream[T]) { * previous tasks even if possible. * */ - @Experimental + @PublicEvolving def startNewChain(): DataStream[T] = { stream match { case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain(); @@ -175,7 +176,7 @@ class DataStream[T](stream: JavaStream[T]) { * All subsequent operators are assigned to the default resource group. * */ - @Experimental + @PublicEvolving def isolateResources(): DataStream[T] = { stream match { case ds: SingleOutputStreamOperator[_, _] => ds.isolateResources(); @@ -196,7 +197,7 @@ class DataStream[T](stream: JavaStream[T]) { * degree of parallelism for the operators must be decreased from the * default. */ - @Experimental + @PublicEvolving def startNewResourceGroup(): DataStream[T] = { stream match { case ds: SingleOutputStreamOperator[_, _] => ds.startNewResourceGroup(); @@ -345,14 +346,14 @@ class DataStream[T](stream: JavaStream[T]) { * the first instance of the next processing operator. Use this setting with care * since it might cause a serious performance bottleneck in the application. */ - @Experimental + @PublicEvolving def global: DataStream[T] = stream.global() /** * Sets the partitioning of the DataStream so that the output tuples * are shuffled to the next component. 
*/ - @Experimental + @PublicEvolving def shuffle: DataStream[T] = stream.shuffle() /** @@ -385,7 +386,7 @@ class DataStream[T](stream: JavaStream[T]) { * In cases where the different parallelisms are not multiples of each other one or several * downstream operations will have a differing number of inputs from upstream operations. */ - @Experimental + @PublicEvolving def rescale: DataStream[T] = stream.rescale() /** @@ -408,7 +409,7 @@ class DataStream[T](stream: JavaStream[T]) { * the keepPartitioning flag to true * */ - @Experimental + @PublicEvolving def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), maxWaitTimeMillis:Long = 0, keepPartitioning: Boolean = false) : DataStream[R] = { @@ -438,7 +439,7 @@ class DataStream[T](stream: JavaStream[T]) { * to 0 then the iteration sources will run indefinitely, so the job must be killed to stop. * */ - @Experimental + @PublicEvolving def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = { val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]] @@ -625,7 +626,7 @@ class DataStream[T](stream: JavaStream[T]) { * @param assigner The `WindowAssigner` that assigns elements to windows. * @return The trigger windows data stream. 
*/ - @Experimental + @PublicEvolving def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = { new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](stream, assigner)) } @@ -640,7 +641,7 @@ class DataStream[T](stream: JavaStream[T]) { * * @see org.apache.flink.streaming.api.watermark.Watermark */ - @Experimental + @PublicEvolving def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = { stream.assignTimestamps(clean(extractor)) } @@ -656,7 +657,7 @@ class DataStream[T](stream: JavaStream[T]) { * * @see org.apache.flink.streaming.api.watermark.Watermark */ - @Experimental + @PublicEvolving def assignAscendingTimestamps(extractor: T => Long): DataStream[T] = { val cleanExtractor = clean(extractor) val extractorFunction = new AscendingTimestampExtractor[T] { @@ -714,7 +715,7 @@ class DataStream[T](stream: JavaStream[T]) { * written. * */ - @Experimental + @PublicEvolving def print(): DataStreamSink[T] = stream.print() /** @@ -725,7 +726,7 @@ class DataStream[T](stream: JavaStream[T]) { * * @return The closed DataStream. */ - @Experimental + @PublicEvolving def printToErr() = stream.printToErr() /** @@ -735,7 +736,7 @@ class DataStream[T](stream: JavaStream[T]) { * @param path The path pointing to the location the text file is written to * @return The closed DataStream */ - @Experimental + @PublicEvolving def writeAsText(path: String): DataStreamSink[T] = stream.writeAsText(path, 0L) @@ -749,7 +750,7 @@ class DataStream[T](stream: JavaStream[T]) { * @param millis The file update frequency * @return The closed DataStream */ - @Experimental + @PublicEvolving def writeAsText(path: String, millis: Long): DataStreamSink[T] = stream.writeAsText(path, millis) @@ -762,7 +763,7 @@ class DataStream[T](stream: JavaStream[T]) { * OVERWRITE. 
* @return The closed DataStream */ - @Experimental + @PublicEvolving def writeAsText(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T] = { if (writeMode != null) { stream.writeAsText(path, writeMode) @@ -782,7 +783,7 @@ class DataStream[T](stream: JavaStream[T]) { * @param millis The file update frequency * @return The closed DataStream */ - @Experimental + @PublicEvolving def writeAsText( path: String, writeMode: FileSystem.WriteMode, @@ -802,7 +803,7 @@ class DataStream[T](stream: JavaStream[T]) { * @param path Path to the location of the CSV file * @return The closed DataStream */ - @Experimental + @PublicEvolving def writeAsCsv(path: String): DataStreamSink[T] = { writeAsCsv( path, @@ -820,7 +821,7 @@ class DataStream[T](stream: JavaStream[T]) { * @param millis File update frequency * @return The closed DataStream */ - @Experimental + @PublicEvolving def writeAsCsv(path: String, millis: Long): DataStreamSink[T] = { writeAsCsv( path, @@ -838,7 +839,7 @@ class DataStream[T](stream: JavaStream[T]) { * @param writeMode Controls whether an existing file is overwritten or not * @return The closed DataStream */ - @Experimental + @PublicEvolving def writeAsCsv(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T] = { writeAsCsv( path, @@ -857,7 +858,7 @@ class DataStream[T](stream: JavaStream[T]) { * @param millis File update frequency * @return The closed DataStream */ - @Experimental + @PublicEvolving def writeAsCsv(path: String, writeMode: FileSystem.WriteMode, millis: Long): DataStreamSink[T] = { writeAsCsv( path, @@ -878,7 +879,7 @@ class DataStream[T](stream: JavaStream[T]) { * @param fieldDelimiter Delimiter for consecutive fields * @return The closed DataStream */ - @Experimental + @PublicEvolving def writeAsCsv( path: String, writeMode: FileSystem.WriteMode, @@ -898,7 +899,7 @@ class DataStream[T](stream: JavaStream[T]) { * Writes a DataStream using the given [[OutputFormat]]. 
The * writing is performed periodically, in every millis milliseconds. */ - @Experimental + @PublicEvolving def write(format: OutputFormat[T], millis: Long): DataStreamSink[T] = { stream.write(format, millis) } @@ -907,7 +908,7 @@ class DataStream[T](stream: JavaStream[T]) { * Writes the DataStream to a socket as a byte array. The format of the output is * specified by a [[SerializationSchema]]. */ - @Experimental + @PublicEvolving def writeToSocket( hostname: String, port: Integer,
http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala index b6fbadf..21c5d84 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.annotation.{Experimental, Public} +import org.apache.flink.annotation.{PublicEvolving, Public} import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector @@ -148,7 +148,7 @@ object JoinedStreams { /** * Specifies the window on which the join operation works. */ - @Experimental + @PublicEvolving def window[W <: Window]( assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W]) : JoinedStreams.WithWindow[T1, T2, KEY, W] = { @@ -197,7 +197,7 @@ object JoinedStreams { /** * Sets the [[Trigger]] that should be used to trigger window emission. */ - @Experimental + @PublicEvolving def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) : JoinedStreams.WithWindow[T1, T2, KEY, W] = { new WithWindow[T1, T2, KEY, W]( @@ -216,7 +216,7 @@ object JoinedStreams { * Note: When using an evictor window performance will degrade significantly, since * pre-aggregation of window results cannot be used. 
*/ - @Experimental + @PublicEvolving def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) : JoinedStreams.WithWindow[T1, T2, KEY, W] = { new WithWindow[T1, T2, KEY, W]( http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index 271796b..923aad6 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.annotation.{Experimental, Internal, Public} +import org.apache.flink.annotation.{PublicEvolving, Internal, Public} import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer @@ -112,7 +112,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * @param assigner The `WindowAssigner` that assigns elements to windows. * @return The trigger windows data stream. 
*/ - @Experimental + @PublicEvolving def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W] = { new WindowedStream(new WindowedJavaStream[T, K, W](javaStream, assigner)) } http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 4317931..58b100e 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.scala import com.esotericsoftware.kryo.Serializer -import org.apache.flink.annotation.{Internal, Experimental, Public} +import org.apache.flink.annotation.{Internal, PublicEvolving, Public} import org.apache.flink.api.common.io.{FileInputFormat, InputFormat} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer @@ -91,7 +91,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * avoiding serialization and de-serialization. * */ - @Experimental + @PublicEvolving def disableOperatorChaining(): StreamExecutionEnvironment = { javaEnv.disableOperatorChaining() this @@ -127,7 +127,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * If true checkpointing will be enabled for iterative jobs as well. 
*/ @deprecated - @Experimental + @PublicEvolving def enableCheckpointing(interval : Long, mode: CheckpointingMode, force: Boolean) : StreamExecutionEnvironment = { @@ -191,7 +191,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * failure the job will be resubmitted to the cluster indefinitely. */ @deprecated - @Experimental + @PublicEvolving def enableCheckpointing() : StreamExecutionEnvironment = { javaEnv.enableCheckpointing() this @@ -218,7 +218,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * program can be executed highly available and strongly consistent (assuming that Flink * is run in high-availability mode). */ - @Experimental + @PublicEvolving def setStateBackend(backend: AbstractStateBackend): StreamExecutionEnvironment = { javaEnv.setStateBackend(backend) this @@ -227,7 +227,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { /** * Returns the state backend that defines how to store and checkpoint state. */ - @Experimental + @PublicEvolving def getStateBackend: AbstractStateBackend = javaEnv.getStateBackend() /** @@ -235,7 +235,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * effectively disables fault tolerance. A value of "-1" indicates that the system * default value (as defined in the configuration) should be used. */ - @Experimental + @PublicEvolving def setNumberOfExecutionRetries(numRetries: Int): Unit = { javaEnv.setNumberOfExecutionRetries(numRetries) } @@ -245,7 +245,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * of "-1" indicates that the system default value (as defined in the configuration) * should be used. */ - @Experimental + @PublicEvolving def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries // -------------------------------------------------------------------------------------------- @@ -327,7 +327,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * * @param characteristic The time characteristic. 
*/ - @Experimental + @PublicEvolving def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = { javaEnv.setStreamTimeCharacteristic(characteristic) } @@ -336,10 +336,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * Gets the time characteristic. * * @see #setStreamTimeCharacteristic - * * @return The time characteristic. */ - @Experimental + @PublicEvolving def getStreamTimeCharacteristic = javaEnv.getStreamTimeCharacteristic() // -------------------------------------------------------------------------------------------- @@ -474,7 +473,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * character set. The maximum retry interval is specified in seconds, in case * of temporary service outage reconnection is initiated every second. */ - @Experimental + @PublicEvolving def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0): DataStream[String] = javaEnv.socketTextStream(hostname, port) @@ -485,7 +484,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * determine the type of the data produced by the input format. It will attempt to determine the * data type by reflection, unless the input format implements the ResultTypeQueryable interface. */ - @Experimental + @PublicEvolving def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] = javaEnv.createInput(inputFormat) @@ -562,6 +561,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { /** * Getter of the wrapped [[org.apache.flink.streaming.api.environment.StreamExecutionEnvironment]] + * * @return The encased ExecutionEnvironment */ @Internal @@ -590,7 +590,7 @@ object StreamExecutionEnvironment { * @param parallelism * The parallelism to use as the default local parallelism. 
*/ - @Experimental + @PublicEvolving def setDefaultLocalParallelism(parallelism: Int) : Unit = StreamExecutionEnvironment.setDefaultLocalParallelism(parallelism) http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index 15b9505..6385831 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.annotation.{Experimental, Public} +import org.apache.flink.annotation.{PublicEvolving, Public} import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream} @@ -64,7 +64,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { /** * Sets the [[Trigger]] that should be used to trigger window emission. */ - @Experimental + @PublicEvolving def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W] = { javaStream.trigger(trigger) this @@ -76,7 +76,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * Note: When using an evictor window performance will degrade significantly, since * pre-aggregation of window results cannot be used. */ - @Experimental + @PublicEvolving def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W] = { javaStream.evictor(evictor) this