This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8864976c479520a20b32a2ff61d67a5c4dde4f7
Author: Dian Fu <dia...@apache.org>
AuthorDate: Fri Apr 23 13:04:25 2021 +0800

    [FLINK-20720][docs][python] Add documentation about output types for Python DataStream API
    
    This closes #15733.
---
 .../docs/dev/datastream/operators/overview.md      |   4 +-
 .../dev/python/datastream/operators/overview.md    | 119 +++++++++++++++++----
 .../docs/dev/datastream/operators/overview.md      |   2 +-
 .../dev/python/datastream/operators/overview.md    | 119 +++++++++++++++++----
 4 files changed, 197 insertions(+), 47 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/operators/overview.md b/docs/content.zh/docs/dev/datastream/operators/overview.md
index 9c4ac57..c26ac98 100644
--- a/docs/content.zh/docs/dev/datastream/operators/overview.md
+++ b/docs/content.zh/docs/dev/datastream/operators/overview.md
@@ -3,7 +3,7 @@ title: 概览
 weight: 1
 type: docs
 aliases:
-  - /dev/stream/operators/
+  - /zh/dev/stream/operators/
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -182,7 +182,7 @@ keyedStream.reduce { _ + _ }
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.ROW([Types.INT(), Types.STRING()]))
+data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
 data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))
 ```
 {{< /tab >}}
diff --git a/docs/content.zh/docs/dev/python/datastream/operators/overview.md b/docs/content.zh/docs/dev/python/datastream/operators/overview.md
index 8e1ac66..ed61d77 100644
--- a/docs/content.zh/docs/dev/python/datastream/operators/overview.md
+++ b/docs/content.zh/docs/dev/python/datastream/operators/overview.md
@@ -26,28 +26,24 @@ under the License.
 
 # Operators
 
-
 Operators transform one or more DataStreams into a new DataStream. Programs 
can combine multiple transformations into 
 sophisticated dataflow topologies.
 
-
-
-# DataStream Transformations
+## DataStream Transformations
 
 DataStream programs in Flink are regular programs that implement 
transformations on data streams (e.g., mapping, 
 filtering, reducing). Please see [operators]({{< ref 
"docs/dev/datastream/operators/overview" >}})
-for an overview of the available stream transformations in Python DataStream API.
+for an overview of the available transformations in the Python DataStream API.
 
-# Functions
-Most transformations require a user-defined function as input to define the functionality of the transformation. The 
-following describes different ways of defining user-defined functions.
+## Functions
+Transformations accept user-defined functions as input to define the functionality of the transformations.
+The following section describes different ways of defining Python user-defined functions in the Python DataStream API.
 
-## Implementing Function Interfaces
+### Implementing Function Interfaces
 Different Function interfaces are provided for different transformations in 
the Python DataStream API. For example, 
 `MapFunction` is provided for the `map` transformation, `FilterFunction` is 
provided for the `filter` transformation, etc.
 Users can implement the corresponding Function interface according to the type 
of the transformation. Take MapFunction for
-instance: 
-
+instance:
 
 ```python
 # Implementing MapFunction
@@ -60,24 +56,19 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], 
type_info=Types.INT())
 mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
 ```
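
For comparison, here is a minimal sketch (not part of this commit; the class and variable names are illustrative) of the same pattern applied to the `filter` transformation:

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import FilterFunction

# Implementing FilterFunction: keep only the even numbers
class MyFilterFunction(FilterFunction):
    def filter(self, value):
        return value % 2 == 0

env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
filtered_stream = data_stream.filter(MyFilterFunction())
```

Since `filter` does not change the element type, no output type needs to be declared here.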
 
-
-<span class="label label-info">Note</span> In Python DataStream API, users can 
specify the output type information of the transformation explicityly. If not 
-specified, the output type will be `Types.PICKLED_BYTE_ARRAY` so that data 
will be in a form of byte array generated by 
-the pickle seriallizer. For more details about the `Pickle Serialization`, 
please refer to [DataTypes]({{< ref "docs/dev/python/datastream/data_types" 
>}}#pickle-serialization).
-
-## Lambda Function
-As shown in the following example, all the transformations can also accept a 
lambda function to define the functionality of the transformation:
+### Lambda Function
+As shown in the following example, the transformations can also accept a 
lambda function to define the functionality of the transformation:
 
 ```python
 data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
 ```
 
-<span class="label label-info">Note</span> Operations ConnectedStream.map() 
and ConnectedStream.flat_map() do not support
-lambda function and must accept `CoMapFunction` and `CoFlatMapFunction` 
seperately.
+<span class="label label-info">Note</span> `ConnectedStream.map()` and `ConnectedStream.flat_map()` do not support
+lambda functions and must be given a `CoMapFunction` and a `CoFlatMapFunction` respectively.
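
A minimal sketch of that case (not part of this commit; stream and class names are illustrative, and it assumes the connected stream's `map()` accepts a `CoMapFunction` plus an optional `output_type`):

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import CoMapFunction

# A CoMapFunction provides one method per input of the connected streams
class MyCoMapFunction(CoMapFunction):
    def map1(self, value):
        return value + 1

    def map2(self, value):
        return value * 2

env = StreamExecutionEnvironment.get_execution_environment()
ds_1 = env.from_collection([1, 2, 3], type_info=Types.INT())
ds_2 = env.from_collection([4, 5, 6], type_info=Types.INT())
mapped_stream = ds_1.connect(ds_2).map(MyCoMapFunction(), output_type=Types.INT())
```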
 
-## Python Function
-Users can also use Python function:
+### Python Function
+Users can also use a Python function to define the functionality of the transformation:
 
 ```python
 def my_map_func(value):
@@ -87,3 +78,87 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], 
type_info=Types.INT())
 mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
 ```
 
+## Output Type
+
+Users can specify the output type information of the transformation explicitly in the Python DataStream API. If not
+specified, the output type will be `Types.PICKLED_BYTE_ARRAY` by default, and the result data will be serialized using the pickle serializer.
+For more details about the pickle serializer, please refer to [Pickle Serialization]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
+
+Generally, the output type needs to be specified in the following scenarios.
+
+### Convert DataStream into Table
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+
+def data_stream_api_demo():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    t_env.execute_sql("""
+            CREATE TABLE my_source (
+              a INT,
+              b VARCHAR
+            ) WITH (
+              'connector' = 'datagen',
+              'number-of-rows' = '10'
+            )
+        """)
+
+    ds = t_env.to_append_stream(
+        t_env.from_path('my_source'),
+        Types.ROW([Types.INT(), Types.STRING()]))
+
+    def split(s):
+        splits = s[1].split("|")
+        for sp in splits:
+            yield s[0], sp
+
+    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
+           .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
+           .key_by(lambda i: i[1]) \
+           .reduce(lambda i, j: (i[0] + j[0], i[1]))
+
+    t_env.execute_sql("""
+            CREATE TABLE my_sink (
+              a INT,
+              b VARCHAR
+            ) WITH (
+              'connector' = 'print'
+            )
+        """)
+
+    table = t_env.from_data_stream(ds)
+    table_result = table.execute_insert("my_sink")
+
+    # 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出
+    # 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法
+    table_result.wait()
+
+
+if __name__ == '__main__':
+    data_stream_api_demo()
+```
+
+The output type must be specified for the `flat_map` operation in the above example, and it will also be used
+implicitly as the output type of the `reduce` operation. The reason is that
+`t_env.from_data_stream(ds)` requires the output type of `ds` to be a composite type.
+
+### Write DataStream to Sink
+
+```python
+from pyflink.common.typeinfo import Types
+
+def split(s):
+    splits = s[1].split("|")
+    for sp in splits:
+        yield s[0], sp
+
+ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
+  .sink_to(...)
+```
+
+Generally, the output type needs to be specified for the `map` operation in the above example if the sink only accepts data of specific types, e.g. `Row`.
diff --git a/docs/content/docs/dev/datastream/operators/overview.md b/docs/content/docs/dev/datastream/operators/overview.md
index c41bd3b..93e32b8 100644
--- a/docs/content/docs/dev/datastream/operators/overview.md
+++ b/docs/content/docs/dev/datastream/operators/overview.md
@@ -184,7 +184,7 @@ keyedStream.reduce { _ + _ }
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.ROW([Types.INT(), Types.STRING()]))
+data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
 data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))
 ```
 {{< /tab >}}
diff --git a/docs/content/docs/dev/python/datastream/operators/overview.md b/docs/content/docs/dev/python/datastream/operators/overview.md
index 30cd642..b072f80 100644
--- a/docs/content/docs/dev/python/datastream/operators/overview.md
+++ b/docs/content/docs/dev/python/datastream/operators/overview.md
@@ -26,28 +26,24 @@ under the License.
 
 # Operators
 
-
 Operators transform one or more DataStreams into a new DataStream. Programs 
can combine multiple transformations into 
 sophisticated dataflow topologies.
 
-
-
-# DataStream Transformations
+## DataStream Transformations
 
 DataStream programs in Flink are regular programs that implement 
transformations on data streams (e.g., mapping, 
 filtering, reducing). Please see [operators]({{< ref 
"docs/dev/datastream/operators/overview" >}})
-for an overview of the available stream transformations in Python DataStream API.
+for an overview of the available transformations in the Python DataStream API.
 
-# Functions
-Most transformations require a user-defined function as input to define the functionality of the transformation. The 
-following describes different ways of defining user-defined functions.
+## Functions
+Transformations accept user-defined functions as input to define the functionality of the transformations.
+The following section describes different ways of defining Python user-defined functions in the Python DataStream API.
 
-## Implementing Function Interfaces
+### Implementing Function Interfaces
 Different Function interfaces are provided for different transformations in 
the Python DataStream API. For example, 
 `MapFunction` is provided for the `map` transformation, `FilterFunction` is 
provided for the `filter` transformation, etc.
 Users can implement the corresponding Function interface according to the type 
of the transformation. Take MapFunction for
-instance: 
-
+instance:
 
 ```python
 # Implementing MapFunction
@@ -60,24 +56,19 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], 
type_info=Types.INT())
 mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
 ```
 
-
-<span class="label label-info">Note</span> In Python DataStream API, users can 
specify the output type information of the transformation explicityly. If not 
-specified, the output type will be `Types.PICKLED_BYTE_ARRAY` so that data 
will be in a form of byte array generated by 
-the pickle seriallizer. For more details about the `Pickle Serialization`, 
please refer to [DataTypes]({{< ref "docs/dev/python/datastream/data_types" 
>}}#pickle-serialization).
-
-## Lambda Function
-As shown in the following example, all the transformations can also accept a 
lambda function to define the functionality of the transformation:
+### Lambda Function
+As shown in the following example, the transformations can also accept a 
lambda function to define the functionality of the transformation:
 
 ```python
 data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
 ```
 
-<span class="label label-info">Note</span> Operations ConnectedStream.map() 
and ConnectedStream.flat_map() do not support
-lambda function and must accept `CoMapFunction` and `CoFlatMapFunction` 
seperately.
+<span class="label label-info">Note</span> `ConnectedStream.map()` and `ConnectedStream.flat_map()` do not support
+lambda functions and must be given a `CoMapFunction` and a `CoFlatMapFunction` respectively.
 
-## Python Function
-Users can also use Python function:
+### Python Function
+Users can also use a Python function to define the functionality of the transformation:
 
 ```python
 def my_map_func(value):
@@ -87,3 +78,87 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], 
type_info=Types.INT())
 mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
 ```
 
+## Output Type
+
+Users can specify the output type information of the transformation explicitly in the Python DataStream API. If not
+specified, the output type will be `Types.PICKLED_BYTE_ARRAY` by default, and the result data will be serialized using the pickle serializer.
+For more details about the pickle serializer, please refer to [Pickle Serialization]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
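
To make the default behaviour concrete, here is a small sketch (not part of this commit; variable names are illustrative): the first `map` omits `output_type` and its records are therefore treated as pickled byte arrays downstream, while the second declares the output type explicitly.

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())

# No output_type: downstream operators see Types.PICKLED_BYTE_ARRAY records
pickled_stream = data_stream.map(lambda x: x + 1)

# Explicit output_type: downstream operators see INT records
typed_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
```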
+
+Generally, the output type needs to be specified in the following scenarios.
+
+### Convert DataStream into Table
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+
+def data_stream_api_demo():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    t_env.execute_sql("""
+            CREATE TABLE my_source (
+              a INT,
+              b VARCHAR
+            ) WITH (
+              'connector' = 'datagen',
+              'number-of-rows' = '10'
+            )
+        """)
+
+    ds = t_env.to_append_stream(
+        t_env.from_path('my_source'),
+        Types.ROW([Types.INT(), Types.STRING()]))
+
+    def split(s):
+        splits = s[1].split("|")
+        for sp in splits:
+            yield s[0], sp
+
+    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
+           .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
+           .key_by(lambda i: i[1]) \
+           .reduce(lambda i, j: (i[0] + j[0], i[1]))
+
+    t_env.execute_sql("""
+            CREATE TABLE my_sink (
+              a INT,
+              b VARCHAR
+            ) WITH (
+              'connector' = 'print'
+            )
+        """)
+
+    table = t_env.from_data_stream(ds)
+    table_result = table.execute_insert("my_sink")
+
+    # 1) wait for the job to finish; this is only needed for local execution, otherwise the script may exit while the job is still running
+    # 2) this line should be removed when submitting the job in detached mode to a remote cluster such as YARN, standalone, K8s etc.
+    table_result.wait()
+
+
+if __name__ == '__main__':
+    data_stream_api_demo()
+```
+
+The output type must be specified for the `flat_map` operation in the above example, and it will also be used
+implicitly as the output type of the `reduce` operation. The reason is that
+`t_env.from_data_stream(ds)` requires the output type of `ds` to be a composite type.
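
As a side note (a sketch, not part of this commit; names are illustrative), `Types.ROW(...)` is another composite type that satisfies this requirement; the variant below yields `Row` objects from a hypothetical `split_to_rows` generator instead of tuples:

```python
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    [(1, 'a|b'), (2, 'c')],
    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))

def split_to_rows(s):
    # Yield Row objects so that the declared ROW output type matches the produced records
    for sp in s[1].split("|"):
        yield Row(s[0], sp)

ds = ds.flat_map(split_to_rows, Types.ROW([Types.INT(), Types.STRING()]))
```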
+
+### Write DataStream to Sink
+
+```python
+from pyflink.common.typeinfo import Types
+
+def split(s):
+    splits = s[1].split("|")
+    for sp in splits:
+        yield s[0], sp
+
+ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
+  .sink_to(...)
+```
+
+Generally, the output type needs to be specified for the `map` operation in the above example if the sink only accepts data of specific types, e.g. `Row`.
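
For instance, a minimal sketch of that scenario (not part of this commit; `print()` stands in for a real sink and the names are illustrative):

```python
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    [(1, 'a'), (2, 'b')],
    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))

# Declare a ROW output type and produce Row records so that a sink which only
# accepts Row data can consume the stream; print() is used here as a stand-in sink.
ds.map(lambda i: Row(i[0] + 1, i[1]),
       output_type=Types.ROW([Types.INT(), Types.STRING()])) \
  .print()

env.execute("output_type_sink_sketch")
```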
