This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4b37c374262 [python][docs] Update documentation to remove deprecated API examples 4b37c374262 is described below commit 4b37c37426282ecbc29bb699e8e3671f013647e7 Author: Dian Fu <dia...@apache.org> AuthorDate: Thu Apr 28 11:08:49 2022 +0800 [python][docs] Update documentation to remove deprecated API examples --- docs/content.zh/docs/dev/python/debugging.md | 6 +- .../docs/dev/python/table/conversion_of_pandas.md | 6 +- .../docs/dev/python/table/intro_to_table_api.md | 17 +-- docs/content.zh/docs/dev/python/table/metrics.md | 19 ++- .../table/operations/row_based_operations.md | 11 +- .../python/table/python_table_api_connectors.md | 4 + .../docs/dev/python/table/udfs/python_udfs.md | 36 +++--- .../python/table/udfs/vectorized_python_udfs.md | 22 +++- docs/content.zh/docs/dev/table/common.md | 8 +- docs/content.zh/docs/dev/table/tableApi.md | 137 +++++++++++---------- docs/content/docs/dev/python/debugging.md | 6 +- .../docs/dev/python/table/conversion_of_pandas.md | 6 +- .../docs/dev/python/table/intro_to_table_api.md | 20 +-- docs/content/docs/dev/python/table/metrics.md | 22 ++-- .../table/operations/row_based_operations.md | 13 +- .../python/table/python_table_api_connectors.md | 4 + .../docs/dev/python/table/udfs/python_udfs.md | 36 +++--- .../python/table/udfs/vectorized_python_udfs.md | 22 +++- docs/content/docs/dev/table/common.md | 8 +- docs/content/docs/dev/table/data_stream_api.md | 2 +- docs/content/docs/dev/table/tableApi.md | 131 ++++++++++---------- .../table/mixing_use_of_datastream_and_table.py | 3 +- .../pyflink/examples/table/pandas/pandas_udaf.py | 4 +- .../pyflink/examples/table/process_json_data.py | 3 +- .../examples/table/process_json_data_with_udf.py | 3 +- .../examples/table/windowing/over_window.py | 2 +- .../examples/table/windowing/session_window.py | 4 +- .../examples/table/windowing/sliding_window.py | 4 +- .../examples/table/windowing/tumble_window.py | 4 +- 
flink-python/pyflink/table/expression.py | 2 +- flink-python/pyflink/table/schema.py | 6 +- flink-python/pyflink/table/table.py | 121 +++++++++--------- flink-python/pyflink/table/table_config.py | 3 +- flink-python/pyflink/table/table_environment.py | 2 +- flink-python/pyflink/table/window.py | 26 ++-- 35 files changed, 401 insertions(+), 322 deletions(-) diff --git a/docs/content.zh/docs/dev/python/debugging.md b/docs/content.zh/docs/dev/python/debugging.md index 0cefc596b59..8dd058897ad 100644 --- a/docs/content.zh/docs/dev/python/debugging.md +++ b/docs/content.zh/docs/dev/python/debugging.md @@ -58,10 +58,14 @@ print(table.get_schema()) 在作业运行的过程中,日志信息会打印在 `TaskManager` 的日志文件中。 ```python +from pyflink.table import DataTypes +from pyflink.table.udf import udf + +import logging + @udf(result_type=DataTypes.BIGINT()) def add(i, j): # 使用 logging 模块 - import logging logging.info("debug") # 使用 print 函数 print('debug') diff --git a/docs/content.zh/docs/dev/python/table/conversion_of_pandas.md b/docs/content.zh/docs/dev/python/table/conversion_of_pandas.md index 67544095938..1d16020efa1 100644 --- a/docs/content.zh/docs/dev/python/table/conversion_of_pandas.md +++ b/docs/content.zh/docs/dev/python/table/conversion_of_pandas.md @@ -36,6 +36,8 @@ PyFlink 支持将 Pandas DataFrame 转换成 PyFlink Table。在内部实现上 以下示例显示如何从 Pandas DataFrame 创建 PyFlink Table: ```python +from pyflink.table import DataTypes + import pandas as pd import numpy as np @@ -54,7 +56,7 @@ table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()]) # 由Pandas DataFrame创建列名和列类型的PyFlink表 table = t_env.from_pandas(pdf, DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.DOUBLE()), - DataTypes.FIELD("f1", DataTypes.DOUBLE())]) + DataTypes.FIELD("f1", DataTypes.DOUBLE())])) ``` ## 将 PyFlink Table 转换为 Pandas DataFrame @@ -68,6 +70,8 @@ table = t_env.from_pandas(pdf, 以下示例显示了如何将 PyFlink Table 转换为 Pandas DataFrame: ```python +from pyflink.table.expressions import col + import pandas as pd import numpy as np diff 
--git a/docs/content.zh/docs/dev/python/table/intro_to_table_api.md b/docs/content.zh/docs/dev/python/table/intro_to_table_api.md index fb4008e4cc5..e9fc93aca1b 100644 --- a/docs/content.zh/docs/dev/python/table/intro_to_table_api.md +++ b/docs/content.zh/docs/dev/python/table/intro_to_table_api.md @@ -319,6 +319,7 @@ new_table.execute().print() ```python from pyflink.table import EnvironmentSettings, TableEnvironment +from pyflink.table.expressions import col # 通过 batch table environment 来执行查询 env_settings = EnvironmentSettings.in_batch_mode() @@ -329,11 +330,11 @@ orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30 # 计算所有来自法国客户的收入 revenue = orders \ - .select(orders.name, orders.country, orders.revenue) \ - .where(orders.country == 'FRANCE') \ - .group_by(orders.name) \ - .select(orders.name, orders.revenue.sum.alias('rev_sum')) - + .select(col("name"), col("country"), col("revenue")) \ + .where(col("country") == 'FRANCE') \ + .group_by(col("name")) \ + .select(col("name"), col("country").sum.alias('rev_sum')) + revenue.execute().print() ``` @@ -686,6 +687,7 @@ Table API 提供了一种机制来查看 `Table` 的逻辑查询计划和优化 ```python # 使用流模式 TableEnvironment from pyflink.table import EnvironmentSettings, TableEnvironment +from pyflink.table.expressions import col env_settings = EnvironmentSettings.in_streaming_mode() table_env = TableEnvironment.create(env_settings) @@ -693,7 +695,7 @@ table_env = TableEnvironment.create(env_settings) table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) table = table1 \ - .where(table1.data.like('H%')) \ + .where(col("data").like('H%')) \ .union_all(table2) print(table.explain()) ``` @@ -739,6 +741,7 @@ Stage 136 : Data Source ```python # 使用流模式 TableEnvironment from pyflink.table import EnvironmentSettings, TableEnvironment +from pyflink.table.expressions import col env_settings = EnvironmentSettings.in_streaming_mode() 
table_env = TableEnvironment.create(environment_settings=env_settings) @@ -764,7 +767,7 @@ table_env.execute_sql(""" statement_set = table_env.create_statement_set() -statement_set.add_insert("print_sink_table", table1.where(table1.data.like('H%'))) +statement_set.add_insert("print_sink_table", table1.where(col("data").like('H%'))) statement_set.add_insert("black_hole_sink_table", table2) print(statement_set.explain()) diff --git a/docs/content.zh/docs/dev/python/table/metrics.md b/docs/content.zh/docs/dev/python/table/metrics.md index e00af42c5d0..fb0c7840c00 100644 --- a/docs/content.zh/docs/dev/python/table/metrics.md +++ b/docs/content.zh/docs/dev/python/table/metrics.md @@ -127,7 +127,6 @@ class MyUDF(ScalarFunction): self.meter = None def open(self, function_context): - super().open(function_context) # 120秒内统计的平均每秒事件数,默认是60秒 self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120) @@ -151,14 +150,14 @@ class MyUDF(ScalarFunction): {{< tab "Python" >}} ```python -function_context - .get_metric_group() - .add_group("my_metrics") +function_context \ + .get_metric_group() \ + .add_group("my_metrics") \ .counter("my_counter") -function_context - .get_metric_group() - .add_group("my_metrics_key", "my_metrics_value") +function_context \ + .get_metric_group() \ + .add_group("my_metrics_key", "my_metrics_value") \ .counter("my_counter") ``` @@ -182,9 +181,9 @@ function_context {{< tabs "6d0715c0-6c39-489a-b3f3-e9bf7d50c268" >}} {{< tab "Python" >}} ```python -function_context - .get_metric_group() - .add_group("my_metrics_key", "my_metrics_value") +function_context \ + .get_metric_group() \ + .add_group("my_metrics_key", "my_metrics_value") \ .counter("my_counter") ``` {{< /tab >}} diff --git a/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md b/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md index 59f3a61de5b..59e9fa03a24 100644 --- 
a/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md +++ b/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md @@ -44,7 +44,7 @@ table_env = TableEnvironment.create(env_settings) table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()), - DataTypes.FIELD("data", DataTypes.STRING())])) + DataTypes.FIELD("data", DataTypes.STRING())])) def func1(id: int, data: str) -> Row: return Row(id, data * 2) @@ -63,7 +63,7 @@ It also supports to take a Row object (containing all the columns of the input t ```python @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()), - DataTypes.FIELD("data", DataTypes.STRING())])) + DataTypes.FIELD("data", DataTypes.STRING())])) def func2(data: Row) -> Row: return Row(data.id, data.data * 2) @@ -86,7 +86,7 @@ It should be noted that the input type and output type should be pandas.DataFram ```python import pandas as pd @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()), - DataTypes.FIELD("data", DataTypes.STRING())]), + DataTypes.FIELD("data", DataTypes.STRING())]), func_type='pandas') def func3(data: pd.DataFrame) -> pd.DataFrame: res = pd.concat([data.id, data.data * 2], axis=1) @@ -288,10 +288,11 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'), (3, 'Hi', 'hi'), (5, 'Hi2', 'hi'), (7, 'Hi', 'Hello'), - (2, 'Hi', 'Hello')], ['a', 'b', 'c']) + (2, 'Hi', 'Hello')], + ['a', 'b', 'c']) # call function "inline" without registration in Table API -result = t.group_by(t.b).flat_aggregate(top2).select(col('*')).to_pandas() +result = t.group_by(col('b')).flat_aggregate(top2).select(col('*')).to_pandas() # the result is: #+----+--------------------------------+----------------------+ diff --git a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md index 6be2c1e7275..d718d4d45fe 100644 --- 
a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md +++ b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md @@ -134,6 +134,8 @@ if __name__ == '__main__': PyFlink 表支持与 Pandas DataFrame 之间互相转换。 ```python +from pyflink.table.expressions import col + import pandas as pd import numpy as np @@ -150,6 +152,8 @@ pdf = table.to_pandas() `from_elements()` 用于从一个元素集合中创建一张表。元素类型必须是可支持的原子类型或者复杂类型。 ```python +from pyflink.table import DataTypes + table_env.from_elements([(1, 'Hi'), (2, 'Hello')]) # 使用第二个参数指定自定义字段名 diff --git a/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md b/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md index 4377474cabb..7194eb00323 100644 --- a/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md +++ b/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md @@ -41,7 +41,9 @@ Python 标量函数的行为由名为 `eval` 的方法定义,`eval` 方法支 以下示例显示了如何定义自己的 Python 哈希函数、如何在 TableEnvironment 中注册它以及如何在作业中使用它。 ```python -from pyflink.table.expressions import call +from pyflink.table.expressions import call, col +from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings +from pyflink.table.udf import ScalarFunction, udf class HashCode(ScalarFunction): def __init__(self): @@ -56,7 +58,7 @@ table_env = TableEnvironment.create(settings) hash_code = udf(HashCode(), result_type=DataTypes.BIGINT()) # 在 Python Table API 中使用 Python 自定义函数 -my_table.select(my_table.string, my_table.bigint, hash_code(my_table.bigint), call(hash_code, my_table.bigint)) +my_table.select(col("string"), col("bigint"), hash_code(col("bigint")), call(hash_code, col("bigint"))) # 在 SQL API 中使用 Python 自定义函数 table_env.create_temporary_function("hash_code", udf(HashCode(), result_type=DataTypes.BIGINT())) @@ -78,7 +80,8 @@ public class HashCode extends ScalarFunction { } } ''' -from pyflink.table.expressions import call +from pyflink.table.expressions import call, col +from pyflink.table import TableEnvironment, EnvironmentSettings settings = 
EnvironmentSettings.in_batch_mode() table_env = TableEnvironment.create(settings) @@ -87,7 +90,7 @@ table_env = TableEnvironment.create(settings) table_env.create_java_temporary_function("hash_code", "my.java.function.HashCode") # 在 Python Table API 中使用 Java 函数 -my_table.select(call('hash_code', my_table.string)) +my_table.select(call('hash_code', col("string"))) # 在 SQL API 中使用 Java 函数 table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable") @@ -128,10 +131,10 @@ add = udf(functools.partial(partial_add, k=1), result_type=DataTypes.BIGINT()) # 注册 Python 自定义函数 table_env.create_temporary_function("add", add) # 在 Python Table API 中使用 Python 自定义函数 -my_table.select(call('add', my_table.a, my_table.b)) +my_table.select(call('add', col('a'), col('b'))) # 也可以在 Python Table API 中直接使用 Python 自定义函数 -my_table.select(add(my_table.a, my_table.b)) +my_table.select(add(col('a'), col('b'))) ``` <a name="table-functions"></a> @@ -143,6 +146,10 @@ my_table.select(add(my_table.a, my_table.b)) 以下示例说明了如何定义自己的 Python 自定义表值函数,将其注册到 TableEnvironment 中,并在作业中使用它。 ```python +from pyflink.table.expressions import col +from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings +from pyflink.table.udf import TableFunction, udtf + class Split(TableFunction): def eval(self, string): for s in string.split(" "): @@ -156,8 +163,8 @@ my_table = ... 
# type: Table, table schema: [a: String] split = udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()]) # 在 Python Table API 中使用 Python 表值函数 -my_table.join_lateral(split(my_table.a).alias("word", "length")) -my_table.left_outer_join_lateral(split(my_table.a).alias("word", "length")) +my_table.join_lateral(split(col("a")).alias("word", "length")) +my_table.left_outer_join_lateral(split(col("a")).alias("word", "length")) # 在 SQL API 中使用 Python 表值函数 table_env.create_temporary_function("split", udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()])) @@ -183,7 +190,8 @@ public class Split extends TableFunction<Tuple2<String, Integer>> { } } ''' -from pyflink.table.expressions import call +from pyflink.table.expressions import call, col +from pyflink.table import TableEnvironment, EnvironmentSettings env_settings = EnvironmentSettings.in_streaming_mode() table_env = TableEnvironment.create(env_settings) @@ -193,8 +201,8 @@ my_table = ... # type: Table, table schema: [a: String] table_env.create_java_temporary_function("split", "my.java.function.Split") # 在 Python Table API 中使用表值函数。 "alias"指定表的字段名称。 -my_table.join_lateral(call('split', my_table.a).alias("word", "length")).select(my_table.a, col('word'), col('length')) -my_table.left_outer_join_lateral(call('split', my_table.a).alias("word", "length")).select(my_table.a, col('word'), col('length')) +my_table.join_lateral(call('split', col('a')).alias("word", "length")).select(col('a'), col('word'), col('length')) +my_table.left_outer_join_lateral(call('split', col('a')).alias("word", "length")).select(col('a'), col('word'), col('length')) # 注册 Python 函数。 @@ -311,14 +319,14 @@ t = table_env.from_elements([(1, 2, "Lee"), (7, 8, "Lee")]).alias("value", "count", "name") # call function "inline" without registration in Table API -result = t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")).execute() +result = t.group_by(col("name")).select(weighted_avg(col("value"), 
col("count")).alias("avg")).execute() result.print() # register function table_env.create_temporary_function("weighted_avg", WeightedAvg()) # call registered function in Table API -result = t.group_by(t.name).select(call("weighted_avg", t.value, t.count).alias("avg")).execute() +result = t.group_by(col("name")).select(call("weighted_avg", col("value"), col("count")).alias("avg")).execute() result.print() # register table @@ -487,7 +495,7 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'), ['a', 'b', 'c']) # call function "inline" without registration in Table API -t.group_by(t.b).flat_aggregate(top2).select(col('*')).execute().print() +t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print() # the result is: +----+--------------------------------+----------------------+ diff --git a/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md b/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md index 10012d8ba4a..5454af1057d 100644 --- a/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md +++ b/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md @@ -49,6 +49,10 @@ under the License. 
以下示例显示了如何定义自己的向量化 Python 标量函数,该函数计算两列的总和,并在查询中使用它: ```python +from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings +from pyflink.table.expressions import col +from pyflink.table.udf import udf + @udf(result_type=DataTypes.BIGINT(), func_type="pandas") def add(i, j): return i + j @@ -57,7 +61,7 @@ settings = EnvironmentSettings.in_batch_mode() table_env = TableEnvironment.create(settings) # use the vectorized Python scalar function in Python Table API -my_table.select(add(my_table.bigint, my_table.bigint)) +my_table.select(add(col("bigint"), col("bigint"))) # 在SQL API中使用Python向量化标量函数 table_env.create_temporary_function("add", add) @@ -81,6 +85,11 @@ table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable") and `Over Window Aggregation` 使用它: ```python +from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings +from pyflink.table.expressions import col, lit +from pyflink.table.udf import udaf +from pyflink.table.window import Tumble + @udaf(result_type=DataTypes.FLOAT(), func_type="pandas") def mean_udaf(v): return v.mean() @@ -91,16 +100,16 @@ table_env = TableEnvironment.create(settings) my_table = ... 
# type: Table, table schema: [a: String, b: BigInt, c: BigInt] # 在 GroupBy Aggregation 中使用向量化聚合函数 -my_table.group_by(my_table.a).select(my_table.a, mean_udaf(add(my_table.b))) +my_table.group_by(col('a')).select(col('a'), mean_udaf(col('b'))) # 在 GroupBy Window Aggregation 中使用向量化聚合函数 -tumble_window = Tumble.over(expr.lit(1).hours) \ - .on(expr.col("rowtime")) \ +tumble_window = Tumble.over(lit(1).hours) \ + .on(col("rowtime")) \ .alias("w") my_table.window(tumble_window) \ - .group_by("w") \ + .group_by(col("w")) \ .select(col('w').start, col('w').end, mean_udaf(col('b'))) # 在 Over Window Aggregation 中使用向量化聚合函数 @@ -117,6 +126,9 @@ table_env.sql_query(""" 以下示例显示了多种定义向量化 Python 聚合函数的方式。该函数需要两个类型为 bigint 的参数作为输入参数,并返回它们的最大值的和作为结果。 ```python +from pyflink.table import DataTypes +from pyflink.table.udf import AggregateFunction, udaf + # 方式一:扩展基类 `AggregateFunction` class MaxAdd(AggregateFunction): diff --git a/docs/content.zh/docs/dev/table/common.md b/docs/content.zh/docs/dev/table/common.md index 02f06056a7f..83d0ddacdcc 100644 --- a/docs/content.zh/docs/dev/table/common.md +++ b/docs/content.zh/docs/dev/table/common.md @@ -475,9 +475,9 @@ table_env = # see "Create a TableEnvironment" section orders = table_env.from_path("Orders") # compute revenue for all customers from France revenue = orders \ - .filter(orders.cCountry == 'FRANCE') \ - .group_by(orders.cID, orders.cName) \ - .select(orders.cID, orders.cName, orders.revenue.sum.alias('revSum')) + .filter(col('cCountry') == 'FRANCE') \ + .group_by(col('cID'), col('cName')) \ + .select(col('cID'), col('cName'), col('revenue').sum.alias('revSum')) # emit or convert Table # execute query @@ -856,7 +856,7 @@ t_env = StreamTableEnvironment.create(env) table1 = t_env.from_elements([(1, "hello")], ["count", "word"]) table2 = t_env.from_elements([(1, "hello")], ["count", "word"]) table = table1 \ - .where(table1.word.like('F%')) \ + .where(col('word').like('F%')) \ .union_all(table2) print(table.explain()) diff --git 
a/docs/content.zh/docs/dev/table/tableApi.md b/docs/content.zh/docs/dev/table/tableApi.md index d2b198ca29c..e5961db83d5 100644 --- a/docs/content.zh/docs/dev/table/tableApi.md +++ b/docs/content.zh/docs/dev/table/tableApi.md @@ -113,6 +113,7 @@ val result = orders ```python from pyflink.table import * +from pyflink.table.expressions import col # 环境配置 t_env = TableEnvironment.create( @@ -151,8 +152,7 @@ t_env.execute_sql(sink_ddl) # 指定表程序 orders = t_env.from_path("Orders") # schema (a, b, c, rowtime) -orders.group_by("a").select(orders.a, orders.b.count.alias('cnt')).execute_insert("result").wait() - +orders.group_by(col("a")).select(col("a"), col("b").count.alias('cnt')).execute_insert("result").wait() ``` {{< /tab >}} @@ -207,14 +207,15 @@ val result: Table = orders ```python # 指定表程序 from pyflink.table.expressions import col, lit +from pyflink.table.window import Tumble orders = t_env.from_path("Orders") # schema (a, b, c, rowtime) -result = orders.filter(orders.a.is_not_null & orders.b.is_not_null & orders.c.is_not_null) \ - .select(orders.a.lower_case.alias('a'), orders.b, orders.rowtime) \ - .window(Tumble.over(lit(1).hour).on(orders.rowtime).alias("hourly_window")) \ +result = orders.filter(col("a").is_not_null & col("b").is_not_null & col("c").is_not_null) \ + .select(col("a").lower_case.alias('a'), col("b"), col("rowtime")) \ + .window(Tumble.over(lit(1).hour).on(col("rowtime")).alias("hourly_window")) \ .group_by(col('hourly_window'), col('a')) \ - .select(col('a'), col('hourly_window').end.alias('hour'), b.avg.alias('avg_billing_amount')) + .select(col('a'), col('hourly_window').end.alias('hour'), col("b").avg.alias('avg_billing_amount')) ``` {{< /tab >}} @@ -369,7 +370,7 @@ Table result = orders.select($"a", $"c" as "d") {{< tab "Python" >}} ```python orders = t_env.from_path("Orders") -result = orders.select(orders.a, orders.c.alias('d')) +result = orders.select(col("a"), col("c").alias('d')) ``` {{< /tab >}} {{< /tabs >}} @@ -446,7 +447,7 @@ val result 
= orders.filter($"a" % 2 === 0) {{< tab "Python" >}} ```python orders = t_env.from_path("Orders") -result = orders.where(orders.a == 'red') +result = orders.where(col("a") == 'red') ``` {{< /tab >}} {{< /tabs >}} @@ -469,7 +470,7 @@ val result = orders.filter($"a" % 2 === 0) {{< tab "Python" >}} ```python orders = t_env.from_path("Orders") -result = orders.filter(orders.a == 'red') +result = orders.filter(col("a") == 'red') ``` {{< /tab >}} {{< /tabs >}} @@ -504,7 +505,7 @@ val result = orders.addColumns(concat($"c", "Sunny")) from pyflink.table.expressions import concat orders = t_env.from_path("Orders") -result = orders.add_columns(concat(orders.c, 'sunny')) +result = orders.add_columns(concat(col("c"), 'sunny')) ``` {{< /tab >}} {{< /tabs >}} @@ -535,7 +536,7 @@ val result = orders.addOrReplaceColumns(concat($"c", "Sunny") as "desc") from pyflink.table.expressions import concat orders = t_env.from_path("Orders") -result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc')) +result = orders.add_or_replace_columns(concat(col("c"), 'sunny').alias('desc')) ``` {{< /tab >}} {{< /tabs >}} @@ -560,7 +561,7 @@ val result = orders.dropColumns($"b", $"c") {{< tab "Python" >}} ```python orders = t_env.from_path("Orders") -result = orders.drop_columns(orders.b, orders.c) +result = orders.drop_columns(col("b"), col("c")) ``` {{< /tab >}} {{< /tabs >}} @@ -588,7 +589,7 @@ val result = orders.renameColumns($"b" as "b2", $"c" as "c2") {{< tab "Python" >}} ```python orders = t_env.from_path("Orders") -result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2')) +result = orders.rename_columns(col("b").alias('b2'), col("c").alias('c2')) ``` {{< /tab >}} {{< /tabs >}} @@ -621,7 +622,7 @@ val result = orders.groupBy($"a").select($"a", $"b".sum().as("d")) {{< tab "Python" >}} ```python orders = t_env.from_path("Orders") -result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d')) +result = orders.group_by(col("a")).select(col("a"), 
col("b").sum.alias('d')) ``` {{< /tab >}} {{< /tabs >}} @@ -666,9 +667,9 @@ from pyflink.table.window import Tumble from pyflink.table.expressions import lit, col orders = t_env.from_path("Orders") -result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \ - .group_by(orders.a, col('w')) \ - .select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d')) +result = orders.window(Tumble.over(lit(5).minutes).on(col('rowtime')).alias("w")) \ + .group_by(col('a'), col('w')) \ + .select(col('a'), col('w').start, col('w').end, col('b').sum.alias('d')) ``` {{< /tab >}} {{< /tabs >}} @@ -721,10 +722,10 @@ from pyflink.table.window import Over from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE orders = t_env.from_path("Orders") -result = orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime) +result = orders.over_window(Over.partition_by(col("a")).order_by(col("rowtime")) .preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE) .alias("w")) \ - .select(orders.a, orders.b.avg.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w'))) + .select(col("a"), col("b").avg.over(col('w')), col("b").max.over(col('w')), col("b").min.over(col('w'))) ``` {{< /tab >}} {{< /tabs >}} @@ -795,22 +796,23 @@ val result = orders {{< tab "Python" >}} ```python from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE +from pyflink.table.window import Over, Tumble orders = t_env.from_path("Orders") # 按属性分组后的的互异(互不相同、去重)聚合 -group_by_distinct_result = orders.group_by(orders.a) \ - .select(orders.a, orders.b.sum.distinct.alias('d')) +group_by_distinct_result = orders.group_by(col("a")) \ + .select(col("a"), col("b").sum.distinct.alias('d')) # 按属性、时间窗口分组后的互异(互不相同、去重)聚合 -group_by_window_distinct_result = orders.window( - Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(orders.a, col('w')) \ - .select(orders.a, orders.b.sum.distinct.alias('d')) +group_by_window_distinct_result = 
orders.window(Tumble.over(lit(5).minutes).on(col("rowtime")).alias("w")) \ + .group_by(col("a"), col('w')) \ + .select(col("a"), col("b").sum.distinct.alias('d')) # over window 上的互异(互不相同、去重)聚合 result = orders.over_window(Over - .partition_by(orders.a) - .order_by(orders.rowtime) - .preceding(UNBOUNDED_RANGE) - .alias("w")) \ - .select(orders.a, orders.b.avg.distinct.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w'))) + .partition_by(col("a")) + .order_by(col("rowtime")) + .preceding(UNBOUNDED_RANGE) + .alias("w")) \ + .select(col("a"), col("b").avg.distinct.over(col('w')), col("b").max.over(col('w')), col("b").min.over(col('w'))) ``` {{< /tab >}} {{< /tabs >}} @@ -911,7 +913,7 @@ from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f')) -result = left.join(right).where(left.a == right.d).select(left.a, left.b, right.e) +result = left.join(right).where(col('a') == col('d')).select(col('a'), col('b'), col('e')) ``` {{< /tab >}} {{< /tabs >}} @@ -957,9 +959,9 @@ from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f')) -left_outer_result = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) -right_outer_result = left.right_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) -full_outer_result = left.full_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) +left_outer_result = left.left_outer_join(right, col('a') == col('d')).select(col('a'), col('b'), col('e')) +right_outer_result = left.right_outer_join(right, col('a') == col('d')).select(col('a'), col('b'), col('e')) +full_outer_result = left.full_outer_join(right, col('a') == col('d')).select(col('a'), col('b'), col('e')) ``` {{< /tab >}} {{< /tabs >}} @@ -1006,9 +1008,9 @@ from 
pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'), col('rowtime1')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'), col('rowtime2')) - -joined_table = left.join(right).where((left.a == right.d) & (left.rowtime1 >= right.rowtime2 - lit(1).second) & (left.rowtime1 <= right.rowtime2 + lit(2).seconds)) -result = joined_table.select(joined_table.a, joined_table.b, joined_table.e, joined_table.rowtime1) + +joined_table = left.join(right).where((col('a') == col('d')) & (col('rowtime1') >= col('rowtime2') - lit(1).second) & (col('rowtime1') <= col('rowtime2') + lit(2).seconds)) +result = joined_table.select(col('a'), col('b'), col('e'), col('rowtime1')) ``` {{< /tab >}} {{< /tabs >}} @@ -1054,8 +1056,8 @@ def split(x): # join orders = t_env.from_path("Orders") -joined_table = orders.join_lateral(split(orders.c).alias("s", "t", "v")) -result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, joined_table.t, joined_table.v) +joined_table = orders.join_lateral(split(col('c')).alias("s", "t", "v")) +result = joined_table.select(col('a'), col('b'), col('s'), col('t'), col('v')) ``` {{< /tab >}} {{< /tabs >}} @@ -1103,8 +1105,8 @@ def split(x): # join orders = t_env.from_path("Orders") -joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s", "t", "v")) -result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, joined_table.t, joined_table.v) +joined_table = orders.left_outer_join_lateral(split(col('c')).alias("s", "t", "v")) +result = joined_table.select(col('a'), col('b'), col('s'), col('t'), col('v')) ``` {{< /tab >}} {{< /tabs >}} @@ -1370,7 +1372,7 @@ val result = left.select($"a", $"b", $"c").where($"a".in(right)) left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('a')) -result = left.select(left.a, left.b, left.c).where(left.a.in_(right)) +result = left.select(col('a'), 
col('b'), col('c')).where(col('a').in_(right)) ``` {{< /tab >}} {{< /tabs >}} @@ -1390,17 +1392,17 @@ result = left.select(left.a, left.b, left.c).where(left.a.in_(right)) {{< tabs "orderby" >}} {{< tab "Java" >}} ```java -Table result = in.orderBy($("a").asc()); +Table result = tab.orderBy($("a").asc()); ``` {{< /tab >}} {{< tab "Scala" >}} ```scala -val result = in.orderBy($"a".asc) +val result = tab.orderBy($"a".asc) ``` {{< /tab >}} {{< tab "Python" >}} ```python -result = in.order_by(in.a.asc) +result = tab.order_by(col('a').asc) ``` {{< /tab >}} {{< /tabs >}} @@ -1440,13 +1442,13 @@ val result3: Table = in.orderBy($"a".asc).offset(10).fetch(5) {{< tab "Python" >}} ```python # 从已排序的结果集中返回前5条记录 -result1 = table.order_by(table.a.asc).fetch(5) +result1 = table.order_by(col('a').asc).fetch(5) # 从已排序的结果集中返回跳过3条记录之后的所有记录 -result2 = table.order_by(table.a.asc).offset(3) +result2 = table.order_by(col('a').asc).offset(3) # 从已排序的结果集中返回跳过10条记录之后的前5条记录 -result3 = table.order_by(table.a.asc).offset(10).fetch(5) +result3 = table.order_by(col('a').asc).offset(10).fetch(5) ``` {{< /tab >}} {{< /tabs >}} @@ -1518,7 +1520,7 @@ val table = input ```python # 定义窗口并指定别名为 w,以窗口 w 对表进行分组,然后再聚合 table = input.window([w: GroupWindow].alias("w")) \ - .group_by(col('w')).select(input.b.sum) + .group_by(col('w')).select(col('b').sum) ``` {{< /tab >}} {{< /tabs >}} @@ -1554,7 +1556,7 @@ val table = input # 定义窗口并指定别名为 w,以属性 a 和窗口 w 对表进行分组, # 然后再聚合 table = input.window([w: GroupWindow].alias("w")) \ - .group_by(col('w'), input.a).select(input.b.sum) + .group_by(col('w'), col('a')).select(col('b').sum) ``` {{< /tab >}} {{< /tabs >}} @@ -1583,8 +1585,8 @@ val table = input # 定义窗口并指定别名为 w,以属性 a 和窗口 w 对表进行分组, # 然后再聚合并添加窗口开始、结束和 rowtime 时间戳 table = input.window([w: GroupWindow].alias("w")) \ - .group_by(col('w'), input.a) \ - .select(input.a, col('w').start, col('w').end, col('w').rowtime, input.b.count) + .group_by(col('w'), col('a')) \ + .select(col('a'), col('w').start, col('w').end, 
col('w').rowtime, col('b').count) ``` {{< /tab >}} {{< /tabs >}} @@ -1989,8 +1991,8 @@ val table = input {{< tab "Python" >}} ```python # define over window with alias w and aggregate over the over window w -table = input.over_window([w: OverWindow].alias("w")) \ - .select(input.a, input.b.sum.over(col('w')), input.c.min.over(col('w'))) +table = input.over_window([w: OverWindow].alias("w")) + .select(col('a'), col('b').sum.over(col('w')), col('c').min.over(col('w'))) ``` {{< /tab >}} {{< /tabs >}} @@ -2217,9 +2219,9 @@ func = udf(map_function, result_type=DataTypes.ROW( table = input.map(func).alias('a', 'b') # 使用 python 向量化标量函数进行 map 操作 -pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW( - [DataTypes.FIELD("a", DataTypes.BIGINT()), - DataTypes.FIELD("b", DataTypes.BIGINT()))]), +pandas_func = udf(lambda x: x * 2, + result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT()), + DataTypes.FIELD("b", DataTypes.BIGINT())]), func_type='pandas') table = input.map(pandas_func).alias('a', 'b') @@ -2451,7 +2453,7 @@ agg = udaf(function, name=str(function.__class__.__name__)) # 使用 python 通用聚合函数进行聚合 -result = t.group_by(t.a) \ +result = t.group_by(col('a')) \ .aggregate(agg.alias("c", "d")) \ .select(col('a'), col('c'), col('d')) @@ -2462,7 +2464,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()), DataTypes.FIELD("b", DataTypes.INT())]), func_type="pandas") t.aggregate(pandas_udaf.alias("a", "b")) \ - .select(col('a'), col('b')) + .select(col('a'), col('b')) ``` {{< /tab >}} @@ -2503,16 +2505,18 @@ val table = input ```python from pyflink.table import DataTypes from pyflink.table.udf import AggregateFunction, udaf +from pyflink.table.expressions import col, lit +from pyflink.table.window import Tumble pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()), result_type=DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.FLOAT()), DataTypes.FIELD("b", DataTypes.INT())]), func_type="pandas") -tumble_window = Tumble.over(expr.lit(1).hours) \ - 
.on(expr.col("rowtime")) \ +tumble_window = Tumble.over(lit(1).hours) \ + .on(col("rowtime")) \ .alias("w") -t.select(t.b, t.rowtime) \ +t.select(col('b'), col('rowtime')) \ .window(tumble_window) \ .group_by(col("w")) \ .aggregate(pandas_udaf.alias("d", "e")) \ @@ -2666,6 +2670,7 @@ val result = orders from pyflink.common import Row from pyflink.table.udf import TableAggregateFunction, udtaf from pyflink.table import DataTypes +from pyflink.table.expressions import col class Top2(TableAggregateFunction): @@ -2701,13 +2706,13 @@ t = t_env.from_elements([(1, 'Hi', 'Hello'), (3, 'Hi', 'hi'), (5, 'Hi2', 'hi'), (7, 'Hi', 'Hello'), - (2, 'Hi', 'Hello')], ['a', 'b', 'c']) -result = t.select(t.a, t.c) \ - .group_by(t.c) \ + (2, 'Hi', 'Hello')], + ['a', 'b', 'c']) +result = t.select(col('a'), col('c')) \ + .group_by(col('c')) \ .flat_aggregate(mytop) \ - .select(t.a) \ + .select(col('a')) \ .flat_aggregate(mytop.alias("b")) - ``` {{< /tab >}} {{< /tabs >}} diff --git a/docs/content/docs/dev/python/debugging.md b/docs/content/docs/dev/python/debugging.md index a1723e2a55e..d0597032513 100644 --- a/docs/content/docs/dev/python/debugging.md +++ b/docs/content/docs/dev/python/debugging.md @@ -60,10 +60,14 @@ You can log contextual and debug information via `print` or standard Python logg The logging messages will be printed in the log files of the `TaskManagers` during job execution. ```python +from pyflink.table import DataTypes +from pyflink.table.udf import udf + +import logging + @udf(result_type=DataTypes.BIGINT()) def add(i, j): # use logging modules - import logging logging.info("debug") # use print function print('debug') diff --git a/docs/content/docs/dev/python/table/conversion_of_pandas.md b/docs/content/docs/dev/python/table/conversion_of_pandas.md index 36a43e1c91b..46e90bdd331 100644 --- a/docs/content/docs/dev/python/table/conversion_of_pandas.md +++ b/docs/content/docs/dev/python/table/conversion_of_pandas.md @@ -39,6 +39,8 @@ provide exactly-once guarantees. 
The following example shows how to create a PyFlink Table from a Pandas DataFrame: ```python +from pyflink.table import DataTypes + import pandas as pd import numpy as np @@ -57,7 +59,7 @@ table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()]) # Create a PyFlink Table from a Pandas DataFrame with the specified row type table = t_env.from_pandas(pdf, DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.DOUBLE()), - DataTypes.FIELD("f1", DataTypes.DOUBLE())]) + DataTypes.FIELD("f1", DataTypes.DOUBLE())])) ``` ## Convert PyFlink Table to Pandas DataFrame @@ -72,6 +74,8 @@ You can limit the number of rows collected to client side via {{< pythondoc file The following example shows how to convert a PyFlink Table to a Pandas DataFrame: ```python +from pyflink.table.expressions import col + import pandas as pd import numpy as np diff --git a/docs/content/docs/dev/python/table/intro_to_table_api.md b/docs/content/docs/dev/python/table/intro_to_table_api.md index 938e0379955..13bb85736c0 100644 --- a/docs/content/docs/dev/python/table/intro_to_table_api.md +++ b/docs/content/docs/dev/python/table/intro_to_table_api.md @@ -36,6 +36,7 @@ All Table API and SQL programs, both batch and streaming, follow the same patter ```python from pyflink.table import EnvironmentSettings, TableEnvironment +from pyflink.table.expressions import col # 1. create a TableEnvironment env_settings = EnvironmentSettings.in_streaming_mode() @@ -70,7 +71,7 @@ source_table = table_env.from_path("datagen") # or create a Table from a SQL query: # source_table = table_env.sql_query("SELECT * FROM datagen") -result_table = source_table.select(source_table.id + 1, source_table.data) +result_table = source_table.select(col("id") + 1, col("data")) # 5. 
emit query result to sink table # emit a Table API result Table to a sink table: @@ -319,6 +320,7 @@ The following example shows a simple Table API aggregation query: ```python from pyflink.table import EnvironmentSettings, TableEnvironment +from pyflink.table.expressions import col # using batch table environment to execute the queries env_settings = EnvironmentSettings.in_batch_mode() @@ -329,11 +331,11 @@ orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30 # compute revenue for all customers from France revenue = orders \ - .select(orders.name, orders.country, orders.revenue) \ - .where(orders.country == 'FRANCE') \ - .group_by(orders.name) \ - .select(orders.name, orders.revenue.sum.alias('rev_sum')) - + .select(col("name"), col("country"), col("revenue")) \ + .where(col("country") == 'FRANCE') \ + .group_by(col("name")) \ + .select(col("name"), col("country").sum.alias('rev_sum')) + revenue.execute().print() ``` @@ -687,6 +689,7 @@ The following code shows how to use the `Table.explain()` method: ```python # using a stream TableEnvironment from pyflink.table import EnvironmentSettings, TableEnvironment +from pyflink.table.expressions import col env_settings = EnvironmentSettings.in_streaming_mode() table_env = TableEnvironment.create(env_settings) @@ -694,7 +697,7 @@ table_env = TableEnvironment.create(env_settings) table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) table = table1 \ - .where(table1.data.like('H%')) \ + .where(col("data").like('H%')) \ .union_all(table2) print(table.explain()) ``` @@ -740,6 +743,7 @@ The following code shows how to use the `StatementSet.explain()` method: ```python # using a stream TableEnvironment from pyflink.table import EnvironmentSettings, TableEnvironment +from pyflink.table.expressions import col env_settings = EnvironmentSettings.in_streaming_mode() table_env = 
TableEnvironment.create(env_settings) @@ -765,7 +769,7 @@ table_env.execute_sql(""" statement_set = table_env.create_statement_set() -statement_set.add_insert("print_sink_table", table1.where(table1.data.like('H%'))) +statement_set.add_insert("print_sink_table", table1.where(col("data").like('H%'))) statement_set.add_insert("black_hole_sink_table", table2) print(statement_set.explain()) diff --git a/docs/content/docs/dev/python/table/metrics.md b/docs/content/docs/dev/python/table/metrics.md index 56441980de8..2767c654ecf 100644 --- a/docs/content/docs/dev/python/table/metrics.md +++ b/docs/content/docs/dev/python/table/metrics.md @@ -60,7 +60,6 @@ class MyUDF(ScalarFunction): def eval(self, i): self.counter.inc(i) return i - ``` {{< /tab >}} {{< /tabs >}} @@ -137,7 +136,6 @@ class MyUDF(ScalarFunction): self.meter = None def open(self, function_context): - super().open(function_context) # an average rate of events per second over 120s, default is 60s. self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120) @@ -162,17 +160,15 @@ group's sub-groups. In this case, the value group will be returned, and a user v {{< tabs "a3040b2d-bf2d-4ce4-be2f-2896f48334c8" >}} {{< tab "Python" >}} ```python - -function_context - .get_metric_group() - .add_group("my_metrics") +function_context \ + .get_metric_group() \ + .add_group("my_metrics") \ .counter("my_counter") -function_context - .get_metric_group() - .add_group("my_metrics_key", "my_metrics_value") +function_context \ + .get_metric_group() \ + .add_group("my_metrics_key", "my_metrics_value") \ .counter("my_counter") - ``` {{< /tab >}} {{< /tabs >}} @@ -195,9 +191,9 @@ specifying the value parameter. 
{{< tabs "d27cbda0-da5f-4a77-a02e-2e54e3156e31" >}} {{< tab "Python" >}} ```python -function_context - .get_metric_group() - .add_group("my_metrics_key", "my_metrics_value") +function_context \ + .get_metric_group() \ + .add_group("my_metrics_key", "my_metrics_value") \ .counter("my_counter") ``` {{< /tab >}} diff --git a/docs/content/docs/dev/python/table/operations/row_based_operations.md b/docs/content/docs/dev/python/table/operations/row_based_operations.md index 3e313b274bc..640d8a05ddd 100644 --- a/docs/content/docs/dev/python/table/operations/row_based_operations.md +++ b/docs/content/docs/dev/python/table/operations/row_based_operations.md @@ -44,7 +44,7 @@ table_env = TableEnvironment.create(env_settings) table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()), - DataTypes.FIELD("data", DataTypes.STRING())])) + DataTypes.FIELD("data", DataTypes.STRING())])) def func1(id: int, data: str) -> Row: return Row(id, data * 2) @@ -63,7 +63,7 @@ It also supports to take a Row object (containing all the columns of the input t ```python @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()), - DataTypes.FIELD("data", DataTypes.STRING())])) + DataTypes.FIELD("data", DataTypes.STRING())])) def func2(data: Row) -> Row: return Row(data.id, data.data * 2) @@ -86,7 +86,7 @@ It should be noted that the input type and output type should be pandas.DataFram ```python import pandas as pd @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()), - DataTypes.FIELD("data", DataTypes.STRING())]), + DataTypes.FIELD("data", DataTypes.STRING())]), func_type='pandas') def func3(data: pd.DataFrame) -> pd.DataFrame: res = pd.concat([data.id, data.data * 2], axis=1) @@ -226,7 +226,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()), DataTypes.FIELD("b", DataTypes.INT())]), func_type="pandas") t.aggregate(pandas_udaf.alias("a", "b")) \ - 
.select(col('a'), col('b')).execute().print() + .select(col('a'), col('b')).execute().print() # the result is #+--------------------------------+-------------+ @@ -288,10 +288,11 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'), (3, 'Hi', 'hi'), (5, 'Hi2', 'hi'), (7, 'Hi', 'Hello'), - (2, 'Hi', 'Hello')], ['a', 'b', 'c']) + (2, 'Hi', 'Hello')], + ['a', 'b', 'c']) # call function "inline" without registration in Table API -result = t.group_by(t.b).flat_aggregate(top2).select(col('*')).execute().print() +result = t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print() # the result is: #+----+--------------------------------+----------------------+ diff --git a/docs/content/docs/dev/python/table/python_table_api_connectors.md b/docs/content/docs/dev/python/table/python_table_api_connectors.md index b3ee2481ab8..2d85d5c4024 100644 --- a/docs/content/docs/dev/python/table/python_table_api_connectors.md +++ b/docs/content/docs/dev/python/table/python_table_api_connectors.md @@ -138,6 +138,8 @@ The predefined data sinks support writing to Pandas DataFrame. PyFlink Tables support conversion to and from Pandas DataFrame. ```python +from pyflink.table.expressions import col + import pandas as pd import numpy as np @@ -155,6 +157,8 @@ pdf = table.to_pandas() be acceptable atomic types or acceptable composite types. 
```python +from pyflink.table import DataTypes + table_env.from_elements([(1, 'Hi'), (2, 'Hello')]) # use the second parameter to specify custom field names diff --git a/docs/content/docs/dev/python/table/udfs/python_udfs.md b/docs/content/docs/dev/python/table/udfs/python_udfs.md index 8e4526c30a7..d4301a26576 100644 --- a/docs/content/docs/dev/python/table/udfs/python_udfs.md +++ b/docs/content/docs/dev/python/table/udfs/python_udfs.md @@ -41,7 +41,9 @@ The following example shows how to define your own Python hash code function, re Note that you can configure your scalar function via a constructor before it is registered: ```python -from pyflink.table.expressions import call +from pyflink.table.expressions import call, col +from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings +from pyflink.table.udf import ScalarFunction, udf class HashCode(ScalarFunction): def __init__(self): @@ -56,7 +58,7 @@ table_env = TableEnvironment.create(settings) hash_code = udf(HashCode(), result_type=DataTypes.BIGINT()) # use the Python function in Python Table API -my_table.select(my_table.string, my_table.bigint, hash_code(my_table.bigint), call(hash_code, my_table.bigint)) +my_table.select(col("string"), col("bigint"), hash_code(col("bigint")), call(hash_code, col("bigint"))) # use the Python function in SQL API table_env.create_temporary_function("hash_code", udf(HashCode(), result_type=DataTypes.BIGINT())) @@ -78,7 +80,8 @@ public class HashCode extends ScalarFunction { } } ''' -from pyflink.table.expressions import call +from pyflink.table.expressions import call, col +from pyflink.table import TableEnvironment, EnvironmentSettings settings = EnvironmentSettings.in_batch_mode() table_env = TableEnvironment.create(settings) @@ -87,7 +90,7 @@ table_env = TableEnvironment.create(settings) table_env.create_java_temporary_function("hash_code", "my.java.function.HashCode") # use the Java function in Python Table API -my_table.select(call('hash_code', 
my_table.string)) +my_table.select(call('hash_code', col("string"))) # use the Java function in SQL API table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable") @@ -129,10 +132,10 @@ add = udf(functools.partial(partial_add, k=1), result_type=DataTypes.BIGINT()) # register the Python function table_env.create_temporary_function("add", add) # use the function in Python Table API -my_table.select(call('add', my_table.a, my_table.b)) +my_table.select(call('add', col('a'), col('b'))) # You can also use the Python function in Python Table API directly -my_table.select(add(my_table.a, my_table.b)) +my_table.select(add(col('a'), col('b'))) ``` ## Table Functions @@ -145,6 +148,10 @@ The following example shows how to define your own Python multi emit function, r TableEnvironment, and call it in a query. ```python +from pyflink.table.expressions import col +from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings +from pyflink.table.udf import TableFunction, udtf + class Split(TableFunction): def eval(self, string): for s in string.split(" "): @@ -158,8 +165,8 @@ my_table = ... 
# type: Table, table schema: [a: String] split = udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()]) # use the Python Table Function in Python Table API -my_table.join_lateral(split(my_table.a).alias("word", "length")) -my_table.left_outer_join_lateral(split(my_table.a).alias("word", "length")) +my_table.join_lateral(split(col("a")).alias("word", "length")) +my_table.left_outer_join_lateral(split(col("a")).alias("word", "length")) # use the Python Table function in SQL API table_env.create_temporary_function("split", udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()])) @@ -186,7 +193,8 @@ public class Split extends TableFunction<Tuple2<String, Integer>> { } } ''' -from pyflink.table.expressions import call +from pyflink.table.expressions import call, col +from pyflink.table import TableEnvironment, EnvironmentSettings env_settings = EnvironmentSettings.in_streaming_mode() table_env = TableEnvironment.create(env_settings) @@ -196,8 +204,8 @@ my_table = ... # type: Table, table schema: [a: String] table_env.create_java_temporary_function("split", "my.java.function.Split") # Use the table function in the Python Table API. "alias" specifies the field names of the table. -my_table.join_lateral(call('split', my_table.a).alias("word", "length")).select(my_table.a, col('word'), col('length')) -my_table.left_outer_join_lateral(call('split', my_table.a).alias("word", "length")).select(my_table.a, col('word'), col('length')) +my_table.join_lateral(call('split', col('a')).alias("word", "length")).select(col('a'), col('word'), col('length')) +my_table.left_outer_join_lateral(call('split', col('a')).alias("word", "length")).select(col('a'), col('word'), col('length')) # Register the python function. 
@@ -313,14 +321,14 @@ t = table_env.from_elements([(1, 2, "Lee"), (7, 8, "Lee")]).alias("value", "count", "name") # call function "inline" without registration in Table API -result = t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")).execute() +result = t.group_by(col("name")).select(weighted_avg(col("value"), col("count")).alias("avg")).execute() result.print() # register function table_env.create_temporary_function("weighted_avg", WeightedAvg()) # call registered function in Table API -result = t.group_by(t.name).select(call("weighted_avg", t.value, t.count).alias("avg")).execute() +result = t.group_by(col("name")).select(call("weighted_avg", col("value"), col("count")).alias("avg")).execute() result.print() # register table @@ -489,7 +497,7 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'), ['a', 'b', 'c']) # call function "inline" without registration in Table API -t.group_by(t.b).flat_aggregate(top2).select(col('*')).execute().print() +t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print() # the result is: # b a diff --git a/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md b/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md index c3461b342f4..e31d79b54ed 100644 --- a/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md +++ b/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md @@ -49,6 +49,10 @@ The following example shows how to define your own vectorized Python scalar func and use it in a query: ```python +from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings +from pyflink.table.expressions import col +from pyflink.table.udf import udf + @udf(result_type=DataTypes.BIGINT(), func_type="pandas") def add(i, j): return i + j @@ -57,7 +61,7 @@ settings = EnvironmentSettings.in_batch_mode() table_env = TableEnvironment.create(settings) # use the vectorized Python scalar function in Python Table API -my_table.select(add(my_table.bigint, 
my_table.bigint)) +my_table.select(add(col("bigint"), col("bigint"))) # use the vectorized Python scalar function in SQL API table_env.create_temporary_function("add", add) @@ -80,6 +84,11 @@ The following example shows how to define your own vectorized Python aggregate f and use it in `GroupBy Aggregation`, `GroupBy Window Aggregation` and `Over Window Aggregation`: ```python +from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings +from pyflink.table.expressions import col, lit +from pyflink.table.udf import udaf +from pyflink.table.window import Tumble + @udaf(result_type=DataTypes.FLOAT(), func_type="pandas") def mean_udaf(v): return v.mean() @@ -90,16 +99,16 @@ table_env = TableEnvironment.create(settings) my_table = ... # type: Table, table schema: [a: String, b: BigInt, c: BigInt] # use the vectorized Python aggregate function in GroupBy Aggregation -my_table.group_by(my_table.a).select(my_table.a, mean_udaf(add(my_table.b))) +my_table.group_by(col('a')).select(col('a'), mean_udaf(col('b'))) # use the vectorized Python aggregate function in GroupBy Window Aggregation -tumble_window = Tumble.over(expr.lit(1).hours) \ - .on(expr.col("rowtime")) \ +tumble_window = Tumble.over(lit(1).hours) \ + .on(col("rowtime")) \ .alias("w") my_table.window(tumble_window) \ - .group_by("w") \ + .group_by(col("w")) \ .select(col('w').start, col('w').end, mean_udaf(col('b'))) # use the vectorized Python aggregate function in Over Window Aggregation @@ -110,7 +119,6 @@ table_env.sql_query(""" over (PARTITION BY a ORDER BY rowtime ROWS BETWEEN UNBOUNDED preceding AND UNBOUNDED FOLLOWING) FROM MyTable""") - ``` There are many ways to define a vectorized Python aggregate functions. @@ -118,6 +126,8 @@ The following examples show the different ways to define a vectorized Python agg which takes two columns of bigint as the inputs and returns the sum of the maximum of them as the result. 
```python +from pyflink.table import DataTypes +from pyflink.table.udf import AggregateFunction, udaf # option 1: extending the base class `AggregateFunction` class MaxAdd(AggregateFunction): diff --git a/docs/content/docs/dev/table/common.md b/docs/content/docs/dev/table/common.md index 5fa3ead0509..3aad29f7e01 100644 --- a/docs/content/docs/dev/table/common.md +++ b/docs/content/docs/dev/table/common.md @@ -480,9 +480,9 @@ table_env = # see "Create a TableEnvironment" section orders = table_env.from_path("Orders") # compute revenue for all customers from France revenue = orders \ - .filter(orders.cCountry == 'FRANCE') \ - .group_by(orders.cID, orders.cName) \ - .select(orders.cID, orders.cName, orders.revenue.sum.alias('revSum')) + .filter(col('cCountry') == 'FRANCE') \ + .group_by(col('cID'), col('cName')) \ + .select(col('cID'), col('cName'), col('revenue').sum.alias('revSum')) # emit or convert Table # execute query @@ -849,7 +849,7 @@ t_env = StreamTableEnvironment.create(env) table1 = t_env.from_elements([(1, "hello")], ["count", "word"]) table2 = t_env.from_elements([(1, "hello")], ["count", "word"]) table = table1 \ - .where(table1.word.like('F%')) \ + .where(col('word').like('F%')) \ .union_all(table2) print(table.explain()) diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md index 28950e2e942..6dd85151de2 100644 --- a/docs/content/docs/dev/table/data_stream_api.md +++ b/docs/content/docs/dev/table/data_stream_api.md @@ -2883,7 +2883,7 @@ t_env = ... 
table = t_env.from_elements([("john", 35), ("sarah", 32)], DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), - DataTypes.FIELD("age", DataTypes.INT())])) + DataTypes.FIELD("age", DataTypes.INT())])) # Convert the Table into an append DataStream of Row by specifying the type information ds_row = t_env.to_append_stream(table, Types.ROW([Types.STRING(), Types.INT()])) diff --git a/docs/content/docs/dev/table/tableApi.md b/docs/content/docs/dev/table/tableApi.md index 91ab6cf49c1..08e3223b9e9 100644 --- a/docs/content/docs/dev/table/tableApi.md +++ b/docs/content/docs/dev/table/tableApi.md @@ -114,6 +114,7 @@ The following example shows how a Python Table API program is constructed and ho ```python from pyflink.table import * +from pyflink.table.expressions import col # environment configuration t_env = TableEnvironment.create( @@ -152,8 +153,7 @@ t_env.execute_sql(sink_ddl) # specify table program orders = t_env.from_path("Orders") # schema (a, b, c, rowtime) -orders.group_by("a").select(orders.a, orders.b.count.alias('cnt')).execute_insert("result").wait() - +orders.group_by(col("a")).select(col("a"), col("b").count.alias('cnt')).execute_insert("result").wait() ``` {{< /tab >}} @@ -208,14 +208,15 @@ val result: Table = orders ```python # specify table program from pyflink.table.expressions import col, lit +from pyflink.table.window import Tumble orders = t_env.from_path("Orders") # schema (a, b, c, rowtime) -result = orders.filter(orders.a.is_not_null & orders.b.is_not_null & orders.c.is_not_null) \ - .select(orders.a.lower_case.alias('a'), orders.b, orders.rowtime) \ - .window(Tumble.over(lit(1).hour).on(orders.rowtime).alias("hourly_window")) \ +result = orders.filter(col("a").is_not_null & col("b").is_not_null & col("c").is_not_null) \ + .select(col("a").lower_case.alias('a'), col("b"), col("rowtime")) \ + .window(Tumble.over(lit(1).hour).on(col("rowtime")).alias("hourly_window")) \ .group_by(col('hourly_window'), col('a')) \ - .select(col('a'), 
col('hourly_window').end.alias('hour'), b.avg.alias('avg_billing_amount')) + .select(col('a'), col('hourly_window').end.alias('hour'), col("b").avg.alias('avg_billing_amount')) ``` {{< /tab >}} @@ -370,7 +371,7 @@ Table result = orders.select($"a", $"c" as "d") {{< tab "Python" >}} ```python orders = t_env.from_path("Orders") -result = orders.select(orders.a, orders.c.alias('d')) +result = orders.select(col("a"), col("c").alias('d')) ``` {{< /tab >}} {{< /tabs >}} @@ -446,7 +447,7 @@ val result = orders.filter($"a" % 2 === 0) {{< tab "Python" >}} ```python orders = t_env.from_path("Orders") -result = orders.where(orders.a == 'red') +result = orders.where(col("a") == 'red') ``` {{< /tab >}} {{< /tabs >}} @@ -469,7 +470,7 @@ val result = orders.filter($"a" % 2 === 0) {{< tab "Python" >}} ```python orders = t_env.from_path("Orders") -result = orders.filter(orders.a == 'red') +result = orders.filter(col("a") == 'red') ``` {{< /tab >}} {{< /tabs >}} @@ -502,7 +503,7 @@ val result = orders.addColumns(concat($"c", "Sunny")) from pyflink.table.expressions import concat orders = t_env.from_path("Orders") -result = orders.add_columns(concat(orders.c, 'sunny')) +result = orders.add_columns(concat(col("c"), 'sunny')) ``` {{< /tab >}} {{< /tabs >}} @@ -533,7 +534,7 @@ val result = orders.addOrReplaceColumns(concat($"c", "Sunny") as "desc") from pyflink.table.expressions import concat orders = t_env.from_path("Orders") -result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc')) +result = orders.add_or_replace_columns(concat(col("c"), 'sunny').alias('desc')) ``` {{< /tab >}} {{< /tabs >}} @@ -558,7 +559,7 @@ val result = orders.dropColumns($"b", $"c") {{< tab "Python" >}} ```python orders = t_env.from_path("Orders") -result = orders.drop_columns(orders.b, orders.c) +result = orders.drop_columns(col("b"), col("c")) ``` {{< /tab >}} {{< /tabs >}} @@ -586,7 +587,7 @@ val result = orders.renameColumns($"b" as "b2", $"c" as "c2") {{< tab "Python" >}} ```python 
orders = t_env.from_path("Orders") -result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2')) +result = orders.rename_columns(col("b").alias('b2'), col("c").alias('c2')) ``` {{< /tab >}} {{< /tabs >}} @@ -619,7 +620,7 @@ val result = orders.groupBy($"a").select($"a", $"b".sum().as("d")) {{< tab "Python" >}} ```python orders = t_env.from_path("Orders") -result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d')) +result = orders.group_by(col("a")).select(col("a"), col("b").sum.alias('d')) ``` {{< /tab >}} {{< /tabs >}} @@ -664,9 +665,9 @@ from pyflink.table.window import Tumble from pyflink.table.expressions import lit, col orders = t_env.from_path("Orders") -result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \ - .group_by(orders.a, col('w')) \ - .select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d')) +result = orders.window(Tumble.over(lit(5).minutes).on(col('rowtime')).alias("w")) \ + .group_by(col('a'), col('w')) \ + .select(col('a'), col('w').start, col('w').end, col('b').sum.alias('d')) ``` {{< /tab >}} {{< /tabs >}} @@ -720,10 +721,10 @@ from pyflink.table.window import Over from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE orders = t_env.from_path("Orders") -result = orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime) +result = orders.over_window(Over.partition_by(col("a")).order_by(col("rowtime")) .preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE) .alias("w")) \ - .select(orders.a, orders.b.avg.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w'))) + .select(col("a"), col("b").avg.over(col('w')), col("b").max.over(col('w')), col("b").min.over(col('w'))) ``` {{< /tab >}} {{< /tabs >}} @@ -794,22 +795,23 @@ val result = orders {{< tab "Python" >}} ```python from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE +from pyflink.table.window import Over, Tumble orders = t_env.from_path("Orders") # Distinct 
aggregation on group by -group_by_distinct_result = orders.group_by(orders.a) \ - .select(orders.a, orders.b.sum.distinct.alias('d')) +group_by_distinct_result = orders.group_by(col("a")) \ + .select(col("a"), col("b").sum.distinct.alias('d')) # Distinct aggregation on time window group by -group_by_window_distinct_result = orders.window( - Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(orders.a, col('w')) \ - .select(orders.a, orders.b.sum.distinct.alias('d')) +group_by_window_distinct_result = orders.window(Tumble.over(lit(5).minutes).on(col("rowtime")).alias("w")) \ + .group_by(col("a"), col('w')) \ + .select(col("a"), col("b").sum.distinct.alias('d')) # Distinct aggregation on over window result = orders.over_window(Over - .partition_by(orders.a) - .order_by(orders.rowtime) + .partition_by(col("a")) + .order_by(col("rowtime")) .preceding(UNBOUNDED_RANGE) .alias("w")) \ - .select(orders.a, orders.b.avg.distinct.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w'))) + .select(col("a"), col("b").avg.distinct.over(col('w')), col("b").max.over(col('w')), col("b").min.over(col('w'))) ``` {{< /tab >}} {{< /tabs >}} @@ -910,7 +912,7 @@ from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f')) -result = left.join(right).where(left.a == right.d).select(left.a, left.b, right.e) +result = left.join(right).where(col('a') == col('d')).select(col('a'), col('b'), col('e')) ``` {{< /tab >}} {{< /tabs >}} @@ -956,9 +958,9 @@ from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f')) -left_outer_result = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) -right_outer_result = left.right_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) 
-full_outer_result = left.full_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) +left_outer_result = left.left_outer_join(right, col('a') == col('d')).select(col('a'), col('b'), col('e')) +right_outer_result = left.right_outer_join(right, col('a') == col('d')).select(col('a'), col('b'), col('e')) +full_outer_result = left.full_outer_join(right, col('a') == col('d')).select(col('a'), col('b'), col('e')) ``` {{< /tab >}} {{< /tabs >}} @@ -1006,8 +1008,8 @@ from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'), col('rowtime1')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'), col('rowtime2')) -joined_table = left.join(right).where((left.a == right.d) & (left.rowtime1 >= right.rowtime2 - lit(1).second) & (left.rowtime1 <= right.rowtime2 + lit(2).seconds)) -result = joined_table.select(joined_table.a, joined_table.b, joined_table.e, joined_table.rowtime1) +joined_table = left.join(right).where((col('a') == col('d')) & (col('rowtime1') >= col('rowtime2') - lit(1).second) & (col('rowtime1') <= col('rowtime2') + lit(2).seconds)) +result = joined_table.select(col('a'), col('b'), col('e'), col('rowtime1')) ``` {{< /tab >}} {{< /tabs >}} @@ -1053,8 +1055,8 @@ def split(x): # join orders = t_env.from_path("Orders") -joined_table = orders.join_lateral(split(orders.c).alias("s", "t", "v")) -result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, joined_table.t, joined_table.v) +joined_table = orders.join_lateral(split(col('c')).alias("s", "t", "v")) +result = joined_table.select(col('a'), col('b'), col('s'), col('t'), col('v')) ``` {{< /tab >}} {{< /tabs >}} @@ -1102,8 +1104,8 @@ def split(x): # join orders = t_env.from_path("Orders") -joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s", "t", "v")) -result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, joined_table.t, joined_table.v) +joined_table = 
orders.left_outer_join_lateral(split(col('c')).alias("s", "t", "v")) +result = joined_table.select(col('a'), col('b'), col('s'), col('t'), col('v')) ``` {{< /tab >}} {{< /tabs >}} @@ -1369,7 +1371,7 @@ val result = left.select($"a", $"b", $"c").where($"a".in(right)) left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('a')) -result = left.select(left.a, left.b, left.c).where(left.a.in_(right)) +result = left.select(col('a'), col('b'), col('c')).where(col('a').in_(right)) ``` {{< /tab >}} {{< /tabs >}} @@ -1389,17 +1391,17 @@ Similar to a SQL `ORDER BY` clause. Returns records globally sorted across all p {{< tabs "orderby" >}} {{< tab "Java" >}} ```java -Table result = in.orderBy($("a").asc()); +Table result = tab.orderBy($("a").asc()); ``` {{< /tab >}} {{< tab "Scala" >}} ```scala -val result = in.orderBy($"a".asc) +val result = tab.orderBy($"a".asc) ``` {{< /tab >}} {{< tab "Python" >}} ```python -result = in.order_by(in.a.asc) +result = tab.order_by(col('a').asc) ``` {{< /tab >}} {{< /tabs >}} @@ -1439,13 +1441,13 @@ val result3: Table = in.orderBy($"a".asc).offset(10).fetch(5) {{< tab "Python" >}} ```python # returns the first 5 records from the sorted result -result1 = table.order_by(table.a.asc).fetch(5) +result1 = table.order_by(col('a').asc).fetch(5) # skips the first 3 records and returns all following records from the sorted result -result2 = table.order_by(table.a.asc).offset(3) +result2 = table.order_by(col('a').asc).offset(3) # skips the first 10 records and returns the next 5 records from the sorted result -result3 = table.order_by(table.a.asc).offset(10).fetch(5) +result3 = table.order_by(col('a').asc).offset(10).fetch(5) ``` {{< /tab >}} {{< /tabs >}} @@ -1517,7 +1519,7 @@ The following example shows how to define a window aggregation on a table. 
```python # define window with alias w, group the table by window w, then aggregate table = input.window([w: GroupWindow].alias("w")) \ - .group_by(col('w')).select(input.b.sum) + .group_by(col('w')).select(col('b').sum) ``` {{< /tab >}} {{< /tabs >}} @@ -1553,7 +1555,7 @@ The following example shows how to define a window aggregation with additional g # define window with alias w, group the table by attribute a and window w, # then aggregate table = input.window([w: GroupWindow].alias("w")) \ - .group_by(col('w'), input.a).select(input.b.sum) + .group_by(col('w'), col('a')).select(col('b').sum) ``` {{< /tab >}} {{< /tabs >}} @@ -1582,8 +1584,8 @@ val table = input # define window with alias w, group the table by attribute a and window w, # then aggregate and add window start, end, and rowtime timestamps table = input.window([w: GroupWindow].alias("w")) \ - .group_by(col('w'), input.a) \ - .select(input.a, col('w').start, col('w').end, col('w').rowtime, input.b.count) + .group_by(col('w'), col('a')) \ + .select(col('a'), col('w').start, col('w').end, col('w').rowtime, col('b').count) ``` {{< /tab >}} {{< /tabs >}} @@ -1989,7 +1991,7 @@ val table = input ```python # define over window with alias w and aggregate over the over window w table = input.over_window([w: OverWindow].alias("w")) \ - .select(input.a, input.b.sum.over(col('w')), input.c.min.over(col('w'))) + .select(col('a'), col('b').sum.over(col('w')), col('c').min.over(col('w'))) ``` {{< /tab >}} {{< /tabs >}} @@ -2216,9 +2218,9 @@ func = udf(map_function, result_type=DataTypes.ROW( table = input.map(func).alias('a', 'b') # map operation with a python vectorized scalar function -pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW( - [DataTypes.FIELD("a", DataTypes.BIGINT()), - DataTypes.FIELD("b", DataTypes.BIGINT()))]), +pandas_func = udf(lambda x: x * 2, + result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT()), + DataTypes.FIELD("b", DataTypes.BIGINT())]), func_type='pandas') table = 
input.map(pandas_func).alias('a', 'b') @@ -2450,9 +2452,9 @@ agg = udaf(function, name=str(function.__class__.__name__)) # aggregate with a python general aggregate function -result = t.group_by(t.a) \ +result = t.group_by(col('a')) \ .aggregate(agg.alias("c", "d")) \ - select(col('a'), col('c'), col('d')) + .select(col('a'), col('c'), col('d')) # aggregate with a python vectorized aggregate function pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()), @@ -2461,7 +2463,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()), DataTypes.FIELD("b", DataTypes.INT())]), func_type="pandas") t.aggregate(pandas_udaf.alias("a", "b")) \ - select(col('a'), col('b')) + .select(col('a'), col('b')) ``` {{< /tab >}} @@ -2503,16 +2505,18 @@ val table = input ```python from pyflink.table import DataTypes from pyflink.table.udf import AggregateFunction, udaf +from pyflink.table.expressions import col, lit +from pyflink.table.window import Tumble pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()), result_type=DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.FLOAT()), DataTypes.FIELD("b", DataTypes.INT())]), func_type="pandas") -tumble_window = Tumble.over(expr.lit(1).hours) \ - .on(expr.col("rowtime")) \ +tumble_window = Tumble.over(lit(1).hours) \ + .on(col("rowtime")) \ .alias("w") -t.select(t.b, t.rowtime) \ +t.select(col('b'), col('rowtime')) \ .window(tumble_window) \ .group_by(col("w")) \ .aggregate(pandas_udaf.alias("d", "e")) \ @@ -2666,6 +2670,7 @@ Similar to a **GroupBy Aggregation**. 
Groups the rows on the grouping keys with from pyflink.common import Row from pyflink.table.udf import TableAggregateFunction, udtaf from pyflink.table import DataTypes +from pyflink.table.expressions import col class Top2(TableAggregateFunction): @@ -2701,13 +2706,13 @@ t = t_env.from_elements([(1, 'Hi', 'Hello'), (3, 'Hi', 'hi'), (5, 'Hi2', 'hi'), (7, 'Hi', 'Hello'), - (2, 'Hi', 'Hello')], ['a', 'b', 'c']) -result = t.select(t.a, t.c) \ - .group_by(t.c) \ + (2, 'Hi', 'Hello')], + ['a', 'b', 'c']) +result = t.select(col('a'), col('c')) \ + .group_by(col('c')) \ .flat_aggregate(mytop) \ - .select(t.a) \ + .select(col('a')) \ .flat_aggregate(mytop.alias("b")) - ``` {{< /tab >}} {{< /tabs >}} diff --git a/flink-python/pyflink/examples/table/mixing_use_of_datastream_and_table.py b/flink-python/pyflink/examples/table/mixing_use_of_datastream_and_table.py index dc518f4e3d2..3dc9b7d82dd 100644 --- a/flink-python/pyflink/examples/table/mixing_use_of_datastream_and_table.py +++ b/flink-python/pyflink/examples/table/mixing_use_of_datastream_and_table.py @@ -21,6 +21,7 @@ import sys from pyflink.common import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import (DataTypes, TableDescriptor, Schema, StreamTableEnvironment) +from pyflink.table.expressions import col from pyflink.table.udf import udf @@ -55,7 +56,7 @@ def mixing_use_of_datastream_and_table(): # perform table api operations table = t_env.from_path("source") - table = table.select(table.id, length(table.data)) + table = table.select(col('id'), length(col('data'))) # convert table to datastream and perform datastream api operations ds = t_env.to_data_stream(table) diff --git a/flink-python/pyflink/examples/table/pandas/pandas_udaf.py b/flink-python/pyflink/examples/table/pandas/pandas_udaf.py index 4b884dd4803..72febe590e7 100644 --- a/flink-python/pyflink/examples/table/pandas/pandas_udaf.py +++ b/flink-python/pyflink/examples/table/pandas/pandas_udaf.py @@ -75,8 +75,8 @@ def 
pandas_udaf(): # define the tumble window operation table = table.window(Tumble.over(lit(5).seconds).on(col("ts")).alias("w")) \ - .group_by(table.name, col('w')) \ - .select(table.name, mean_udaf(table.price), col("w").start, col("w").end) + .group_by(col('name'), col('w')) \ + .select(col('name'), mean_udaf(col('price')), col("w").start, col("w").end) # submit for execution table.execute_insert('sink') \ diff --git a/flink-python/pyflink/examples/table/process_json_data.py b/flink-python/pyflink/examples/table/process_json_data.py index eeb6f7c88cd..b410d11c58e 100644 --- a/flink-python/pyflink/examples/table/process_json_data.py +++ b/flink-python/pyflink/examples/table/process_json_data.py @@ -20,6 +20,7 @@ import sys from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes, TableDescriptor, Schema) +from pyflink.table.expressions import col def process_json_data(): @@ -45,7 +46,7 @@ def process_json_data(): .build()) .build()) - table = table.select(table.id, table.data.json_value('$.addr.country', DataTypes.STRING())) + table = table.select(col('id'), col('data').json_value('$.addr.country', DataTypes.STRING())) # execute table.execute_insert('sink') \ diff --git a/flink-python/pyflink/examples/table/process_json_data_with_udf.py b/flink-python/pyflink/examples/table/process_json_data_with_udf.py index 31969e2f675..934e73a68d8 100644 --- a/flink-python/pyflink/examples/table/process_json_data_with_udf.py +++ b/flink-python/pyflink/examples/table/process_json_data_with_udf.py @@ -21,6 +21,7 @@ import sys from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes, TableDescriptor, Schema) +from pyflink.table.expressions import col from pyflink.table.udf import udf @@ -54,7 +55,7 @@ def process_json_data_with_udf(): json_data['tel'] += 1 return json.dumps(json_data) - table = table.select(table.id, update_tel(table.data)) + table = table.select(col('id'), update_tel(col('data'))) # execute table.execute_insert('sink') \ diff 
--git a/flink-python/pyflink/examples/table/windowing/over_window.py b/flink-python/pyflink/examples/table/windowing/over_window.py index 5fd736b529a..d958e75ddc1 100644 --- a/flink-python/pyflink/examples/table/windowing/over_window.py +++ b/flink-python/pyflink/examples/table/windowing/over_window.py @@ -73,7 +73,7 @@ def tumble_window_demo(): .preceding(row_interval(2)) .following(CURRENT_ROW) .alias('w')) \ - .select(table.name, table.price.max.over(col('w'))) + .select(col('name'), col('price').max.over(col('w'))) # submit for execution table.execute_insert('sink') \ diff --git a/flink-python/pyflink/examples/table/windowing/session_window.py b/flink-python/pyflink/examples/table/windowing/session_window.py index 49e46809389..e7506b0f8b3 100644 --- a/flink-python/pyflink/examples/table/windowing/session_window.py +++ b/flink-python/pyflink/examples/table/windowing/session_window.py @@ -68,8 +68,8 @@ def session_window_demo(): # define the session window operation table = table.window(Session.with_gap(lit(5).seconds).on(col("ts")).alias("w")) \ - .group_by(table.name, col('w')) \ - .select(table.name, table.price.sum, col("w").start, col("w").end) + .group_by(col('name'), col('w')) \ + .select(col('name'), col('price').sum, col("w").start, col("w").end) # submit for execution table.execute_insert('sink') \ diff --git a/flink-python/pyflink/examples/table/windowing/sliding_window.py b/flink-python/pyflink/examples/table/windowing/sliding_window.py index fc460c67df4..10ba068b1a5 100644 --- a/flink-python/pyflink/examples/table/windowing/sliding_window.py +++ b/flink-python/pyflink/examples/table/windowing/sliding_window.py @@ -70,8 +70,8 @@ def sliding_window_demo(): # define the sliding window operation table = table.window(Slide.over(lit(5).seconds).every(lit(2).seconds).on(col("ts")).alias("w"))\ - .group_by(table.name, col('w')) \ - .select(table.name, table.price.sum, col("w").start, col("w").end) + .group_by(col('name'), col('w')) \ + .select(col('name'), 
col('price').sum, col("w").start, col("w").end) # submit for execution table.execute_insert('sink') \ diff --git a/flink-python/pyflink/examples/table/windowing/tumble_window.py b/flink-python/pyflink/examples/table/windowing/tumble_window.py index c7787473451..ff5bb99516e 100644 --- a/flink-python/pyflink/examples/table/windowing/tumble_window.py +++ b/flink-python/pyflink/examples/table/windowing/tumble_window.py @@ -70,8 +70,8 @@ def tumble_window_demo(): # define the tumble window operation table = table.window(Tumble.over(lit(5).seconds).on(col("ts")).alias("w")) \ - .group_by(table.name, col('w')) \ - .select(table.name, table.price.sum, col("w").start, col("w").end) + .group_by(col('name'), col('w')) \ + .select(col('name'), col('price').sum, col("w").start, col("w").end) # submit for execution table.execute_insert('sink') \ diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index fba2ea91c82..eaa5fe908f7 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -900,7 +900,7 @@ class Expression(Generic[T]): :: >>> tab.where(col("a").in_(1, 2, 3)) - >>> table_a.where(col("x").in_(table_b.select("y"))) + >>> table_a.where(col("x").in_(table_b.select(col("y")))) """ from pyflink.table import Table if isinstance(first_element_or_table, Table): diff --git a/flink-python/pyflink/table/schema.py b/flink-python/pyflink/table/schema.py index 5903557783e..d8856bf1669 100644 --- a/flink-python/pyflink/table/schema.py +++ b/flink-python/pyflink/table/schema.py @@ -139,9 +139,9 @@ class Schema(object): Example: :: - >>> Schema.new_builder(). - ... column_by_expression("ts", "orig_ts - INTERVAL '60' MINUTE"). - ... column_by_metadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp") + >>> Schema.new_builder() \\ + ... .column_by_expression("ts", "orig_ts - INTERVAL '60' MINUTE") \\ + ... 
.column_by_metadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp") :param column_name: Column name :param expr: Computation of the column diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py index 3068ef4117f..a0d02b90dd2 100644 --- a/flink-python/pyflink/table/table.py +++ b/flink-python/pyflink/table/table.py @@ -84,7 +84,7 @@ class Table(object): Example: :: - >>> from pyflink.table import TableEnvironment + >>> from pyflink.table import EnvironmentSettings, TableEnvironment >>> from pyflink.table.expressions import * >>> env_settings = EnvironmentSettings.in_streaming_mode() >>> t_env = TableEnvironment.create(env_settings) @@ -122,9 +122,9 @@ class Table(object): Example: :: - >>> from pyflink.table import expressions as expr - >>> tab.select(tab.key, expr.concat(tab.value, 'hello')) - >>> tab.select(expr.col('key'), expr.concat(expr.col('value'), 'hello')) + >>> from pyflink.table.expressions import col, concat + >>> tab.select(tab.key, concat(tab.value, 'hello')) + >>> tab.select(col('key'), concat(col('value'), 'hello')) :return: The result table. """ @@ -156,7 +156,7 @@ class Table(object): Example: :: - >>> tab.filter(tab.name == 'Fred') + >>> tab.filter(col('name') == 'Fred') :param predicate: Predicate expression string. :return: The result table. @@ -171,7 +171,7 @@ class Table(object): Example: :: - >>> tab.where(tab.name == 'Fred') + >>> tab.where(col('name') == 'Fred') :param predicate: Predicate expression string. :return: The result table. @@ -186,7 +186,7 @@ class Table(object): Example: :: - >>> tab.group_by(tab.key).select(tab.key, tab.value.avg) + >>> tab.group_by(col('key')).select(col('key'), col('value').avg) :param fields: Group keys. :return: The grouped table. @@ -200,7 +200,7 @@ class Table(object): Example: :: - >>> tab.select(tab.key, tab.value).distinct() + >>> tab.select(col('key'), col('value')).distinct() :return: The result table. 
""" @@ -220,9 +220,8 @@ class Table(object): Example: :: - >>> left.join(right).where((left.a == right.b) && (left.c > 3)) - >>> left.join(right).where("a = b && c > 3") - >>> left.join(right, left.a == right.b) + >>> left.join(right).where((col('a') == col('b')) && (col('c') > 3)) + >>> left.join(right, col('a') == col('b')) :param right: Right table. :param join_predicate: Optional, the join predicate expression string. @@ -251,7 +250,7 @@ class Table(object): :: >>> left.left_outer_join(right) - >>> left.left_outer_join(right, left.a == right.b) + >>> left.left_outer_join(right, col('a') == col('b')) :param right: Right table. :param join_predicate: Optional, the join predicate expression string. @@ -279,7 +278,7 @@ class Table(object): Example: :: - >>> left.right_outer_join(right, left.a == right.b) + >>> left.right_outer_join(right, col('a') == col('b')) :param right: Right table. :param join_predicate: The join predicate expression string. @@ -304,7 +303,7 @@ class Table(object): Example: :: - >>> left.full_outer_join(right, left.a == right.b) + >>> left.full_outer_join(right, col('a') == col('b')) :param right: Right table. :param join_predicate: The join predicate expression string. @@ -324,11 +323,10 @@ class Table(object): Example: :: + >>> from pyflink.table.expressions import * >>> t_env.create_java_temporary_system_function("split", ... "java.table.function.class.name") - - >>> from pyflink.table import expressions as expr - >>> tab.join_lateral(expr.call('split', ' ').alias('b'), expr.col('a') == expr.col('b')) + >>> tab.join_lateral(call('split', ' ').alias('b'), col('a') == col('b')) >>> # take all the columns as inputs >>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()]) ... def split_row(row: Row): @@ -374,8 +372,9 @@ class Table(object): >>> t_env.create_java_temporary_system_function("split", ... 
"java.table.function.class.name") - >>> from pyflink.table import expressions as expr - >>> tab.left_outer_join_lateral(expr.call('split', ' ').alias('b')) + >>> from pyflink.table.expressions import * + >>> tab.left_outer_join_lateral(call('split', ' ').alias('b')) + >>> # take all the columns as inputs >>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()]) ... def split_row(row: Row): @@ -539,7 +538,7 @@ class Table(object): Example: :: - >>> tab.order_by(tab.name.desc) + >>> tab.order_by(col('name').desc) For unbounded tables, this operation requires a sorting on a time attribute or a subsequent fetch operation. @@ -561,9 +560,9 @@ class Table(object): :: # skips the first 3 rows and returns all following rows. - >>> tab.order_by(tab.name.desc).offset(3) + >>> tab.order_by(col('name').desc).offset(3) # skips the first 10 rows and returns the next 5 rows. - >>> tab.order_by(tab.name.desc).offset(10).fetch(5) + >>> tab.order_by(col('name').desc).offset(10).fetch(5) For unbounded tables, this operation requires a subsequent fetch operation. @@ -585,12 +584,12 @@ class Table(object): Returns the first 3 records. :: - >>> tab.order_by(tab.name.desc).fetch(3) + >>> tab.order_by(col('name').desc).fetch(3) Skips the first 10 rows and returns the next 5 rows. :: - >>> tab.order_by(tab.name.desc).offset(10).fetch(5) + >>> tab.order_by(col('name').desc).offset(10).fetch(5) :param fetch: The number of records to return. Fetch must be >= 0. :return: The result table. @@ -646,10 +645,10 @@ class Table(object): Example: :: - >>> from pyflink.table import expressions as expr - >>> tab.window(Tumble.over(expr.lit(10).minutes).on(tab.rowtime).alias('w')) \\ + >>> from pyflink.table.expressions import col, lit + >>> tab.window(Tumble.over(lit(10).minutes).on(col('rowtime')).alias('w')) \\ ... .group_by(col('w')) \\ - ... .select(tab.a.sum.alias('a'), + ... .select(col('a').sum.alias('a'), ... col('w').start.alias('b'), ... col('w').end.alias('c'), ... 
col('w').rowtime.alias('d')) @@ -671,10 +670,10 @@ Example: :: - >>> from pyflink.table import expressions as expr - >>> tab.over_window(Over.partition_by(tab.c).order_by(tab.rowtime) \\ + >>> from pyflink.table.expressions import col, lit + >>> tab.over_window(Over.partition_by(col('c')).order_by(col('rowtime')) \\ ... .preceding(lit(10).seconds).alias("ow")) \\ - ... .select(tab.c, tab.b.count.over(col('ow'), tab.e.sum.over(col('ow')))) + ... .select(col('c'), col('b').count.over(col('ow')), col('e').sum.over(col('ow'))) .. note:: @@ -703,8 +702,8 @@ Example: :: - >>> from pyflink.table import expressions as expr - >>> tab.add_columns((tab.a + 1).alias('a1'), expr.concat(tab.b, 'sunny').alias('b1')) + >>> from pyflink.table.expressions import col, concat + >>> tab.add_columns((col('a') + 1).alias('a1'), concat(col('b'), 'sunny').alias('b1')) :param fields: Column list string. :return: The result table. @@ -721,9 +720,9 @@ Example: :: - >>> from pyflink.table import expressions as expr - >>> tab.add_or_replace_columns((tab.a + 1).alias('a1'), - ... expr.concat(tab.b, 'sunny').alias('b1')) + >>> from pyflink.table.expressions import col, concat + >>> tab.add_or_replace_columns((col('a') + 1).alias('a1'), + ... concat(col('b'), 'sunny').alias('b1')) :param fields: Column list string. :return: The result table. @@ -739,7 +738,7 @@ Example: :: - >>> tab.rename_columns(tab.a.alias('a1'), tab.b.alias('b1')) + >>> tab.rename_columns(col('a').alias('a1'), col('b').alias('b1')) :param fields: Column list string. :return: The result table. @@ -754,7 +753,7 @@ Example: :: - >>> tab.drop_columns(tab.a, tab.b) + >>> tab.drop_columns(col('a'), col('b')) :param fields: Column list string. :return: The result table. @@ -771,7 +770,7 @@ >>> add = udf(lambda x: Row(x + 1, x * x), result_type=DataTypes.Row( ... 
[DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b", DataTypes.INT())])) - >>> tab.map(add(tab.a)).alias("a, b") + >>> tab.map(add(col('a'))).alias("a", "b") >>> # take all the columns as inputs >>> identity = udf(lambda row: row, result_type=DataTypes.Row( ... [DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b", DataTypes.INT())])) @@ -799,7 +798,7 @@ class Table(object): ... def split(x, string): ... for s in string.split(","): ... yield x, s - >>> tab.flat_map(split(tab.a, table.b)) + >>> tab.flat_map(split(col('a'), col('b'))) >>> # take all the columns as inputs >>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()]) ... def split_row(row: Row): @@ -832,7 +831,7 @@ class Table(object): ... [DataTypes.FIELD("a", DataTypes.FLOAT()), ... DataTypes.FIELD("b", DataTypes.INT())]), ... func_type="pandas") - >>> tab.aggregate(agg(tab.a).alias("a", "b")).select(col('a'), col('b')) + >>> tab.aggregate(agg(col('a')).alias("a", "b")).select(col('a'), col('b')) >>> # take all the columns as inputs >>> # pd is a Pandas.DataFrame >>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.a.max()), @@ -840,7 +839,7 @@ class Table(object): ... [DataTypes.FIELD("a", DataTypes.FLOAT()), ... DataTypes.FIELD("b", DataTypes.INT())]), ... func_type="pandas") - >>> tab.aggregate(agg.alias("a, b")).select(col('a'), col('b')) + >>> tab.aggregate(agg.alias("a", "b")).select(col('a'), col('b')) :param func: user-defined aggregate function. :return: The result table. @@ -869,7 +868,7 @@ class Table(object): :: >>> table_agg = udtaf(MyTableAggregateFunction()) - >>> tab.flat_aggregate(table_agg(tab.a).alias("a", "b")).select(col('a'), col('b')) + >>> tab.flat_aggregate(table_agg(col('a')).alias("a", "b")).select(col('a'), col('b')) >>> # take all the columns as inputs >>> class Top2(TableAggregateFunction): ... 
def emit_value(self, accumulator): @@ -924,7 +923,7 @@ class Table(object): >>> pdf = pd.DataFrame(np.random.rand(1000, 2)) >>> table = table_env.from_pandas(pdf, ["a", "b"]) - >>> table.filter(table.a > 0.5).to_pandas() + >>> table.filter(col('a') > 0.5).to_pandas() :return: the result pandas DataFrame. @@ -1095,7 +1094,7 @@ class GroupedTable(object): Example: :: - >>> tab.group_by(tab.key).select(tab.key, tab.value.avg.alias('average')) + >>> tab.group_by(col('key')).select(col('key'), col('value').avg.alias('average')) :param fields: Expression string that contains group keys and aggregate function calls. :return: The result table. @@ -1116,7 +1115,7 @@ class GroupedTable(object): ... [DataTypes.FIELD("a", DataTypes.FLOAT()), ... DataTypes.FIELD("b", DataTypes.INT())]), ... func_type="pandas") - >>> tab.group_by(tab.a).aggregate(agg(tab.b).alias("c", "d")).select( + >>> tab.group_by(col('a')).aggregate(agg(col('b')).alias("c", "d")).select( ... col('a'), col('c'), col('d')) >>> # take all the columns as inputs >>> # pd is a Pandas.DataFrame @@ -1125,7 +1124,7 @@ class GroupedTable(object): ... [DataTypes.FIELD("a", DataTypes.FLOAT()), ... DataTypes.FIELD("b", DataTypes.INT())]), ... func_type="pandas") - >>> tab.group_by(tab.a).aggregate(agg.alias("a, b")).select(col('a'), col('b')) + >>> tab.group_by(col('a')).aggregate(agg.alias("a", "b")).select(col('a'), col('b')) :param func: user-defined aggregate function. :return: The result table. @@ -1154,7 +1153,7 @@ class GroupedTable(object): :: >>> table_agg = udtaf(MyTableAggregateFunction()) - >>> tab.group_by(tab.c).flat_aggregate(table_agg(tab.a).alias("a")).select( + >>> tab.group_by(col('c')).flat_aggregate(table_agg(col('a')).alias("a")).select( ... col('c'), col('a')) >>> # take all the columns as inputs >>> class Top2(TableAggregateFunction): @@ -1181,7 +1180,9 @@ class GroupedTable(object): ... return DataTypes.ROW( ... 
[DataTypes.FIELD("a", DataTypes.BIGINT())]) >>> top2 = udtaf(Top2()) - >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a", "b")).select(col('a'), col('b')) + >>> tab.group_by(col('c')) \\ + ... .flat_aggregate(top2.alias("a", "b")) \\ + ... .select(col('a'), col('b')) :param func: user-defined table aggregate function. :return: The result table. @@ -1224,10 +1225,10 @@ class GroupWindowedTable(object): Example: :: - >>> from pyflink.table import expressions as expr - >>> tab.window(Tumble.over(expr.lit(10).minutes).on(tab.rowtime).alias('w')) \\ + >>> from pyflink.table.expressions import col, lit + >>> tab.window(Tumble.over(lit(10).minutes).on(col('rowtime')).alias('w')) \\ ... .group_by(col('w')) \\ - ... .select(tab.a.sum.alias('a'), + ... .select(col('a').sum.alias('a'), ... col('w').start.alias('b'), ... col('w').end.alias('c'), ... col('w').rowtime.alias('d')) @@ -1285,9 +1286,9 @@ class WindowGroupedTable(object): ... [DataTypes.FIELD("a", DataTypes.FLOAT()), ... DataTypes.FIELD("b", DataTypes.INT())]), ... func_type="pandas") - >>> window_grouped_table.group_by("w") \ - ... .aggregate(agg(window_grouped_table.b) \ - ... .alias("c", "d")) \ + >>> window_grouped_table.group_by(col("w")) \ + ... .aggregate(agg(col('b'))) \ + ... .alias("c", "d") \ ... .select(col('c'), col('d')) >>> # take all the columns as inputs >>> # pd is a Pandas.DataFrame @@ -1296,7 +1297,7 @@ class WindowGroupedTable(object): ... [DataTypes.FIELD("a", DataTypes.FLOAT()), ... DataTypes.FIELD("b", DataTypes.INT())]), ... func_type="pandas") - >>> window_grouped_table.group_by("w, a").aggregate(agg_row) + >>> window_grouped_table.group_by(col("w"), col("a")).aggregate(agg_row) :param func: user-defined aggregate function. :return: The result table. @@ -1378,7 +1379,7 @@ class AggregatedTable(object): ... [DataTypes.FIELD("a", DataTypes.FLOAT()), ... DataTypes.FIELD("b", DataTypes.INT())]), ... 
func_type="pandas") - >>> tab.aggregate(agg(tab.a).alias("a", "b")).select(col('a'), col('b')) + >>> tab.aggregate(agg(col('a')).alias("a", "b")).select(col('a'), col('b')) >>> # take all the columns as inputs >>> # pd is a Pandas.DataFrame >>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()), @@ -1386,7 +1387,7 @@ class AggregatedTable(object): ... [DataTypes.FIELD("a", DataTypes.FLOAT()), ... DataTypes.FIELD("b", DataTypes.INT())]), ... func_type="pandas") - >>> tab.group_by(tab.a).aggregate(agg.alias("a, b")).select(col('a'), col('b')) + >>> tab.group_by(col('a')).aggregate(agg.alias("a", "b")).select(col('a'), col('b')) :param fields: Expression string. :return: The result table. @@ -1413,7 +1414,7 @@ class FlatAggregateTable(object): :: >>> table_agg = udtaf(MyTableAggregateFunction()) - >>> tab.flat_aggregate(table_agg(tab.a).alias("a", "b")).select(col('a'), col('b')) + >>> tab.flat_aggregate(table_agg(col('a')).alias("a", "b")).select(col('a'), col('b')) >>> # take all the columns as inputs >>> class Top2(TableAggregateFunction): ... def emit_value(self, accumulator): @@ -1439,7 +1440,9 @@ class FlatAggregateTable(object): ... return DataTypes.ROW( ... [DataTypes.FIELD("a", DataTypes.BIGINT())]) >>> top2 = udtaf(Top2()) - >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a", "b")).select(col('a'), col('b')) + >>> tab.group_by(col('c')) \\ + ... .flat_aggregate(top2.alias("a", "b")) \\ + ... .select(col('a'), col('b')) :param fields: Expression string. :return: The result table. diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py index 18e0eba4dcb..948f71930d4 100644 --- a/flink-python/pyflink/table/table_config.py +++ b/flink-python/pyflink/table/table_config.py @@ -211,8 +211,7 @@ class TableConfig(object): Example: :: - >>> table_config = TableConfig() \\ - ... .set_idle_state_retention(datetime.timedelta(days=1)) + >>> table_config.set_idle_state_retention(datetime.timedelta(days=1)) .. 
note:: diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index 3a06c04908c..5f29035c10c 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -1934,7 +1934,7 @@ class StreamTableEnvironment(TableEnvironment): ... .column("id", DataTypes.BIGINT()) ... .column("payload", DataTypes.ROW( ... [DataTypes.FIELD("name", DataTypes.STRING()), - ... DataTypes.FIELD("age", DataTypes.INT())])) + ... DataTypes.FIELD("age", DataTypes.INT())])) ... .build()) Note that the type system of the table ecosystem is richer than the one of the DataStream diff --git a/flink-python/pyflink/table/window.py b/flink-python/pyflink/table/window.py index fff726e6120..4b35bd4fd88 100644 --- a/flink-python/pyflink/table/window.py +++ b/flink-python/pyflink/table/window.py @@ -60,12 +60,10 @@ class Tumble(object): Example: :: - >>> from pyflink.table import expressions as expr - >>> Tumble.over(expr.lit(10).minutes) - ... .on(expr.col("rowtime")) + >>> from pyflink.table.expressions import col, lit + >>> Tumble.over(lit(10).minutes) \\ + ... .on(col("rowtime")) \\ ... .alias("w") - - >>> Tumble.over("10.minutes").on("rowtime").alias("w") """ @classmethod @@ -139,9 +137,9 @@ class Session(object): Example: :: - >>> from pyflink.table import expressions as expr - >>> Session.with_gap(expr.lit(10).minutes) - ... .on(expr.col("rowtime")) + >>> from pyflink.table.expressions import col, lit + >>> Session.with_gap(lit(10).minutes) \\ + ... .on(col("rowtime")) \\ ... .alias("w") """ @@ -221,10 +219,10 @@ class Slide(object): Example: :: - >>> from pyflink.table import expressions as expr - >>> Slide.over(expr.lit(10).minutes) - ... .every(expr.lit(5).minutes) - ... .on(expr.col("rowtime")) + >>> from pyflink.table.expressions import col, lit + >>> Slide.over(lit(10).minutes) \\ + ... .every(lit(5).minutes) \\ + ... .on(col("rowtime")) \\ ... 
.alias("w") """ @@ -327,10 +325,10 @@ class Over(object): Example: :: - >>> from pyflink.table import expressions as expr + >>> from pyflink.table.expressions import col, UNBOUNDED_RANGE >>> Over.partition_by(col("a")) \\ ... .order_by(col("rowtime")) \\ - ... .preceding(expr.UNBOUNDED_RANGE) \\ + ... .preceding(UNBOUNDED_RANGE) \\ ... .alias("w") """