This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a8864976c479520a20b32a2ff61d67a5c4dde4f7
Author: Dian Fu <dia...@apache.org>
AuthorDate: Fri Apr 23 13:04:25 2021 +0800

    [FLINK-20720][docs][python] Add documentation about output types for Python DataStream API

    This closes #15733.
---
 .../docs/dev/datastream/operators/overview.md      |   4 +-
 .../dev/python/datastream/operators/overview.md    | 119 +++++++++++++++++----
 .../docs/dev/datastream/operators/overview.md      |   2 +-
 .../dev/python/datastream/operators/overview.md    | 119 +++++++++++++++++----
 4 files changed, 197 insertions(+), 47 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/operators/overview.md b/docs/content.zh/docs/dev/datastream/operators/overview.md
index 9c4ac57..c26ac98 100644
--- a/docs/content.zh/docs/dev/datastream/operators/overview.md
+++ b/docs/content.zh/docs/dev/datastream/operators/overview.md
@@ -3,7 +3,7 @@ title: 概览
 weight: 1
 type: docs
 aliases:
-  - /dev/stream/operators/
+  - /zh/dev/stream/operators/
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -182,7 +182,7 @@ keyedStream.reduce { _ + _ }
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.ROW([Types.INT(), Types.STRING()]))
+data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
 data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))
 ```
 {{< /tab >}}
diff --git a/docs/content.zh/docs/dev/python/datastream/operators/overview.md b/docs/content.zh/docs/dev/python/datastream/operators/overview.md
index 8e1ac66..ed61d77 100644
--- a/docs/content.zh/docs/dev/python/datastream/operators/overview.md
+++ b/docs/content.zh/docs/dev/python/datastream/operators/overview.md
@@ -26,28 +26,24 @@ under the License.
 
 # Operators
 
 Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into
 sophisticated dataflow topologies.
 
-
-# DataStream Transformations
+## DataStream Transformations
 
 DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., mapping,
 filtering, reducing). Please see [operators]({{< ref "docs/dev/datastream/operators/overview" >}})
-for an overview of the available stream transformations in Python DataStream API.
+for an overview of the available transformations in the Python DataStream API.
 
-# Functions
-Most transformations require a user-defined function as input to define the functionality of the transformation. The
-following describes different ways of defining user-defined functions.
+## Functions
+Transformations accept user-defined functions as input to define their functionality.
+The following section describes different ways of defining Python user-defined functions in the Python DataStream API.
 
-## Implementing Function Interfaces
+### Implementing Function Interfaces
 Different Function interfaces are provided for different transformations in the Python DataStream API. For example,
 `MapFunction` is provided for the `map` transformation, `FilterFunction` is provided for the `filter` transformation,
 etc. Users can implement the corresponding Function interface according to the type of the transformation. Take MapFunction for
-instance:
-
+instance:
 
 ```python
 # Implementing MapFunction
@@ -60,24 +56,19 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
 ```
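+
+A self-contained version of the above example could look as follows (a minimal sketch; the `map` logic that simply increments each element is an assumption for illustration):
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.functions import MapFunction
+
+
+class MyMapFunction(MapFunction):
+
+    def map(self, value):
+        # transform each element of the stream; here we simply increment it
+        return value + 1
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
+mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
+```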
 
-
-<span class="label label-info">Note</span> In Python DataStream API, users can specify the output type information of the transformation explicityly. If not
-specified, the output type will be `Types.PICKLED_BYTE_ARRAY` so that data will be in a form of byte array generated by
-the pickle seriallizer. For more details about the `Pickle Serialization`, please refer to [DataTypes]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
-
-## Lambda Function
-As shown in the following example, all the transformations can also accept a lambda function to define the functionality of the transformation:
+### Lambda Function
+As shown in the following example, the transformations can also accept a lambda function to define the functionality of the transformation:
 
 ```python
 data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
 ```
 
-<span class="label label-info">Note</span> Operations ConnectedStream.map() and ConnectedStream.flat_map() do not support
-lambda function and must accept `CoMapFunction` and `CoFlatMapFunction` seperately.
+<span class="label label-info">Note</span> `ConnectedStream.map()` and `ConnectedStream.flat_map()` do not support
+lambda functions and must accept a `CoMapFunction` and a `CoFlatMapFunction` respectively.
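+
+For example, a connected stream could be handled as follows (a minimal sketch; the input collections and the `map1`/`map2` logic are assumptions for illustration):
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.functions import CoMapFunction
+
+
+class MyCoMapFunction(CoMapFunction):
+
+    def map1(self, value):
+        # invoked for each element of the first input stream
+        return value + 1
+
+    def map2(self, value):
+        # invoked for each element of the second input stream
+        return value - 1
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+stream_1 = env.from_collection([1, 2, 3], type_info=Types.INT())
+stream_2 = env.from_collection([4, 5, 6], type_info=Types.INT())
+mapped_stream = stream_1.connect(stream_2).map(MyCoMapFunction(), output_type=Types.INT())
+```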
 
-## Python Function
-Users can also use Python function:
+### Python Function
+Users can also use a Python function to define the functionality of the transformation:
 
 ```python
 def my_map_func(value):
@@ -87,3 +78,87 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
 ```
 
+## Output Type
+
+Users can specify the output type information of the transformation explicitly in the Python DataStream API. If it is not
+specified, the output type will be `Types.PICKLED_BYTE_ARRAY` by default, and the result data will be serialized using the pickle serializer.
+For more details about the pickle serializer, please refer to [Pickle Serialization]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
+
+Generally, the output type needs to be specified in the following scenarios.
+
+### Convert DataStream into Table
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+
+def data_stream_api_demo():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    t_env.execute_sql("""
+        CREATE TABLE my_source (
+          a INT,
+          b VARCHAR
+        ) WITH (
+          'connector' = 'datagen',
+          'number-of-rows' = '10'
+        )
+    """)
+
+    ds = t_env.to_append_stream(
+        t_env.from_path('my_source'),
+        Types.ROW([Types.INT(), Types.STRING()]))
+
+    def split(s):
+        splits = s[1].split("|")
+        for sp in splits:
+            yield s[0], sp
+
+    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
+           .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
+           .key_by(lambda i: i[1]) \
+           .reduce(lambda i, j: (i[0] + j[0], i[1]))
+
+    t_env.execute_sql("""
+        CREATE TABLE my_sink (
+          a INT,
+          b VARCHAR
+        ) WITH (
+          'connector' = 'print'
+        )
+    """)
+
+    table = t_env.from_data_stream(ds)
+    table_result = table.execute_insert("my_sink")
+
+    # 1) wait for the job to finish; this is only needed for local execution, otherwise the script
+    #    may exit before the job finishes and the mini cluster would be shut down prematurely
+    # 2) this call should be removed when the job is submitted in detached mode to a remote
+    #    cluster, e.g. YARN/Standalone/K8s
+    table_result.wait()
+
+
+if __name__ == '__main__':
+    data_stream_api_demo()
+```
+
+The output type must be specified for the flat_map operation in the above example; it is also used implicitly as
+the output type of the reduce operation. The reason is that
+`t_env.from_data_stream(ds)` requires the output type of `ds` to be a composite type.
+
+### Write DataStream to Sink
+
+```python
+from pyflink.common.typeinfo import Types
+
+ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
+  .sink_to(...)
+```
+
+Generally, the output type needs to be specified for the map operation in the above example if the sink only accepts data of a specific type, e.g. Row.
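+
+For instance, if the sink only accepts `Row` records, the map operation could declare a `Row` output type as follows (a sketch; the field types are assumptions and the concrete sink is elided):
+
+```python
+from pyflink.common import Row
+from pyflink.common.typeinfo import Types
+
+ds.map(lambda i: Row(i[0] + 1, i[1]),
+       output_type=Types.ROW([Types.INT(), Types.STRING()])) \
+  .sink_to(...)
+```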
diff --git a/docs/content/docs/dev/datastream/operators/overview.md b/docs/content/docs/dev/datastream/operators/overview.md
index c41bd3b..93e32b8 100644
--- a/docs/content/docs/dev/datastream/operators/overview.md
+++ b/docs/content/docs/dev/datastream/operators/overview.md
@@ -184,7 +184,7 @@ keyedStream.reduce { _ + _ }
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.ROW([Types.INT(), Types.STRING()]))
+data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
 data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))
 ```
 {{< /tab >}}
diff --git a/docs/content/docs/dev/python/datastream/operators/overview.md b/docs/content/docs/dev/python/datastream/operators/overview.md
index 30cd642..b072f80 100644
--- a/docs/content/docs/dev/python/datastream/operators/overview.md
+++ b/docs/content/docs/dev/python/datastream/operators/overview.md
@@ -26,28 +26,24 @@ under the License.
 
 # Operators
 
 Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into
 sophisticated dataflow topologies.
 
-
-# DataStream Transformations
+## DataStream Transformations
 
 DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., mapping,
 filtering, reducing).
 Please see [operators]({{< ref "docs/dev/datastream/operators/overview" >}})
-for an overview of the available stream transformations in Python DataStream API.
+for an overview of the available transformations in the Python DataStream API.
 
-# Functions
-Most transformations require a user-defined function as input to define the functionality of the transformation. The
-following describes different ways of defining user-defined functions.
+## Functions
+Transformations accept user-defined functions as input to define their functionality.
+The following section describes different ways of defining Python user-defined functions in the Python DataStream API.
 
-## Implementing Function Interfaces
+### Implementing Function Interfaces
 Different Function interfaces are provided for different transformations in the Python DataStream API. For example,
 `MapFunction` is provided for the `map` transformation, `FilterFunction` is provided for the `filter` transformation,
 etc. Users can implement the corresponding Function interface according to the type of the transformation. Take MapFunction for
-instance:
-
+instance:
 
 ```python
 # Implementing MapFunction
@@ -60,24 +56,19 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
 ```
 
-
-<span class="label label-info">Note</span> In Python DataStream API, users can specify the output type information of the transformation explicityly. If not
-specified, the output type will be `Types.PICKLED_BYTE_ARRAY` so that data will be in a form of byte array generated by
-the pickle seriallizer. For more details about the `Pickle Serialization`, please refer to [DataTypes]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
-
-## Lambda Function
-As shown in the following example, all the transformations can also accept a lambda function to define the functionality of the transformation:
+### Lambda Function
+As shown in the following example, the transformations can also accept a lambda function to define the functionality of the transformation:
 
 ```python
 data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
 ```
 
-<span class="label label-info">Note</span> Operations ConnectedStream.map() and ConnectedStream.flat_map() do not support
-lambda function and must accept `CoMapFunction` and `CoFlatMapFunction` seperately.
+<span class="label label-info">Note</span> `ConnectedStream.map()` and `ConnectedStream.flat_map()` do not support
+lambda functions and must accept a `CoMapFunction` and a `CoFlatMapFunction` respectively.
 
-## Python Function
-Users can also use Python function:
+### Python Function
+Users can also use a Python function to define the functionality of the transformation:
 
 ```python
 def my_map_func(value):
@@ -87,3 +78,87 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
 ```
 
+## Output Type
+
+Users can specify the output type information of the transformation explicitly in the Python DataStream API. If it is not
+specified, the output type will be `Types.PICKLED_BYTE_ARRAY` by default, and the result data will be serialized using the pickle serializer.
+For more details about the pickle serializer, please refer to [Pickle Serialization]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
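+
+For example, a transformation without an explicit output type falls back to the pickle serializer, while an explicit output type selects the dedicated serializer (a minimal sketch for illustration):
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+
+env = StreamExecutionEnvironment.get_execution_environment()
+data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
+
+# no output_type given: the output type defaults to Types.PICKLED_BYTE_ARRAY
+# and the results are serialized with the pickle serializer
+pickled_stream = data_stream.map(lambda x: x + 1)
+
+# an explicit output type avoids the pickle fallback
+typed_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
+```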
+
+Generally, the output type needs to be specified in the following scenarios.
+
+### Convert DataStream into Table
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+
+def data_stream_api_demo():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    t_env.execute_sql("""
+        CREATE TABLE my_source (
+          a INT,
+          b VARCHAR
+        ) WITH (
+          'connector' = 'datagen',
+          'number-of-rows' = '10'
+        )
+    """)
+
+    ds = t_env.to_append_stream(
+        t_env.from_path('my_source'),
+        Types.ROW([Types.INT(), Types.STRING()]))
+
+    def split(s):
+        splits = s[1].split("|")
+        for sp in splits:
+            yield s[0], sp
+
+    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
+           .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
+           .key_by(lambda i: i[1]) \
+           .reduce(lambda i, j: (i[0] + j[0], i[1]))
+
+    t_env.execute_sql("""
+        CREATE TABLE my_sink (
+          a INT,
+          b VARCHAR
+        ) WITH (
+          'connector' = 'print'
+        )
+    """)
+
+    table = t_env.from_data_stream(ds)
+    table_result = table.execute_insert("my_sink")
+
+    # 1) wait for the job to finish; this is only needed for local execution, otherwise the script
+    #    may exit while the job is still running
+    # 2) this call should be removed when the job is submitted in detached mode to a remote
+    #    cluster, e.g. YARN, standalone, K8s
+    table_result.wait()
+
+
+if __name__ == '__main__':
+    data_stream_api_demo()
+```
+
+The output type must be specified for the flat_map operation in the above example; it is also used implicitly as
+the output type of the reduce operation. The reason is that
+`t_env.from_data_stream(ds)` requires the output type of `ds` to be a composite type.
+
+### Write DataStream to Sink
+
+```python
+from pyflink.common.typeinfo import Types
+
+ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
+  .sink_to(...)
+```
+
+Generally, the output type needs to be specified for the map operation in the above example if the sink only accepts data of a specific type, e.g. Row.
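+
+For instance, if the sink only accepts `Row` records, the map operation could declare a named `Row` output type as follows (a sketch; the field names and types are assumptions and the concrete sink is elided):
+
+```python
+from pyflink.common import Row
+from pyflink.common.typeinfo import Types
+
+ds.map(lambda i: Row(a=i[0] + 1, b=i[1]),
+       output_type=Types.ROW_NAMED(['a', 'b'], [Types.INT(), Types.STRING()])) \
+  .sink_to(...)
+```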