This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new cff00f5 [FLINK-23330][table-api] Deprecate old fromDataStream, toAppendStream, toRetractStream cff00f5 is described below commit cff00f59411832eecde720dcb4cc7da64ac242f6 Author: Timo Walther <twal...@apache.org> AuthorDate: Tue Aug 17 09:33:43 2021 +0200 [FLINK-23330][table-api] Deprecate old fromDataStream, toAppendStream, toRetractStream This closes #16855. --- .../docs/deployment/repls/scala_shell.md | 32 ++++++++++++---------- docs/content/docs/deployment/repls/scala_shell.md | 32 ++++++++++++---------- .../api/bridge/java/StreamTableEnvironment.java | 31 +++++++++++++++++++++ .../api/bridge/scala/StreamTableEnvironment.scala | 24 +++++++++++++++- 4 files changed, 90 insertions(+), 29 deletions(-) diff --git a/docs/content.zh/docs/deployment/repls/scala_shell.md b/docs/content.zh/docs/deployment/repls/scala_shell.md index 889dea1..4e2baa5 100644 --- a/docs/content.zh/docs/deployment/repls/scala_shell.md +++ b/docs/content.zh/docs/deployment/repls/scala_shell.md @@ -104,7 +104,7 @@ The example below is a wordcount program using Table API: {{< tabs "a5a84572-8c20-46a1-b0bc-7c3347a9ff43" >}} {{< tab "stream" >}} ```scala -Scala-Flink> import org.apache.flink.table.functions.TableFunction +Scala-Flink> val stenv = StreamTableEnvironment.create(senv) Scala-Flink> val textSource = stenv.fromDataStream( senv.fromElements( "To be, or not to be,--that is the question:--", @@ -112,6 +112,7 @@ Scala-Flink> val textSource = stenv.fromDataStream( "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,"), 'text) +Scala-Flink> import org.apache.flink.table.functions.TableFunction Scala-Flink> class $Split extends TableFunction[String] { def eval(s: String): Unit = { s.toLowerCase.split("\\W+").foreach(collect) @@ -120,20 +121,21 @@ Scala-Flink> class $Split extends TableFunction[String] { Scala-Flink> val split = new $Split Scala-Flink> textSource.join(split('text) as 'word). groupBy('word).select('word, 'word.count as 'count). - toRetractStream[(String, Long)].print + toChangelogStream().print Scala-Flink> senv.execute("Table Wordcount") ``` {{< /tab >}} {{< tab "batch" >}} ```scala -Scala-Flink> import org.apache.flink.table.functions.TableFunction -Scala-Flink> val textSource = btenv.fromDataSet( - benv.fromElements( +Scala-Flink> val btenv = StreamTableEnvironment.create(senv, EnvironmentSettings.inBatchMode()) +Scala-Flink> val textSource = btenv.fromDataStream( + senv.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,"), 'text) +Scala-Flink> import org.apache.flink.table.functions.TableFunction Scala-Flink> class $Split extends TableFunction[String] { def eval(s: String): Unit = { s.toLowerCase.split("\\W+").foreach(collect) @@ -142,7 +144,7 @@ Scala-Flink> class $Split extends TableFunction[String] { Scala-Flink> val split = new $Split Scala-Flink> textSource.join(split('text) as 'word). groupBy('word).select('word, 'word.count as 'count). - toDataSet[(String, Long)].print + toDataStream().print ``` {{< /tab >}} {{< /tabs >}} @@ -155,7 +157,7 @@ The following example is a wordcount program written in SQL: {{< tabs "3b210000-4585-497a-8636-c7583d10ff42" >}} {{< tab "stream" >}} ```scala -Scala-Flink> import org.apache.flink.table.functions.TableFunction +Scala-Flink> val stenv = StreamTableEnvironment.create(senv) Scala-Flink> val textSource = stenv.fromDataStream( senv.fromElements( "To be, or not to be,--that is the question:--", @@ -164,44 +166,46 @@ Scala-Flink> val textSource = stenv.fromDataStream( "Or to take arms against a sea of troubles,"), 'text) Scala-Flink> stenv.createTemporaryView("text_source", textSource) +Scala-Flink> import org.apache.flink.table.functions.TableFunction Scala-Flink> class $Split extends TableFunction[String] { def eval(s: String): Unit = { s.toLowerCase.split("\\W+").foreach(collect) } } -Scala-Flink> stenv.registerFunction("split", new $Split) +Scala-Flink> stenv.createTemporarySystemFunction("split", new $Split) Scala-Flink> val result = stenv.sqlQuery("""SELECT T.word, count(T.word) AS `count` FROM text_source JOIN LATERAL table(split(text)) AS T(word) ON TRUE GROUP BY T.word""") -Scala-Flink> result.toRetractStream[(String, Long)].print +Scala-Flink> result.toChangelogStream().print Scala-Flink> senv.execute("SQL Wordcount") ``` {{< /tab >}} {{< tab "batch" >}} ```scala -Scala-Flink> import org.apache.flink.table.functions.TableFunction -Scala-Flink> val textSource = btenv.fromDataSet( - benv.fromElements( +Scala-Flink> val btenv = StreamTableEnvironment.create(senv, EnvironmentSettings.inBatchMode()) +Scala-Flink> val textSource = btenv.fromDataStream( + senv.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,"), 'text) Scala-Flink> btenv.createTemporaryView("text_source", textSource) +Scala-Flink> import org.apache.flink.table.functions.TableFunction Scala-Flink> class $Split extends TableFunction[String] { def eval(s: String): Unit = { s.toLowerCase.split("\\W+").foreach(collect) } } -Scala-Flink> btenv.registerFunction("split", new $Split) +Scala-Flink> btenv.createTemporarySystemFunction("split", new $Split) Scala-Flink> val result = btenv.sqlQuery("""SELECT T.word, count(T.word) AS `count` FROM text_source JOIN LATERAL table(split(text)) AS T(word) ON TRUE GROUP BY T.word""") -Scala-Flink> result.toDataSet[(String, Long)].print +Scala-Flink> result.toDataStream().print ``` {{< /tab >}} {{< /tabs >}} diff --git a/docs/content/docs/deployment/repls/scala_shell.md b/docs/content/docs/deployment/repls/scala_shell.md index 72211c4..353f501 100644 --- a/docs/content/docs/deployment/repls/scala_shell.md +++ b/docs/content/docs/deployment/repls/scala_shell.md @@ -104,7 +104,7 @@ The example below is a wordcount program using Table API: {{< tabs "a5a84572-8c20-46a1-b0bc-7c3347a9ff43" >}} {{< tab "stream" >}} ```scala -Scala-Flink> import org.apache.flink.table.functions.TableFunction +Scala-Flink> val stenv = StreamTableEnvironment.create(senv) Scala-Flink> val textSource = stenv.fromDataStream( senv.fromElements( "To be, or not to be,--that is the question:--", @@ -112,6 +112,7 @@ Scala-Flink> val textSource = stenv.fromDataStream( "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,"), 'text) +Scala-Flink> import org.apache.flink.table.functions.TableFunction Scala-Flink> class $Split extends TableFunction[String] { def eval(s: String): Unit = { s.toLowerCase.split("\\W+").foreach(collect) @@ -120,20 +121,21 @@ Scala-Flink> class $Split extends TableFunction[String] { Scala-Flink> val split = new $Split Scala-Flink> textSource.join(split('text) as 'word). groupBy('word).select('word, 'word.count as 'count). - toRetractStream[(String, Long)].print + toChangelogStream().print Scala-Flink> senv.execute("Table Wordcount") ``` {{< /tab >}} {{< tab "batch" >}} ```scala -Scala-Flink> import org.apache.flink.table.functions.TableFunction -Scala-Flink> val textSource = btenv.fromDataSet( - benv.fromElements( +Scala-Flink> val btenv = StreamTableEnvironment.create(senv, EnvironmentSettings.inBatchMode()) +Scala-Flink> val textSource = btenv.fromDataStream( + senv.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,"), 'text) +Scala-Flink> import org.apache.flink.table.functions.TableFunction Scala-Flink> class $Split extends TableFunction[String] { def eval(s: String): Unit = { s.toLowerCase.split("\\W+").foreach(collect) @@ -142,7 +144,7 @@ Scala-Flink> class $Split extends TableFunction[String] { Scala-Flink> val split = new $Split Scala-Flink> textSource.join(split('text) as 'word). groupBy('word).select('word, 'word.count as 'count). - toDataSet[(String, Long)].print + toDataStream().print ``` {{< /tab >}} {{< /tabs >}} @@ -155,7 +157,7 @@ The following example is a wordcount program written in SQL: {{< tabs "3b210000-4585-497a-8636-c7583d10ff42" >}} {{< tab "stream" >}} ```scala -Scala-Flink> import org.apache.flink.table.functions.TableFunction +Scala-Flink> val stenv = StreamTableEnvironment.create(senv) Scala-Flink> val textSource = stenv.fromDataStream( senv.fromElements( "To be, or not to be,--that is the question:--", @@ -164,44 +166,46 @@ Scala-Flink> val textSource = stenv.fromDataStream( "Or to take arms against a sea of troubles,"), 'text) Scala-Flink> stenv.createTemporaryView("text_source", textSource) +Scala-Flink> import org.apache.flink.table.functions.TableFunction Scala-Flink> class $Split extends TableFunction[String] { def eval(s: String): Unit = { s.toLowerCase.split("\\W+").foreach(collect) } } -Scala-Flink> stenv.registerFunction("split", new $Split) +Scala-Flink> stenv.createTemporarySystemFunction("split", new $Split) Scala-Flink> val result = stenv.sqlQuery("""SELECT T.word, count(T.word) AS `count` FROM text_source JOIN LATERAL table(split(text)) AS T(word) ON TRUE GROUP BY T.word""") -Scala-Flink> result.toRetractStream[(String, Long)].print +Scala-Flink> result.toChangelogStream().print Scala-Flink> senv.execute("SQL Wordcount") ``` {{< /tab >}} {{< tab "batch" >}} ```scala -Scala-Flink> import org.apache.flink.table.functions.TableFunction -Scala-Flink> val textSource = btenv.fromDataSet( - benv.fromElements( +Scala-Flink> val btenv = StreamTableEnvironment.create(senv, EnvironmentSettings.inBatchMode()) +Scala-Flink> val textSource = btenv.fromDataStream( + senv.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,"), 'text) Scala-Flink> btenv.createTemporaryView("text_source", textSource) +Scala-Flink> import org.apache.flink.table.functions.TableFunction Scala-Flink> class $Split extends TableFunction[String] { def eval(s: String): Unit = { s.toLowerCase.split("\\W+").foreach(collect) } } -Scala-Flink> btenv.registerFunction("split", new $Split) +Scala-Flink> btenv.createTemporarySystemFunction("split", new $Split) Scala-Flink> val result = btenv.sqlQuery("""SELECT T.word, count(T.word) AS `count` FROM text_source JOIN LATERAL table(split(text)) AS T(word) ON TRUE GROUP BY T.word""") -Scala-Flink> result.toDataSet[(String, Long)].print +Scala-Flink> result.toDataStream().print ``` {{< /tab >}} {{< /tabs >}} diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java index 2991a2d..7e4bd4c 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java @@ -826,7 +826,12 @@ public interface StreamTableEnvironment extends TableEnvironment { * of the {@code Table}. * @param <T> The type of the {@link DataStream}. * @return The converted {@link Table}. + * @deprecated Use {@link #fromDataStream(DataStream, Schema)} instead. In most cases, {@link + * #fromDataStream(DataStream)} should already be sufficient. It integrates with the new + * type system and supports all kinds of {@link DataTypes} that the table runtime can + * consume. The semantics might be slightly different for raw and structured types. */ + @Deprecated <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields); /** @@ -1010,7 +1015,13 @@ public interface StreamTableEnvironment extends TableEnvironment { * @param fields The fields expressions to map original fields of the DataStream to the fields * of the View. * @param <T> The type of the {@link DataStream}. + * @deprecated Use {@link #createTemporaryView(String, DataStream, Schema)} instead. In most + * cases, {@link #createTemporaryView(String, DataStream)} should already be sufficient. It + * integrates with the new type system and supports all kinds of {@link DataTypes} that the + * table runtime can consume. The semantics might be slightly different for raw and + * structured types. */ + @Deprecated <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields); /** @@ -1031,7 +1042,13 @@ public interface StreamTableEnvironment extends TableEnvironment { * @param clazz The class of the type of the resulting {@link DataStream}. * @param <T> The type of the resulting {@link DataStream}. * @return The converted {@link DataStream}. + * @deprecated Use {@link #toDataStream(Table, Class)} instead. It integrates with the new type + * system and supports all kinds of {@link DataTypes} that the table runtime can produce. + * The semantics might be slightly different for raw and structured types. Use {@code + * toDataStream(DataTypes.of(TypeInformation.of(Class)))} if {@link TypeInformation} should + * be used as source of truth. */ + @Deprecated <T> DataStream<T> toAppendStream(Table table, Class<T> clazz); /** @@ -1053,7 +1070,13 @@ public interface StreamTableEnvironment extends TableEnvironment { * DataStream}. * @param <T> The type of the resulting {@link DataStream}. * @return The converted {@link DataStream}. + * @deprecated Use {@link #toDataStream(Table, Class)} instead. It integrates with the new type + * system and supports all kinds of {@link DataTypes} that the table runtime can produce. + * The semantics might be slightly different for raw and structured types. Use {@code + * toDataStream(DataTypes.of(TypeInformation.of(Class)))} if {@link TypeInformation} should + * be used as source of truth. */ + @Deprecated <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo); /** @@ -1076,7 +1099,11 @@ public interface StreamTableEnvironment extends TableEnvironment { * @param clazz The class of the requested record type. * @param <T> The type of the requested record type. * @return The converted {@link DataStream}. + * @deprecated Use {@link #toChangelogStream(Table, Schema)} instead. It integrates with the new + * type system and supports all kinds of {@link DataTypes} and every {@link ChangelogMode} + * that the table runtime can produce. */ + @Deprecated <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz); /** @@ -1099,7 +1126,11 @@ public interface StreamTableEnvironment extends TableEnvironment { * @param typeInfo The {@link TypeInformation} of the requested record type. * @param <T> The type of the requested record type. * @return The converted {@link DataStream}. + * @deprecated Use {@link #toChangelogStream(Table, Schema)} instead. It integrates with the new + * type system and supports all kinds of {@link DataTypes} and every {@link ChangelogMode} + * that the table runtime can produce. */ + @Deprecated <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo); /** diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala index 5123d66..8ccad40 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala @@ -619,7 +619,7 @@ trait StreamTableEnvironment extends TableEnvironment { * ) * }}} * - * <p>2. Reference input fields by position: + * 2. Reference input fields by position: * In this mode, fields are simply renamed. Event-time attributes can * replace the field on their position in the input data (if it is of correct type) or be * appended at the end. Proctime attributes must be appended at the end. This mode can only be @@ -644,7 +644,13 @@ trait StreamTableEnvironment extends TableEnvironment { * the [[Table]]. * @tparam T The type of the [[DataStream]]. * @return The converted [[Table]]. + * @deprecated Use [[fromDataStream(DataStream, Schema)]] instead. In most cases, + * [[fromDataStream(DataStream)]] should already be sufficient. It integrates with + * the new type system and supports all kinds of [[DataTypes]] that the table runtime + * can consume. The semantics might be slightly different for raw and structured + * types. */ + @deprecated def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table /** @@ -790,7 +796,13 @@ trait StreamTableEnvironment extends TableEnvironment { * @param fields The fields expressions to map original fields of the DataStream to the fields of * the View. * @tparam T The type of the [[DataStream]]. + * @deprecated Use [[createTemporaryView(String, DataStream, Schema)]] instead. In most cases, + * [[createTemporaryView(String, DataStream)]] should already be sufficient. It + * integrates with the new type system and supports all kinds of [[DataTypes]] that + * the table runtime can consume. The semantics might be slightly different for raw + * and structured types. */ + @deprecated def createTemporaryView[T](path: String, dataStream: DataStream[T], fields: Expression*): Unit /** @@ -807,7 +819,13 @@ trait StreamTableEnvironment extends TableEnvironment { * @param table The [[Table]] to convert. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. + * @deprecated Use [[toDataStream(Table, Class)]] instead. It integrates with the new type + * system and supports all kinds of [[DataTypes]] that the table runtime can produce. + * The semantics might be slightly different for raw and structured types. Use + * `toDataStream(DataTypes.of(Types.of[Class]))` if [[TypeInformation]] + * should be used as source of truth. */ + @deprecated def toAppendStream[T: TypeInformation](table: Table): DataStream[T] /** @@ -820,7 +838,11 @@ trait StreamTableEnvironment extends TableEnvironment { * @param table The [[Table]] to convert. * @tparam T The type of the requested data type. * @return The converted [[DataStream]]. + * @deprecated Use [[toChangelogStream(Table, Schema)]] instead. It integrates with the new + * type system and supports all kinds of [[DataTypes]] and every [[ChangelogMode]] + * that the table runtime can produce. */ + @deprecated def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] /**