This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new b6d53fb [FLINK-18912][python][docs] Add Python api tutorial under Python GettingStart (#13192) b6d53fb is described below commit b6d53fb1717023faafb79948a305b8dc1e3de2c4 Author: Hequn Cheng <hequn....@alibaba-inc.com> AuthorDate: Thu Aug 20 09:52:33 2020 +0800 [FLINK-18912][python][docs] Add Python api tutorial under Python GettingStart (#13192) --- docs/dev/python/getting-started/tutorial/index.md | 24 ++++ .../python/getting-started/tutorial/index.zh.md | 24 ++++ .../tutorial/table_api_tutorial.md} | 9 +- .../tutorial/table_api_tutorial.zh.md} | 9 +- docs/try-flink/python_table_api.md | 156 +------------------- docs/try-flink/python_table_api.zh.md | 158 +-------------------- 6 files changed, 60 insertions(+), 320 deletions(-) diff --git a/docs/dev/python/getting-started/tutorial/index.md b/docs/dev/python/getting-started/tutorial/index.md new file mode 100644 index 0000000..b862506 --- /dev/null +++ b/docs/dev/python/getting-started/tutorial/index.md @@ -0,0 +1,24 @@ +--- +title: "Tutorial" +nav-id: python_tutorial +nav-parent_id: python_start +nav-pos: 20 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> diff --git a/docs/dev/python/getting-started/tutorial/index.zh.md b/docs/dev/python/getting-started/tutorial/index.zh.md new file mode 100644 index 0000000..e81b7a2 --- /dev/null +++ b/docs/dev/python/getting-started/tutorial/index.zh.md @@ -0,0 +1,24 @@ +--- +title: "教程" +nav-id: python_tutorial +nav-parent_id: python_start +nav-pos: 20 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> diff --git a/docs/try-flink/python_table_api.md b/docs/dev/python/getting-started/tutorial/table_api_tutorial.md similarity index 97% copy from docs/try-flink/python_table_api.md copy to docs/dev/python/getting-started/tutorial/table_api_tutorial.md index 9c8bd9c..401020c 100644 --- a/docs/try-flink/python_table_api.md +++ b/docs/dev/python/getting-started/tutorial/table_api_tutorial.md @@ -1,8 +1,7 @@ --- -title: "Python API Tutorial" -nav-title: Python API -nav-parent_id: try-flink -nav-pos: 4 +title: "Table API Tutorial" +nav-parent_id: python_tutorial +nav-pos: 20 --- <!-- Licensed to the Apache Software Foundation (ASF) under one @@ -180,4 +179,4 @@ pyflink 1 This should get you started with writing your own Flink Python Table API programs. 
To learn more about the Python Table API, you can refer to -[Flink Python Table API Docs]({{ site.pythondocs_baseurl }}/api/python) for more details. +[Flink Python API Docs]({{ site.pythondocs_baseurl }}/api/python) for more details. diff --git a/docs/try-flink/python_table_api.zh.md b/docs/dev/python/getting-started/tutorial/table_api_tutorial.zh.md similarity index 97% copy from docs/try-flink/python_table_api.zh.md copy to docs/dev/python/getting-started/tutorial/table_api_tutorial.zh.md index 3fbea18..f94ab72 100644 --- a/docs/try-flink/python_table_api.zh.md +++ b/docs/dev/python/getting-started/tutorial/table_api_tutorial.zh.md @@ -1,8 +1,7 @@ --- -title: "Python API 教程" -nav-title: Python API -nav-parent_id: try-flink -nav-pos: 4 +title: "Table API 教程" +nav-parent_id: python_tutorial +nav-pos: 20 --- <!-- Licensed to the Apache Software Foundation (ASF) under one @@ -183,4 +182,4 @@ pyflink 1 {% endhighlight %} 上述教程介绍了如何编写并运行一个Flink Python Table API程序,如果想了解Flink Python Table API -的更多信息,可以参考[Flink Python Table API文档]({{ site.pythondocs_baseurl }}/api/python)。 +的更多信息,可以参考[Flink Python API文档]({{ site.pythondocs_baseurl }}/api/python)。 diff --git a/docs/try-flink/python_table_api.md b/docs/try-flink/python_table_api.md index 9c8bd9c..2583c9f 100644 --- a/docs/try-flink/python_table_api.md +++ b/docs/try-flink/python_table_api.md @@ -23,161 +23,9 @@ specific language governing permissions and limitations under the License. --> -This walkthrough will quickly get you started building a pure Python Flink project. - -Please refer to the Python Table API [installation guide]({% link dev/python/getting-started/installation.md %}) on how to set up the Python execution environment. - * This will be replaced by the TOC {:toc} -## Setting up a Python Project - -You can begin by creating a Python project and installing the PyFlink package following the [installation guide]({% link dev/python/getting-started/installation.md %}#installation-of-pyflink).
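After installing the package, a quick sanity check confirms that the interpreter can actually import it. This is an illustrative sketch only (the `pyflink_status` helper is hypothetical and not part of PyFlink); it merely reports whether the `pyflink` package is importable:

```python
# Check whether PyFlink is importable from the current interpreter.
# Note: the PyPI distribution is named `apache-flink`, while the
# importable package it provides is named `pyflink`.
import importlib.util


def pyflink_status():
    """Return a short status line describing the local PyFlink install."""
    if importlib.util.find_spec("pyflink") is None:
        return "pyflink not found: run `python -m pip install apache-flink`"
    return "pyflink is importable"


print(pyflink_status())
```

Running this before the tutorial saves a confusing `ModuleNotFoundError` later, since a mismatched interpreter (for example, a virtualenv other than the one `pip` installed into) is a common setup mistake.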
- -## Writing a Flink Python Table API Program - -Table API applications begin by declaring a table environment; either a `BatchTableEnvironment` for batch applications or `StreamTableEnvironment` for streaming applications. -This serves as the main entry point for interacting with the Flink runtime. -It can be used for setting execution parameters such as restart strategy, default parallelism, etc. -The table config allows setting Table API specific configurations. - -{% highlight python %} -exec_env = ExecutionEnvironment.get_execution_environment() -exec_env.set_parallelism(1) -t_config = TableConfig() -t_env = BatchTableEnvironment.create(exec_env, t_config) -{% endhighlight %} - -With the table environment created, you can declare source and sink tables. - -{% highlight python %} -t_env.connect(FileSystem().path('/tmp/input')) \ - .with_format(OldCsv() - .field('word', DataTypes.STRING())) \ - .with_schema(Schema() - .field('word', DataTypes.STRING())) \ - .create_temporary_table('mySource') - -t_env.connect(FileSystem().path('/tmp/output')) \ - .with_format(OldCsv() - .field_delimiter('\t') - .field('word', DataTypes.STRING()) - .field('count', DataTypes.BIGINT())) \ - .with_schema(Schema() - .field('word', DataTypes.STRING()) - .field('count', DataTypes.BIGINT())) \ - .create_temporary_table('mySink') -{% endhighlight %} -You can also use the TableEnvironment.sql_update() method to register a source/sink table defined in DDL: -{% highlight python %} -my_source_ddl = """ - create table mySource ( - word VARCHAR - ) with ( - 'connector.type' = 'filesystem', - 'format.type' = 'csv', - 'connector.path' = '/tmp/input' - ) -""" - -my_sink_ddl = """ - create table mySink ( - word VARCHAR, - `count` BIGINT - ) with ( - 'connector.type' = 'filesystem', - 'format.type' = 'csv', - 'connector.path' = '/tmp/output' - ) -""" - -t_env.sql_update(my_source_ddl) -t_env.sql_update(my_sink_ddl) -{% endhighlight %} -This registers a table named `mySource` and a table named
`mySink` in the execution environment. -The table `mySource` has only one column, word, and it consumes strings read from file `/tmp/input`. -The table `mySink` has two columns, word and count, and writes data to the file `/tmp/output`, with `\t` as the field delimiter. - -You can now create a job which reads input from table `mySource`, performs some transformations, and writes the results to table `mySink`. - -{% highlight python %} -t_env.from_path('mySource') \ - .group_by('word') \ - .select('word, count(1)') \ - .insert_into('mySink') -{% endhighlight %} - -Finally, you must execute the actual Flink Python Table API job. -All operations, such as creating sources, transformations, and sinks, are lazy. -Only when `t_env.execute(job_name)` is called will the job be run. - -{% highlight python %} -t_env.execute("tutorial_job") -{% endhighlight %} - -The complete code so far: - -{% highlight python %} -from pyflink.dataset import ExecutionEnvironment -from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment -from pyflink.table.descriptors import Schema, OldCsv, FileSystem - -exec_env = ExecutionEnvironment.get_execution_environment() -exec_env.set_parallelism(1) -t_config = TableConfig() -t_env = BatchTableEnvironment.create(exec_env, t_config) - -t_env.connect(FileSystem().path('/tmp/input')) \ - .with_format(OldCsv() - .field('word', DataTypes.STRING())) \ - .with_schema(Schema() - .field('word', DataTypes.STRING())) \ - .create_temporary_table('mySource') - -t_env.connect(FileSystem().path('/tmp/output')) \ - .with_format(OldCsv() - .field_delimiter('\t') - .field('word', DataTypes.STRING()) - .field('count', DataTypes.BIGINT())) \ - .with_schema(Schema() - .field('word', DataTypes.STRING()) - .field('count', DataTypes.BIGINT())) \ - .create_temporary_table('mySink') - -t_env.from_path('mySource') \ - .group_by('word') \ - .select('word, count(1)') \ - .insert_into('mySink') - -t_env.execute("tutorial_job") -{% endhighlight %} - -## Executing a
Flink Python Table API Program -First, you need to prepare the input data in the "/tmp/input" file. You can use the following command to prepare it: - -{% highlight bash %} -$ echo -e "flink\npyflink\nflink" > /tmp/input -{% endhighlight %} - -Next, you can run this example on the command line (Note: if the result file "/tmp/output" already exists, you need to remove it before running the example): - -{% highlight bash %} -$ python WordCount.py -{% endhighlight %} - -The command builds and runs the Python Table API program in a local mini cluster. -You can also submit the Python Table API program to a remote cluster; you can refer to -[Job Submission Examples]({{ site.baseurl }}/ops/cli.html#job-submission-examples) -for more details. - -Finally, you can see the execution result on the command line: - -{% highlight bash %} -$ cat /tmp/output -flink 2 -pyflink 1 -{% endhighlight %} +## Python Table API tutorial -This should get you started with writing your own Flink Python Table API programs. -To learn more about the Python Table API, you can refer to -[Flink Python Table API Docs]({{ site.pythondocs_baseurl }}/api/python) for more details. +You can refer to the [Python Table API Tutorial]({% link dev/python/getting-started/tutorial/table_api_tutorial.md %}) documentation for more details. diff --git a/docs/try-flink/python_table_api.zh.md b/docs/try-flink/python_table_api.zh.md index 3fbea18..8952f5f 100644 --- a/docs/try-flink/python_table_api.zh.md +++ b/docs/try-flink/python_table_api.zh.md @@ -26,161 +26,7 @@ under the License.
* This will be replaced by the TOC {:toc} -在该教程中,我们会从零开始,介绍如何创建一个Flink Python项目及运行Python Table API程序。 -关于Python执行环境的要求,请参考Python Table API[环境安装]({% link dev/python/getting-started/installation.zh.md %})。 +## Python Table API 教程 -## 创建一个Python Table API项目 - -首先,使用您最熟悉的IDE创建一个Python项目,然后安装PyFlink包,请参考[PyFlink安装指南]({% link dev/python/getting-started/installation.zh.md %}#pyflink-安装)了解详细信息。 - -## 编写一个Flink Python Table API程序 - -编写Flink Python Table API程序的第一步是创建`BatchTableEnvironment` -(或者`StreamTableEnvironment`,如果你要创建一个流式作业)。这是Python Table API作业的入口类。 - -{% highlight python %} -exec_env = ExecutionEnvironment.get_execution_environment() -exec_env.set_parallelism(1) -t_config = TableConfig() -t_env = BatchTableEnvironment.create(exec_env, t_config) -{% endhighlight %} - -`ExecutionEnvironment` (或者`StreamExecutionEnvironment`,如果你要创建一个流式作业) -可以用来设置执行参数,比如重启策略,缺省并发值等。 - -`TableConfig`可以用来设置缺省的catalog名字,自动生成代码时方法大小的阈值等. - -接下来,我们将介绍如何创建源表和结果表。 - -{% highlight python %} -t_env.connect(FileSystem().path('/tmp/input')) \ - .with_format(OldCsv() - .field('word', DataTypes.STRING())) \ - .with_schema(Schema() - .field('word', DataTypes.STRING())) \ - .create_temporary_table('mySource') - -t_env.connect(FileSystem().path('/tmp/output')) \ - .with_format(OldCsv() - .field_delimiter('\t') - .field('word', DataTypes.STRING()) - .field('count', DataTypes.BIGINT())) \ - .with_schema(Schema() - .field('word', DataTypes.STRING()) - .field('count', DataTypes.BIGINT())) \ - .create_temporary_table('mySink') -{% endhighlight %} - -You can also use the TableEnvironment.sql_update() method to register a source/sink table defined in DDL: -{% highlight python %} -my_source_ddl = """ - create table mySource ( - word VARCHAR - ) with ( - 'connector.type' = 'filesystem', - 'format.type' = 'csv', - 'connector.path' = '/tmp/input' - ) -""" - -my_sink_ddl = """ - create table mySink ( - word VARCHAR, - `count` BIGINT - ) with ( - 'connector.type' = 'filesystem', - 'format.type' = 'csv', - 
'connector.path' = '/tmp/output' - ) -""" - -t_env.sql_update(my_source_ddl) -t_env.sql_update(my_sink_ddl) -{% endhighlight %} - -上面的程序展示了如何创建及在`ExecutionEnvironment`中注册表名分别为`mySource`和`mySink`的表。 -其中,源表`mySource`有一列: word,该表代表了从输入文件`/tmp/input`中读取的单词; -结果表`mySink`有两列: word和count,该表会将计算结果输出到文件`/tmp/output`中,字段之间使用`\t`作为分隔符。 - -接下来,我们介绍如何创建一个作业:该作业读取表`mySource`中的数据,进行一些变换,然后将结果写入表`mySink`。 - -{% highlight python %} -t_env.scan('mySource') \ - .group_by('word') \ - .select('word, count(1)') \ - .insert_into('mySink') -{% endhighlight %} - -最后,需要做的就是启动Flink Python Table API作业。上面所有的操作,比如创建源表 -进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当`t_env.execute(job_name)`被调用的时候, -作业才会被真正提交到集群或者本地进行执行。 - -{% highlight python %} -t_env.execute("python_job") -{% endhighlight %} - -该教程的完整代码如下: - -{% highlight python %} -from pyflink.dataset import ExecutionEnvironment -from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment -from pyflink.table.descriptors import Schema, OldCsv, FileSystem - -exec_env = ExecutionEnvironment.get_execution_environment() -exec_env.set_parallelism(1) -t_config = TableConfig() -t_env = BatchTableEnvironment.create(exec_env, t_config) - -t_env.connect(FileSystem().path('/tmp/input')) \ - .with_format(OldCsv() - .field('word', DataTypes.STRING())) \ - .with_schema(Schema() - .field('word', DataTypes.STRING())) \ - .create_temporary_table('mySource') - -t_env.connect(FileSystem().path('/tmp/output')) \ - .with_format(OldCsv() - .field_delimiter('\t') - .field('word', DataTypes.STRING()) - .field('count', DataTypes.BIGINT())) \ - .with_schema(Schema() - .field('word', DataTypes.STRING()) - .field('count', DataTypes.BIGINT())) \ - .create_temporary_table('mySink') - -t_env.from_path('mySource') \ - .group_by('word') \ - .select('word, count(1)') \ - .insert_into('mySink') - -t_env.execute("python_job") -{% endhighlight %} - -## 执行一个Flink Python Table API程序 - -首先,你需要在文件 “/tmp/input” 中准备好输入数据。你可以选择通过如下命令准备输入数据: - -{% highlight bash %} -$ echo -e 
"flink\npyflink\nflink" > /tmp/input -{% endhighlight %} - -接下来,可以在命令行中运行作业(假设作业名为WordCount.py)(注意:如果输出结果文件“/tmp/output”已经存在,你需要先删除文件,否则程序将无法正确运行起来): - -{% highlight bash %} -$ python WordCount.py -{% endhighlight %} - -上述命令会构建Python Table API程序,并在本地mini cluster中运行。如果想将作业提交到远端集群执行, -可以参考[作业提交示例]({{ site.baseurl }}/zh/ops/cli.html#job-submission-examples)。 - -最后,你可以通过如下命令查看你的运行结果: - -{% highlight bash %} -$ cat /tmp/output -flink 2 -pyflink 1 -{% endhighlight %} - -上述教程介绍了如何编写并运行一个Flink Python Table API程序,如果想了解Flink Python Table API -的更多信息,可以参考[Flink Python Table API文档]({{ site.pythondocs_baseurl }}/api/python)。 +详细信息请参考 [Python Table API 教程]({% link dev/python/getting-started/tutorial/table_api_tutorial.zh.md %})文档
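The word-count result that the tutorial's job writes to `/tmp/output` can be checked without a Flink cluster. The sketch below is an illustrative stand-in for the Table API pipeline (plain Python with `collections.Counter`, not the Flink runtime), computing the same per-word counts the `group_by('word').select('word, count(1)')` step produces:

```python
from collections import Counter


def word_count(lines):
    """Count occurrences of each word, one word per input line,
    mirroring the tutorial's group_by('word') / count(1) aggregation."""
    return dict(Counter(line.strip() for line in lines if line.strip()))


# Same input the tutorial writes to /tmp/input:
print(word_count(["flink", "pyflink", "flink"]))  # {'flink': 2, 'pyflink': 1}
```

This matches the `flink 2` / `pyflink 1` output shown in the tutorial, which makes it a handy cross-check when experimenting with different inputs.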