This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cff00f5  [FLINK-23330][table-api] Deprecate old fromDataStream, 
toAppendStream, toRetractStream
cff00f5 is described below

commit cff00f59411832eecde720dcb4cc7da64ac242f6
Author: Timo Walther <twal...@apache.org>
AuthorDate: Tue Aug 17 09:33:43 2021 +0200

    [FLINK-23330][table-api] Deprecate old fromDataStream, toAppendStream, 
toRetractStream
    
    This closes #16855.
---
 .../docs/deployment/repls/scala_shell.md           | 32 ++++++++++++----------
 docs/content/docs/deployment/repls/scala_shell.md  | 32 ++++++++++++----------
 .../api/bridge/java/StreamTableEnvironment.java    | 31 +++++++++++++++++++++
 .../api/bridge/scala/StreamTableEnvironment.scala  | 24 +++++++++++++++-
 4 files changed, 90 insertions(+), 29 deletions(-)

diff --git a/docs/content.zh/docs/deployment/repls/scala_shell.md 
b/docs/content.zh/docs/deployment/repls/scala_shell.md
index 889dea1..4e2baa5 100644
--- a/docs/content.zh/docs/deployment/repls/scala_shell.md
+++ b/docs/content.zh/docs/deployment/repls/scala_shell.md
@@ -104,7 +104,7 @@ The example below is a wordcount program using Table API:
 {{< tabs "a5a84572-8c20-46a1-b0bc-7c3347a9ff43" >}}
 {{< tab "stream" >}}
 ```scala
-Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val stenv = StreamTableEnvironment.create(senv)
 Scala-Flink> val textSource = stenv.fromDataStream(
   senv.fromElements(
     "To be, or not to be,--that is the question:--",
@@ -112,6 +112,7 @@ Scala-Flink> val textSource = stenv.fromDataStream(
     "The slings and arrows of outrageous fortune",
     "Or to take arms against a sea of troubles,"),
   'text)
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
 Scala-Flink> class $Split extends TableFunction[String] {
     def eval(s: String): Unit = {
       s.toLowerCase.split("\\W+").foreach(collect)
@@ -120,20 +121,21 @@ Scala-Flink> class $Split extends TableFunction[String] {
 Scala-Flink> val split = new $Split
 Scala-Flink> textSource.join(split('text) as 'word).
     groupBy('word).select('word, 'word.count as 'count).
-    toRetractStream[(String, Long)].print
+    toChangelogStream().print
 Scala-Flink> senv.execute("Table Wordcount")
 ```
 {{< /tab >}}
 {{< tab "batch" >}}
 ```scala
-Scala-Flink> import org.apache.flink.table.functions.TableFunction
-Scala-Flink> val textSource = btenv.fromDataSet(
-  benv.fromElements(
+Scala-Flink> val btenv = StreamTableEnvironment.create(senv, 
EnvironmentSettings.inBatchMode())
+Scala-Flink> val textSource = btenv.fromDataStream(
+  senv.fromElements(
     "To be, or not to be,--that is the question:--",
     "Whether 'tis nobler in the mind to suffer",
     "The slings and arrows of outrageous fortune",
     "Or to take arms against a sea of troubles,"), 
   'text)
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
 Scala-Flink> class $Split extends TableFunction[String] {
     def eval(s: String): Unit = {
       s.toLowerCase.split("\\W+").foreach(collect)
@@ -142,7 +144,7 @@ Scala-Flink> class $Split extends TableFunction[String] {
 Scala-Flink> val split = new $Split
 Scala-Flink> textSource.join(split('text) as 'word).
     groupBy('word).select('word, 'word.count as 'count).
-    toDataSet[(String, Long)].print
+    toDataStream().print
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -155,7 +157,7 @@ The following example is a wordcount program written in SQL:
 {{< tabs "3b210000-4585-497a-8636-c7583d10ff42" >}}
 {{< tab "stream" >}}
 ```scala
-Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val stenv = StreamTableEnvironment.create(senv)
 Scala-Flink> val textSource = stenv.fromDataStream(
   senv.fromElements(
     "To be, or not to be,--that is the question:--",
@@ -164,44 +166,46 @@ Scala-Flink> val textSource = stenv.fromDataStream(
     "Or to take arms against a sea of troubles,"), 
   'text)
 Scala-Flink> stenv.createTemporaryView("text_source", textSource)
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
 Scala-Flink> class $Split extends TableFunction[String] {
     def eval(s: String): Unit = {
       s.toLowerCase.split("\\W+").foreach(collect)
     }
   }
-Scala-Flink> stenv.registerFunction("split", new $Split)
+Scala-Flink> stenv.createTemporarySystemFunction("split", new $Split)
 Scala-Flink> val result = stenv.sqlQuery("""SELECT T.word, count(T.word) AS 
`count` 
     FROM text_source 
     JOIN LATERAL table(split(text)) AS T(word) 
     ON TRUE 
     GROUP BY T.word""")
-Scala-Flink> result.toRetractStream[(String, Long)].print
+Scala-Flink> result.toChangelogStream().print
 Scala-Flink> senv.execute("SQL Wordcount")
 ```
 {{< /tab >}}
 {{< tab "batch" >}}
 ```scala
-Scala-Flink> import org.apache.flink.table.functions.TableFunction
-Scala-Flink> val textSource = btenv.fromDataSet(
-  benv.fromElements(
+Scala-Flink> val btenv = StreamTableEnvironment.create(senv, 
EnvironmentSettings.inBatchMode())
+Scala-Flink> val textSource = btenv.fromDataStream(
+  senv.fromElements(
     "To be, or not to be,--that is the question:--",
     "Whether 'tis nobler in the mind to suffer",
     "The slings and arrows of outrageous fortune",
     "Or to take arms against a sea of troubles,"), 
   'text)
 Scala-Flink> btenv.createTemporaryView("text_source", textSource)
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
 Scala-Flink> class $Split extends TableFunction[String] {
     def eval(s: String): Unit = {
       s.toLowerCase.split("\\W+").foreach(collect)
     }
   }
-Scala-Flink> btenv.registerFunction("split", new $Split)
+Scala-Flink> btenv.createTemporarySystemFunction("split", new $Split)
 Scala-Flink> val result = btenv.sqlQuery("""SELECT T.word, count(T.word) AS 
`count` 
     FROM text_source 
     JOIN LATERAL table(split(text)) AS T(word) 
     ON TRUE 
     GROUP BY T.word""")
-Scala-Flink> result.toDataSet[(String, Long)].print
+Scala-Flink> result.toDataStream().print
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/docs/content/docs/deployment/repls/scala_shell.md 
b/docs/content/docs/deployment/repls/scala_shell.md
index 72211c4..353f501 100644
--- a/docs/content/docs/deployment/repls/scala_shell.md
+++ b/docs/content/docs/deployment/repls/scala_shell.md
@@ -104,7 +104,7 @@ The example below is a wordcount program using Table API:
 {{< tabs "a5a84572-8c20-46a1-b0bc-7c3347a9ff43" >}}
 {{< tab "stream" >}}
 ```scala
-Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val stenv = StreamTableEnvironment.create(senv)
 Scala-Flink> val textSource = stenv.fromDataStream(
   senv.fromElements(
     "To be, or not to be,--that is the question:--",
@@ -112,6 +112,7 @@ Scala-Flink> val textSource = stenv.fromDataStream(
     "The slings and arrows of outrageous fortune",
     "Or to take arms against a sea of troubles,"),
   'text)
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
 Scala-Flink> class $Split extends TableFunction[String] {
     def eval(s: String): Unit = {
       s.toLowerCase.split("\\W+").foreach(collect)
@@ -120,20 +121,21 @@ Scala-Flink> class $Split extends TableFunction[String] {
 Scala-Flink> val split = new $Split
 Scala-Flink> textSource.join(split('text) as 'word).
     groupBy('word).select('word, 'word.count as 'count).
-    toRetractStream[(String, Long)].print
+    toChangelogStream().print
 Scala-Flink> senv.execute("Table Wordcount")
 ```
 {{< /tab >}}
 {{< tab "batch" >}}
 ```scala
-Scala-Flink> import org.apache.flink.table.functions.TableFunction
-Scala-Flink> val textSource = btenv.fromDataSet(
-  benv.fromElements(
+Scala-Flink> val btenv = StreamTableEnvironment.create(senv, 
EnvironmentSettings.inBatchMode())
+Scala-Flink> val textSource = btenv.fromDataStream(
+  senv.fromElements(
     "To be, or not to be,--that is the question:--",
     "Whether 'tis nobler in the mind to suffer",
     "The slings and arrows of outrageous fortune",
     "Or to take arms against a sea of troubles,"), 
   'text)
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
 Scala-Flink> class $Split extends TableFunction[String] {
     def eval(s: String): Unit = {
       s.toLowerCase.split("\\W+").foreach(collect)
@@ -142,7 +144,7 @@ Scala-Flink> class $Split extends TableFunction[String] {
 Scala-Flink> val split = new $Split
 Scala-Flink> textSource.join(split('text) as 'word).
     groupBy('word).select('word, 'word.count as 'count).
-    toDataSet[(String, Long)].print
+    toDataStream().print
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -155,7 +157,7 @@ The following example is a wordcount program written in SQL:
 {{< tabs "3b210000-4585-497a-8636-c7583d10ff42" >}}
 {{< tab "stream" >}}
 ```scala
-Scala-Flink> import org.apache.flink.table.functions.TableFunction
+Scala-Flink> val stenv = StreamTableEnvironment.create(senv)
 Scala-Flink> val textSource = stenv.fromDataStream(
   senv.fromElements(
     "To be, or not to be,--that is the question:--",
@@ -164,44 +166,46 @@ Scala-Flink> val textSource = stenv.fromDataStream(
     "Or to take arms against a sea of troubles,"), 
   'text)
 Scala-Flink> stenv.createTemporaryView("text_source", textSource)
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
 Scala-Flink> class $Split extends TableFunction[String] {
     def eval(s: String): Unit = {
       s.toLowerCase.split("\\W+").foreach(collect)
     }
   }
-Scala-Flink> stenv.registerFunction("split", new $Split)
+Scala-Flink> stenv.createTemporarySystemFunction("split", new $Split)
 Scala-Flink> val result = stenv.sqlQuery("""SELECT T.word, count(T.word) AS 
`count` 
     FROM text_source 
     JOIN LATERAL table(split(text)) AS T(word) 
     ON TRUE 
     GROUP BY T.word""")
-Scala-Flink> result.toRetractStream[(String, Long)].print
+Scala-Flink> result.toChangelogStream().print
 Scala-Flink> senv.execute("SQL Wordcount")
 ```
 {{< /tab >}}
 {{< tab "batch" >}}
 ```scala
-Scala-Flink> import org.apache.flink.table.functions.TableFunction
-Scala-Flink> val textSource = btenv.fromDataSet(
-  benv.fromElements(
+Scala-Flink> val btenv = StreamTableEnvironment.create(senv, 
EnvironmentSettings.inBatchMode())
+Scala-Flink> val textSource = btenv.fromDataStream(
+  senv.fromElements(
     "To be, or not to be,--that is the question:--",
     "Whether 'tis nobler in the mind to suffer",
     "The slings and arrows of outrageous fortune",
     "Or to take arms against a sea of troubles,"), 
   'text)
 Scala-Flink> btenv.createTemporaryView("text_source", textSource)
+Scala-Flink> import org.apache.flink.table.functions.TableFunction
 Scala-Flink> class $Split extends TableFunction[String] {
     def eval(s: String): Unit = {
       s.toLowerCase.split("\\W+").foreach(collect)
     }
   }
-Scala-Flink> btenv.registerFunction("split", new $Split)
+Scala-Flink> btenv.createTemporarySystemFunction("split", new $Split)
 Scala-Flink> val result = btenv.sqlQuery("""SELECT T.word, count(T.word) AS 
`count` 
     FROM text_source 
     JOIN LATERAL table(split(text)) AS T(word) 
     ON TRUE 
     GROUP BY T.word""")
-Scala-Flink> result.toDataSet[(String, Long)].print
+Scala-Flink> result.toDataStream().print
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
index 2991a2d..7e4bd4c 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
@@ -826,7 +826,12 @@ public interface StreamTableEnvironment extends 
TableEnvironment {
      *     of the {@code Table}.
      * @param <T> The type of the {@link DataStream}.
      * @return The converted {@link Table}.
+     * @deprecated Use {@link #fromDataStream(DataStream, Schema)} instead. In 
most cases, {@link
+     *     #fromDataStream(DataStream)} should already be sufficient. It 
integrates with the new
+     *     type system and supports all kinds of {@link DataTypes} that the 
table runtime can
+     *     consume. The semantics might be slightly different for raw and 
structured types.
      */
+    @Deprecated
     <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);
 
     /**
@@ -1010,7 +1015,13 @@ public interface StreamTableEnvironment extends 
TableEnvironment {
      * @param fields The fields expressions to map original fields of the 
DataStream to the fields
      *     of the View.
      * @param <T> The type of the {@link DataStream}.
+     * @deprecated Use {@link #createTemporaryView(String, DataStream, 
Schema)} instead. In most
+     *     cases, {@link #createTemporaryView(String, DataStream)} should 
already be sufficient. It
+     *     integrates with the new type system and supports all kinds of 
{@link DataTypes} that the
+     *     table runtime can consume. The semantics might be slightly 
different for raw and
+     *     structured types.
      */
+    @Deprecated
     <T> void createTemporaryView(String path, DataStream<T> dataStream, 
Expression... fields);
 
     /**
@@ -1031,7 +1042,13 @@ public interface StreamTableEnvironment extends 
TableEnvironment {
      * @param clazz The class of the type of the resulting {@link DataStream}.
      * @param <T> The type of the resulting {@link DataStream}.
      * @return The converted {@link DataStream}.
+     * @deprecated Use {@link #toDataStream(Table, Class)} instead. It 
integrates with the new type
+     *     system and supports all kinds of {@link DataTypes} that the table 
runtime can produce.
+     *     The semantics might be slightly different for raw and structured 
types. Use {@code
+     *     toDataStream(DataTypes.of(TypeInformation.of(Class)))} if {@link 
TypeInformation} should
+     *     be used as source of truth.
      */
+    @Deprecated
     <T> DataStream<T> toAppendStream(Table table, Class<T> clazz);
 
     /**
@@ -1053,7 +1070,13 @@ public interface StreamTableEnvironment extends 
TableEnvironment {
      *     DataStream}.
      * @param <T> The type of the resulting {@link DataStream}.
      * @return The converted {@link DataStream}.
+     * @deprecated Use {@link #toDataStream(Table, Class)} instead. It 
integrates with the new type
+     *     system and supports all kinds of {@link DataTypes} that the table 
runtime can produce.
+     *     The semantics might be slightly different for raw and structured 
types. Use {@code
+     *     toDataStream(DataTypes.of(TypeInformation.of(Class)))} if {@link 
TypeInformation} should
+     *     be used as source of truth.
      */
+    @Deprecated
     <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);
 
     /**
@@ -1076,7 +1099,11 @@ public interface StreamTableEnvironment extends 
TableEnvironment {
      * @param clazz The class of the requested record type.
      * @param <T> The type of the requested record type.
      * @return The converted {@link DataStream}.
+     * @deprecated Use {@link #toChangelogStream(Table, Schema)} instead. It 
integrates with the new
+     *     type system and supports all kinds of {@link DataTypes} and every 
{@link ChangelogMode}
+     *     that the table runtime can produce.
      */
+    @Deprecated
     <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> 
clazz);
 
     /**
@@ -1099,7 +1126,11 @@ public interface StreamTableEnvironment extends 
TableEnvironment {
      * @param typeInfo The {@link TypeInformation} of the requested record 
type.
      * @param <T> The type of the requested record type.
      * @return The converted {@link DataStream}.
+     * @deprecated Use {@link #toChangelogStream(Table, Schema)} instead. It 
integrates with the new
+     *     type system and supports all kinds of {@link DataTypes} and every 
{@link ChangelogMode}
+     *     that the table runtime can produce.
      */
+    @Deprecated
     <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, 
TypeInformation<T> typeInfo);
 
     /**
diff --git 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
index 5123d66..8ccad40 100644
--- 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
+++ 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
@@ -619,7 +619,7 @@ trait StreamTableEnvironment extends TableEnvironment {
     *   )
     * }}}
     *
-    * <p>2. Reference input fields by position:
+    * 2. Reference input fields by position:
     * In this mode, fields are simply renamed. Event-time attributes can
     * replace the field on their position in the input data (if it is of 
correct type) or be
     * appended at the end. Proctime attributes must be appended at the end. 
This mode can only be
@@ -644,7 +644,13 @@ trait StreamTableEnvironment extends TableEnvironment {
     *               the [[Table]].
     * @tparam T The type of the [[DataStream]].
     * @return The converted [[Table]].
+    * @deprecated Use [[fromDataStream(DataStream, Schema)]] instead. In most 
cases,
+    *             [[fromDataStream(DataStream)]] should already be sufficient. 
It integrates with
+    *             the new type system and supports all kinds of [[DataTypes]] 
that the table runtime
+    *             can consume. The semantics might be slightly different for 
raw and structured
+    *             types.
     */
+  @deprecated
   def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table
 
   /**
@@ -790,7 +796,13 @@ trait StreamTableEnvironment extends TableEnvironment {
     * @param fields The fields expressions to map original fields of the 
DataStream to the fields of
     *               the View.
     * @tparam T The type of the [[DataStream]].
+    * @deprecated Use [[createTemporaryView(String, DataStream, Schema)]] 
instead. In most cases,
+    *             [[createTemporaryView(String, DataStream)]] should already 
be sufficient. It
+    *             integrates with the new type system and supports all kinds 
of [[DataTypes]] that
+    *             the table runtime can consume. The semantics might be 
slightly different for raw
+    *             and structured types.
     */
+  @deprecated
   def createTemporaryView[T](path: String, dataStream: DataStream[T], fields: 
Expression*): Unit
 
   /**
@@ -807,7 +819,13 @@ trait StreamTableEnvironment extends TableEnvironment {
     * @param table The [[Table]] to convert.
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
+    * @deprecated Use [[toDataStream(Table, Class)]] instead. It integrates 
with the new type
+    *             system and supports all kinds of [[DataTypes]] that the 
table runtime can produce.
+    *             The semantics might be slightly different for raw and 
structured types. Use
+    *             `toDataStream(DataTypes.of(Types.of[Class]))` if 
[[TypeInformation]]
+    *             should be used as source of truth.
     */
+  @deprecated
   def toAppendStream[T: TypeInformation](table: Table): DataStream[T]
 
   /**
@@ -820,7 +838,11 @@ trait StreamTableEnvironment extends TableEnvironment {
     * @param table The [[Table]] to convert.
     * @tparam T The type of the requested data type.
     * @return The converted [[DataStream]].
+    * @deprecated Use [[toChangelogStream(Table, Schema)]] instead. It 
integrates with the new
+    *             type system and supports all kinds of [[DataTypes]] and 
every [[ChangelogMode]]
+    *             that the table runtime can produce.
     */
+  @deprecated
   def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, 
T)]
 
   /**

Reply via email to