This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 853af01325e [python][docs] Update the documentation of introduction of table api 853af01325e is described below commit 853af01325e9b430598fad13debd9bd0c2aa2af2 Author: Dian Fu <dia...@apache.org> AuthorDate: Thu Apr 28 20:08:20 2022 +0800 [python][docs] Update the documentation of introduction of table api --- .../docs/dev/python/table/intro_to_table_api.md | 65 +++++++++++++++----- .../docs/dev/python/table/table_environment.md | 26 ++++---- docs/content.zh/docs/dev/table/data_stream_api.md | 2 + .../docs/dev/python/table/intro_to_table_api.md | 69 ++++++++++++++++------ .../docs/dev/python/table/table_environment.md | 26 ++++---- 5 files changed, 133 insertions(+), 55 deletions(-) diff --git a/docs/content.zh/docs/dev/python/table/intro_to_table_api.md b/docs/content.zh/docs/dev/python/table/intro_to_table_api.md index e9fc93aca1b..dd08f67402b 100644 --- a/docs/content.zh/docs/dev/python/table/intro_to_table_api.md +++ b/docs/content.zh/docs/dev/python/table/intro_to_table_api.md @@ -102,20 +102,23 @@ table_env = TableEnvironment.create(env_settings) `TableEnvironment` 可以用来: -* 创建 `Table` -* 将 `Table` 注册成临时表 -* 执行 SQL 查询,更多细节可查阅 [SQL]({{< ref "docs/dev/table/sql/overview" >}}) -* 注册用户自定义的 (标量,表值,或者聚合) 函数, 更多细节可查阅 [普通的用户自定义函数]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}) 和 [向量化的用户自定义函数]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}) -* 配置作业,更多细节可查阅 [Python 配置]({{< ref "docs/dev/python/python_config" >}}) -* 管理 Python 依赖,更多细节可查阅 [依赖管理]({{< ref "docs/dev/python/dependency_management" >}}) -* 提交作业执行 +* `Table` 管理:[创建表](#create-tables)、列举表、[Table 和 DataStream 互转]({{< ref "docs/dev/table/data_stream_api" >}}#converting-between-datastream-and-table)等。 +* 自定义函数管理:自定义函数的注册、删除、列举等。 关于 Python 自定义函数的更多细节,请参考[普通自定义函数]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}) 和[向量化自定义函数]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}})章节的介绍。 +* 执行 [SQL]({{< ref 
"docs/dev/table/sql/overview" >}}) 语句:更多细节可查阅[SQL 查询](#write-sql-queries)章节的介绍。 +* 作业配置管理:更多细节可查阅[Python 配置]({{< ref "docs/dev/python/python_config" >}})章节的介绍。 +* Python 依赖管理:更多细节可查阅[依赖管理]({{< ref "docs/dev/python/dependency_management" >}})章节的介绍。 +* 作业提交:更多细节可查阅[作业提交](#emit-results)章节的介绍。 {{< top >}} +<a name="create-tables"></a> + 创建表 --------------- -`Table` 是 Python Table API 的核心组件。`Table` 是 Table API 作业中间结果的逻辑表示。 +`Table` 是 Python Table API 的核心组件。`Table` 对象由一系列数据转换操作构成,但是它不包含数据本身。 +相反,它描述了如何从数据源中读取数据,以及如何将最终结果写出到外部存储等。表可以被打印、优化并最终在集群中执行。 +表也可以是有限流或无限流,以支持流式处理和批处理场景。 一个 `Table` 实例总是与一个特定的 `TableEnvironment` 相绑定。不支持在同一个查询中合并来自不同 TableEnvironments 的表,例如 join 或者 union 它们。 @@ -163,7 +166,7 @@ table.execute().print() +----------------------+--------------------------------+ ``` -默认情况下,表结构是从数据中自动提取的。 如果自动生成的表模式不符合你的要求,你也可以手动指定: +默认情况下,表结构是从数据中自动提取的。 如果自动生成的表模式不符合你的预期,你也可以手动指定: ```python table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) @@ -187,7 +190,7 @@ Now the type of the "id" column is TINYINT. 
### 通过 DDL 创建 -你可以通过 DDL 创建一张表: +你可以通过 DDL 语句创建表,它代表一张从指定的外部存储读取数据的表: ```python from pyflink.table import EnvironmentSettings, TableEnvironment @@ -387,6 +390,8 @@ orders.map(map_function).execute().print() +--------------------------------+----------------------+ ``` +<a name="write-sql-queries"></a> + ### SQL 查询 Flink 的 SQL 基于 [Apache Calcite](https://calcite.apache.org),它实现了标准的 SQL。SQL 查询语句使用字符串来表达。 @@ -541,9 +546,39 @@ table.execute().print() {{< top >}} +<a name="emit-results"></a> + 将结果写出 ---------------- +### 打印结果 + +你可以通过 `TableResult.print` 方法,将表的结果打印到标准输出中。该方法通常用于预览表的中间结果。 + +```python +# prepare source tables +source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"]) + +# Get TableResult +table_result = table_env.execute_sql("select a + 1, b, c from %s" % source) + +# Print the table +table_result.print() +``` + +结果为: + +```text ++----+----------------------+--------------------------------+--------------------------------+ +| op | EXPR$0 | b | c | ++----+----------------------+--------------------------------+--------------------------------+ +| +I | 2 | Hi | Hello | +| +I | 3 | Hello | Hello | ++----+----------------------+--------------------------------+--------------------------------+ +``` + +<span class="label label-info">Note</span> 该方式会触发表的物化,同时将表的内容收集到客户端内存中,所以通过 {{< pythondoc file="pyflink.table.html#pyflink.table.Table.limit" name="Table.limit">}} 来限制收集数据的条数是一种很好的做法。 + ### 将结果数据收集到客户端 你可以使用 `TableResult.collect` 将 Table 的结果收集到客户端,结果的类型为迭代器类型。 @@ -555,10 +590,10 @@ table.execute().print() source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"]) # 得到 TableResult -res = table_env.execute_sql("select a + 1, b, c from %s" % source) +table_result = table_env.execute_sql("select a + 1, b, c from %s" % source) # 遍历结果 -with res.collect() as results: +with table_result.collect() as results: for result in results: print(result) ``` @@ -570,6 +605,8 @@ with res.collect() as 
results: <Row(3, 'Hello', 'Hello')> ``` +<span class="label label-info">Note</span> 该方式会触发表的物化,同时将表的内容收集到客户端内存中,所以通过 {{< pythondoc file="pyflink.table.html#pyflink.table.Table.limit" name="Table.limit">}} 来限制收集数据的条数是一种很好的做法。 + ### 将结果数据转换为Pandas DataFrame,并收集到客户端 你可以调用 "to_pandas" 方法来 [将一个 `Table` 对象转化成 pandas DataFrame]({{< ref "docs/dev/python/table/conversion_of_pandas" >}}#convert-pyflink-table-to-pandas-dataframe): @@ -587,9 +624,9 @@ print(table.to_pandas()) 1 2 Hello ``` -<span class="label label-info">Note</span> "to_pandas" 会触发表的物化,同时将表的内容收集到客户端内存中,所以通过 {{< pythondoc file="pyflink.table.html#pyflink.table.Table.limit" name="Table.limit">}} 来限制收集数据的条数是一种很好的做法。 +<span class="label label-info">Note</span> 该方式会触发表的物化,同时将表的内容收集到客户端内存中,所以通过 {{< pythondoc file="pyflink.table.html#pyflink.table.Table.limit" name="Table.limit">}} 来限制收集数据的条数是一种很好的做法。 -<span class="label label-info">Note</span> flink planner 不支持 "to_pandas",并且,并不是所有的数据类型都可以转换为 pandas DataFrames。 +<span class="label label-info">Note</span> 并不是所有的数据类型都可以转换为 pandas DataFrames。 ### 将结果写入到一张 Sink 表中 diff --git a/docs/content.zh/docs/dev/python/table/table_environment.md b/docs/content.zh/docs/dev/python/table/table_environment.md index 641540b20c1..d16e757a219 100644 --- a/docs/content.zh/docs/dev/python/table/table_environment.md +++ b/docs/content.zh/docs/dev/python/table/table_environment.md @@ -110,17 +110,6 @@ TableEnvironment API {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.from_path" name="链接">}} </td> </tr> - <tr> - <td> - <strong>sql_query(query)</strong> - </td> - <td> - 执行一条 SQL 查询,并将查询的结果作为一个 `Table` 对象。 - </td> - <td class="text-center"> - {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.sql_query" name="链接">}} - </td> - </tr> <tr> <td> <strong>create_temporary_view(view_path, table)</strong> @@ -170,6 +159,17 @@ TableEnvironment API {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.execute_sql" name="链接">}} </td> </tr> + <tr> 
+ <td> + <strong>sql_query(query)</strong> + </td> + <td> + 执行一条 SQL 查询,并将查询的结果作为一个 `Table` 对象。 + </td> + <td class="text-center"> + {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.sql_query" name="链接">}} + </td> + </tr> </tbody> </table> @@ -570,7 +570,9 @@ TableEnvironment API <a href="{{< ref "docs/dev/python/python_config" >}}">Python 配置</a> 中找到所有可用的配置选项。 <br> <br> 下面的代码示例展示了如何通过这个 API 来设置配置选项:<br> # set the parallelism to 8 <br> -table_env.get_config().set("parallelism.default", "8") +table_env.get_config().set("parallelism.default", "8")<br> +# set the job name <br> +table_env.get_config().set("pipeline.name", "my_first_job") </td> <td class="text-center"> {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.get_config" name="链接">}} diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md index 9e3078505fc..396911e3048 100644 --- a/docs/content.zh/docs/dev/table/data_stream_api.md +++ b/docs/content.zh/docs/dev/table/data_stream_api.md @@ -62,6 +62,8 @@ mentioned here for completeness. {{< top >}} +<a name="converting-between-datastream-and-table"></a> + Converting between DataStream and Table --------------------------------------- diff --git a/docs/content/docs/dev/python/table/intro_to_table_api.md b/docs/content/docs/dev/python/table/intro_to_table_api.md index 13bb85736c0..ce540a68610 100644 --- a/docs/content/docs/dev/python/table/intro_to_table_api.md +++ b/docs/content/docs/dev/python/table/intro_to_table_api.md @@ -101,22 +101,25 @@ table_env = TableEnvironment.create(env_settings) For more details about the different ways to create a `TableEnvironment`, please refer to the [TableEnvironment Documentation]({{< ref "docs/dev/python/table/table_environment" >}}#create-a-tableenvironment). 
-The `TableEnvironment` is responsible for: +`TableEnvironment` is responsible for: -* Creating `Table`s -* Registering `Table`s as a temporary view -* Executing SQL queries, see [SQL]({{< ref "docs/dev/table/sql/overview" >}}) for more details -* Registering user-defined (scalar, table, or aggregation) functions, see [General User-defined Functions]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}) and [Vectorized User-defined Functions]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}) for more details -* Configuring the job, see [Python Configuration]({{< ref "docs/dev/python/python_config" >}}) for more details -* Managing Python dependencies, see [Dependency Management]({{< ref "docs/dev/python/dependency_management" >}}) for more details -* Submitting the jobs for execution +* `Table` management: [Creating Tables](#create-tables), listing Tables, [Conversion between Table and DataStream]({{< ref "docs/dev/table/data_stream_api" >}}#converting-between-datastream-and-table), etc. +* User-defined function management: User-defined function registration, dropping, listing, etc. See [General User-defined Functions]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}) and [Vectorized User-defined Functions]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}) for more details about Python user-defined functions. +* Executing [SQL]({{< ref "docs/dev/table/sql/overview" >}}) queries: See [Write SQL Queries](#write-sql-queries) for more details. +* Job configuration: See [Python Configuration]({{< ref "docs/dev/python/python_config" >}}) for more details. +* Python dependency management: See [Dependency Management]({{< ref "docs/dev/python/dependency_management" >}}) for more details. +* Job submission: See [Emit Results](#emit-results) for more details. {{< top >}} Create Tables --------------- -`Table` is a core component of the Python Table API. A `Table` is a logical representation of the intermediate result of a Table API Job. 
+`Table` is a core component of the Python Table API. A `Table` object describes a pipeline of data transformations. It does not +contain the data itself in any way. Instead, it describes how to read data from a table source, +and how to eventually write data to a table sink. The declared pipeline can be +printed, optimized, and eventually executed in a cluster. The pipeline can work with bounded or +unbounded streams which enables both streaming and batch scenarios. A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine tables from different TableEnvironments in same query, e.g., to join or union them. @@ -146,7 +149,7 @@ The results are as following: +----------------------+--------------------------------+ ``` -You can also create the Table with specified column names: +You can also create a Table with specified column names: ```python table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) @@ -164,7 +167,7 @@ The results are as following: +----------------------+--------------------------------+ ``` -By default, the table schema is extracted from the data automatically. If the automatically generated table schema isn't satisfactory, you can also specify it manually: +By default, the table schema is extracted from the data automatically. If the automatically generated table schema isn't as expected, you can also specify it manually: ```python table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) @@ -188,7 +191,7 @@ Now the type of the "id" column is TINYINT. ### Create using DDL statements -You can also create a Table using SQL DDL statements. +You can also create a Table using SQL DDL statements. It represents a Table which reads data from the specified external storage. 
```python from pyflink.table import EnvironmentSettings, TableEnvironment @@ -545,6 +548,35 @@ The results are as following: Emit Results ---------------- +### Print the Table + +You can call the `TableResult.print` method to print the content of the Table to console. +This is usually used when you want to preview the table. + +```python +# prepare source tables +source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"]) + +# Get TableResult +table_result = table_env.execute_sql("select a + 1, b, c from %s" % source) + +# Print the table +table_result.print() +``` + +The results are as following: + +```text ++----+----------------------+--------------------------------+--------------------------------+ +| op | EXPR$0 | b | c | ++----+----------------------+--------------------------------+--------------------------------+ +| +I | 2 | Hi | Hello | +| +I | 3 | Hello | Hello | ++----+----------------------+--------------------------------+--------------------------------+ +``` + +<span class="label label-info">Note</span> It will trigger the materialization of the table and collect table content to the memory of the client, it's a good practice to limit the number of rows collected via {{< pythondoc file="pyflink.table.html#pyflink.table.Table.limit" name="Table.limit">}}. + ### Collect Results to Client You can call the `TableResult.collect` method to collect results of a table to client. 
@@ -557,10 +589,10 @@ The following code shows how to use the `TableResult.collect()` method: source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"]) # Get TableResult -res = table_env.execute_sql("select a + 1, b, c from %s" % source) +table_result = table_env.execute_sql("select a + 1, b, c from %s" % source) # Traversal result -with res.collect() as results: +with table_result.collect() as results: for result in results: print(result) ``` @@ -572,6 +604,8 @@ The results are as following: <Row(3, 'Hello', 'Hello')> ``` +<span class="label label-info">Note</span> It will trigger the materialization of the table and collect table content to the memory of the client, it's a good practice to limit the number of rows collected via {{< pythondoc file="pyflink.table.html#pyflink.table.Table.limit" name="Table.limit">}}. + ### Collect Results to Client by converting it to pandas DataFrame You can call the "to_pandas" method to [convert a `Table` object to a pandas DataFrame]({{< ref "docs/dev/python/table/conversion_of_pandas" >}}#convert-pyflink-table-to-pandas-dataframe): @@ -589,9 +623,9 @@ The results are as following: 1 2 Hello ``` -<span class="label label-info">Note</span> "to_pandas" will trigger the materialization of the table and collect table content to the memory of the client, it's a good practice to limit the number of rows collected via {{< pythondoc file="pyflink.table.html#pyflink.table.Table.limit" name="Table.limit">}}. +<span class="label label-info">Note</span> It will trigger the materialization of the table and collect table content to the memory of the client, it's a good practice to limit the number of rows collected via {{< pythondoc file="pyflink.table.html#pyflink.table.Table.limit" name="Table.limit">}}. -<span class="label label-info">Note</span> "to_pandas" is not supported by the flink planner, and not all data types can be emitted to pandas DataFrames. 
+<span class="label label-info">Note</span> Not all the data types are supported. ### Emit Results to One Sink Table @@ -676,7 +710,8 @@ Explain Tables ----------------- The Table API provides a mechanism to explain the logical and optimized query plans used to compute a `Table`. -This is done through the `Table.explain()` or `StatementSet.explain()` methods. `Table.explain()`returns the plan of a `Table`. `StatementSet.explain()` returns the plan for multiple sinks. These methods return a string describing three things: +This is done through the `Table.explain()` or `StatementSet.explain()` methods. `Table.explain()` returns the plan of a `Table`. +`StatementSet.explain()` is used to get the plan for a job which contains multiple sinks. These methods return a string describing three things: 1. the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan, 2. the optimized logical query plan, and diff --git a/docs/content/docs/dev/python/table/table_environment.md b/docs/content/docs/dev/python/table/table_environment.md index 5f2cfc61c9b..ac5ad71f66f 100644 --- a/docs/content/docs/dev/python/table/table_environment.md +++ b/docs/content/docs/dev/python/table/table_environment.md @@ -110,17 +110,6 @@ These APIs are used to create/remove Table API/SQL Tables and write queries: {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.from_path" name="link">}} </td> </tr> - <tr> - <td> - <strong>sql_query(query)</strong> - </td> - <td> - Evaluates a SQL query and retrieves the result as a `Table` object. 
- </td> - <td class="text-center"> - {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.sql_query" name="link">}} - </td> - </tr> <tr> <td> <strong>create_temporary_view(view_path, table)</strong> @@ -170,6 +159,17 @@ These APIs are used to create/remove Table API/SQL Tables and write queries: {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.execute_sql" name="link">}} </td> </tr> + <tr> + <td> + <strong>sql_query(query)</strong> + </td> + <td> + Evaluates a SQL query and retrieves the result as a `Table` object. + </td> + <td class="text-center"> + {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.sql_query" name="link">}} + </td> + </tr> </tbody> </table> @@ -574,7 +574,9 @@ Please refer to the [Dependency Management]({{< ref "docs/dev/python/dependency_ <a href="{{< ref "docs/dev/python/python_config" >}}">Python Configuration</a>. <br> <br> The following code is an example showing how to set the configuration options through this API:<br> # set the parallelism to 8<br> -table_env.get_config().set("parallelism.default", "8") +table_env.get_config().set("parallelism.default", "8")<br> +# set the job name <br> +table_env.get_config().set("pipeline.name", "my_first_job") </td> <td class="text-center"> {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.get_config" name="link">}}