This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 992836e3809 [python][docs] Update documentation to remove deprecated API examples
992836e3809 is described below

commit 992836e380998fb1b210d54394f318c5f6060a93
Author: Dian Fu <dia...@apache.org>
AuthorDate: Thu Apr 28 11:08:49 2022 +0800

    [python][docs] Update documentation to remove deprecated API examples
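    
    The removed examples relied on the deprecated expression styles, i.e.
    attribute access such as `orders.a` and string expressions such as
    `select("a, b")`; the updated documentation uses explicit `col()`
    expressions throughout. A minimal sketch of the migration pattern
    (the `orders` table and its columns here are illustrative only):
    
        from pyflink.table.expressions import col
    
        # before: deprecated attribute-access expressions
        result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d'))
    
        # after: explicit column expressions
        result = orders.group_by(col("a")).select(col("a"), col("b").sum.alias('d'))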
---
 docs/content.zh/docs/dev/python/debugging.md       |   6 +-
 .../docs/dev/python/table/conversion_of_pandas.md  |   6 +-
 .../docs/dev/python/table/intro_to_table_api.md    |  17 ++-
 docs/content.zh/docs/dev/python/table/metrics.md   |  19 ++-
 .../table/operations/row_based_operations.md       |  11 +-
 .../python/table/python_table_api_connectors.md    |   6 +-
 .../docs/dev/python/table/udfs/python_udfs.md      |  36 +++--
 .../python/table/udfs/vectorized_python_udfs.md    |  25 +++-
 docs/content.zh/docs/dev/table/catalogs.md         |   2 +-
 docs/content.zh/docs/dev/table/common.md           |   8 +-
 docs/content.zh/docs/dev/table/tableApi.md         | 146 ++++++++++----------
 docs/content/docs/dev/python/debugging.md          |   6 +-
 .../docs/dev/python/table/conversion_of_pandas.md  |   6 +-
 .../docs/dev/python/table/intro_to_table_api.md    |  20 +--
 docs/content/docs/dev/python/table/metrics.md      |  22 ++-
 .../table/operations/row_based_operations.md       |  13 +-
 .../python/table/python_table_api_connectors.md    |   6 +-
 .../docs/dev/python/table/udfs/python_udfs.md      |  36 +++--
 .../python/table/udfs/vectorized_python_udfs.md    |  25 ++--
 docs/content/docs/dev/table/catalogs.md            |   2 +-
 docs/content/docs/dev/table/common.md              |   8 +-
 docs/content/docs/dev/table/data_stream_api.md     |   2 +-
 docs/content/docs/dev/table/tableApi.md            | 138 ++++++++++---------
 .../table/mixing_use_of_datastream_and_table.py    |   3 +-
 .../pyflink/examples/table/pandas/pandas_udaf.py   |   6 +-
 .../pyflink/examples/table/process_json_data.py    |   3 +-
 .../examples/table/process_json_data_with_udf.py   |   3 +-
 .../examples/table/windowing/over_window.py        |   8 +-
 .../examples/table/windowing/session_window.py     |   6 +-
 .../examples/table/windowing/sliding_window.py     |   6 +-
 .../examples/table/windowing/tumble_window.py      |   6 +-
 flink-python/pyflink/table/expression.py           |   2 +-
 flink-python/pyflink/table/schema.py               |   6 +-
 flink-python/pyflink/table/table.py                | 151 +++++++++------------
 flink-python/pyflink/table/table_config.py         |   3 +-
 flink-python/pyflink/table/table_environment.py    |   2 +-
 flink-python/pyflink/table/window.py               |  33 ++---
 37 files changed, 426 insertions(+), 378 deletions(-)

diff --git a/docs/content.zh/docs/dev/python/debugging.md b/docs/content.zh/docs/dev/python/debugging.md
index 0cefc596b59..8dd058897ad 100644
--- a/docs/content.zh/docs/dev/python/debugging.md
+++ b/docs/content.zh/docs/dev/python/debugging.md
@@ -58,10 +58,14 @@ print(table.get_schema())
 在作业运行的过程中,日志信息会打印在 `TaskManager` 的日志文件中。
 
 ```python
+from pyflink.table import DataTypes
+from pyflink.table.udf import udf
+
+import logging
+
 @udf(result_type=DataTypes.BIGINT())
 def add(i, j):
     # 使用 logging 模块
-    import logging
     logging.info("debug")
     # 使用 print 函数
     print('debug')
diff --git a/docs/content.zh/docs/dev/python/table/conversion_of_pandas.md b/docs/content.zh/docs/dev/python/table/conversion_of_pandas.md
index 67544095938..1d16020efa1 100644
--- a/docs/content.zh/docs/dev/python/table/conversion_of_pandas.md
+++ b/docs/content.zh/docs/dev/python/table/conversion_of_pandas.md
@@ -36,6 +36,8 @@ PyFlink 支持将 Pandas DataFrame 转换成 PyFlink Table。在内部实现上
 以下示例显示如何从 Pandas DataFrame 创建 PyFlink Table:
 
 ```python
+from pyflink.table import DataTypes
+
 import pandas as pd
 import numpy as np
 
@@ -54,7 +56,7 @@ table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])
 # 由Pandas DataFrame创建列名和列类型的PyFlink表
 table = t_env.from_pandas(pdf,
                           DataTypes.ROW([DataTypes.FIELD("f0", 
DataTypes.DOUBLE()),
-                                         DataTypes.FIELD("f1", 
DataTypes.DOUBLE())])
+                                         DataTypes.FIELD("f1", 
DataTypes.DOUBLE())]))
 ```
 
 ## 将 PyFlink Table 转换为 Pandas DataFrame
@@ -68,6 +70,8 @@ table = t_env.from_pandas(pdf,
 以下示例显示了如何将 PyFlink Table 转换为 Pandas DataFrame:
 
 ```python
+from pyflink.table.expressions import col
+
 import pandas as pd
 import numpy as np
 
diff --git a/docs/content.zh/docs/dev/python/table/intro_to_table_api.md b/docs/content.zh/docs/dev/python/table/intro_to_table_api.md
index fb4008e4cc5..e9fc93aca1b 100644
--- a/docs/content.zh/docs/dev/python/table/intro_to_table_api.md
+++ b/docs/content.zh/docs/dev/python/table/intro_to_table_api.md
@@ -319,6 +319,7 @@ new_table.execute().print()
 
 ```python
 from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
 
 # 通过 batch table environment 来执行查询
 env_settings = EnvironmentSettings.in_batch_mode()
@@ -329,11 +330,11 @@ orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30
 
 # 计算所有来自法国客户的收入
 revenue = orders \
-    .select(orders.name, orders.country, orders.revenue) \
-    .where(orders.country == 'FRANCE') \
-    .group_by(orders.name) \
-    .select(orders.name, orders.revenue.sum.alias('rev_sum'))
-    
+    .select(col("name"), col("country"), col("revenue")) \
+    .where(col("country") == 'FRANCE') \
+    .group_by(col("name")) \
+    .select(col("name"), col("country").sum.alias('rev_sum'))
+
 revenue.execute().print()
 ```
 
@@ -686,6 +687,7 @@ Table API 提供了一种机制来查看 `Table` 的逻辑查询计划和优化
 ```python
 # 使用流模式 TableEnvironment
 from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
 
 env_settings = EnvironmentSettings.in_streaming_mode()
 table_env = TableEnvironment.create(env_settings)
@@ -693,7 +695,7 @@ table_env = TableEnvironment.create(env_settings)
 table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
 table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
 table = table1 \
-    .where(table1.data.like('H%')) \
+    .where(col("data").like('H%')) \
     .union_all(table2)
 print(table.explain())
 ```
@@ -739,6 +741,7 @@ Stage 136 : Data Source
 ```python
 # 使用流模式 TableEnvironment
 from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
 
 env_settings = EnvironmentSettings.in_streaming_mode()
 table_env = TableEnvironment.create(environment_settings=env_settings)
@@ -764,7 +767,7 @@ table_env.execute_sql("""
 
 statement_set = table_env.create_statement_set()
 
-statement_set.add_insert("print_sink_table", table1.where(table1.data.like('H%')))
+statement_set.add_insert("print_sink_table", table1.where(col("data").like('H%')))
 statement_set.add_insert("black_hole_sink_table", table2)
 
 print(statement_set.explain())
diff --git a/docs/content.zh/docs/dev/python/table/metrics.md b/docs/content.zh/docs/dev/python/table/metrics.md
index e00af42c5d0..fb0c7840c00 100644
--- a/docs/content.zh/docs/dev/python/table/metrics.md
+++ b/docs/content.zh/docs/dev/python/table/metrics.md
@@ -127,7 +127,6 @@ class MyUDF(ScalarFunction):
         self.meter = None
 
     def open(self, function_context):
-        super().open(function_context)
         # 120秒内统计的平均每秒事件数,默认是60秒
         self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120)
 
@@ -151,14 +150,14 @@ class MyUDF(ScalarFunction):
 {{< tab "Python" >}}
 ```python
 
-function_context
-    .get_metric_group()
-    .add_group("my_metrics")
+function_context \
+    .get_metric_group() \
+    .add_group("my_metrics") \
     .counter("my_counter")
 
-function_context
-    .get_metric_group()
-    .add_group("my_metrics_key", "my_metrics_value")
+function_context \
+    .get_metric_group() \
+    .add_group("my_metrics_key", "my_metrics_value") \
     .counter("my_counter")
 
 ```
@@ -182,9 +181,9 @@ function_context
 {{< tabs "6d0715c0-6c39-489a-b3f3-e9bf7d50c268" >}}
 {{< tab "Python" >}}
 ```python
-function_context
-    .get_metric_group()
-    .add_group("my_metrics_key", "my_metrics_value")
+function_context \
+    .get_metric_group() \
+    .add_group("my_metrics_key", "my_metrics_value") \
     .counter("my_counter")
 ```
 {{< /tab >}}
diff --git a/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md b/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
index 59f3a61de5b..59e9fa03a24 100644
--- a/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
+++ b/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
@@ -44,7 +44,7 @@ table_env = TableEnvironment.create(env_settings)
 table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
 
 @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
-                               DataTypes.FIELD("data", DataTypes.STRING())]))
+                                DataTypes.FIELD("data", DataTypes.STRING())]))
 def func1(id: int, data: str) -> Row:
     return Row(id, data * 2)
 
@@ -63,7 +63,7 @@ It also supports to take a Row object (containing all the columns of the input t
 
 ```python
 @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
-                               DataTypes.FIELD("data", DataTypes.STRING())]))
+                                DataTypes.FIELD("data", DataTypes.STRING())]))
 def func2(data: Row) -> Row:
     return Row(data.id, data.data * 2)
 
@@ -86,7 +86,7 @@ It should be noted that the input type and output type should be pandas.DataFram
 ```python
 import pandas as pd
 @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
-                               DataTypes.FIELD("data", DataTypes.STRING())]),
+                                DataTypes.FIELD("data", DataTypes.STRING())]),
      func_type='pandas')
 def func3(data: pd.DataFrame) -> pd.DataFrame:
     res = pd.concat([data.id, data.data * 2], axis=1)
@@ -288,10 +288,11 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'),
                              (3, 'Hi', 'hi'),
                              (5, 'Hi2', 'hi'),
                              (7, 'Hi', 'Hello'),
-                             (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
+                             (2, 'Hi', 'Hello')],
+                            ['a', 'b', 'c'])
 
 # call function "inline" without registration in Table API
-result = t.group_by(t.b).flat_aggregate(top2).select(col('*')).to_pandas()
+result = t.group_by(col('b')).flat_aggregate(top2).select(col('*')).to_pandas()
 
 # the result is:
 #+----+--------------------------------+----------------------+
diff --git a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
index dbf94fbd1ee..d718d4d45fe 100644
--- a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
+++ b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
@@ -134,12 +134,14 @@ if __name__ == '__main__':
 PyFlink 表支持与 Pandas DataFrame 之间互相转换。
 
 ```python
+from pyflink.table.expressions import col
+
 import pandas as pd
 import numpy as np
 
 # 创建一个 PyFlink 表
 pdf = pd.DataFrame(np.random.rand(1000, 2))
-table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")
+table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)
 
 # 将 PyFlink 表转换成 Pandas DataFrame
 pdf = table.to_pandas()
@@ -150,6 +152,8 @@ pdf = table.to_pandas()
 `from_elements()` 用于从一个元素集合中创建一张表。元素类型必须是可支持的原子类型或者复杂类型。
 
 ```python
+from pyflink.table import DataTypes
+
 table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
 
 # 使用第二个参数指定自定义字段名
diff --git a/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md b/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
index 7b55f45ca35..7194eb00323 100644
--- a/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
+++ b/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
@@ -41,7 +41,9 @@ Python 标量函数的行为由名为 `eval` 的方法定义,`eval` 方法支
 以下示例显示了如何定义自己的 Python 哈希函数、如何在 TableEnvironment 中注册它以及如何在作业中使用它。
 
 ```python
-from pyflink.table.expressions import call 
+from pyflink.table.expressions import call, col
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.udf import ScalarFunction, udf
 
 class HashCode(ScalarFunction):
   def __init__(self):
@@ -56,7 +58,7 @@ table_env = TableEnvironment.create(settings)
 hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
 
 # 在 Python Table API 中使用 Python 自定义函数
-my_table.select(my_table.string, my_table.bigint, hash_code(my_table.bigint), call(hash_code, my_table.bigint))
+my_table.select(col("string"), col("bigint"), hash_code(col("bigint")), call(hash_code, col("bigint")))
 
 # 在 SQL API 中使用 Python 自定义函数
 table_env.create_temporary_function("hash_code", udf(HashCode(), 
result_type=DataTypes.BIGINT()))
@@ -78,7 +80,8 @@ public class HashCode extends ScalarFunction {
   }
 }
 '''
-from pyflink.table.expressions import call
+from pyflink.table.expressions import call, col
+from pyflink.table import TableEnvironment, EnvironmentSettings
 
 settings = EnvironmentSettings.in_batch_mode()
 table_env = TableEnvironment.create(settings)
@@ -87,7 +90,7 @@ table_env = TableEnvironment.create(settings)
 table_env.create_java_temporary_function("hash_code", "my.java.function.HashCode")
 
 # 在 Python Table API 中使用 Java 函数
-my_table.select(call('hash_code', my_table.string))
+my_table.select(call('hash_code', col("string")))
 
 # 在 SQL API 中使用 Java 函数
 table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable")
@@ -128,10 +131,10 @@ add = udf(functools.partial(partial_add, k=1), result_type=DataTypes.BIGINT())
 # 注册 Python 自定义函数
 table_env.create_temporary_function("add", add)
 # 在 Python Table API 中使用 Python 自定义函数
-my_table.select("add(a, b)")
+my_table.select(call('add', col('a'), col('b')))
 
 # 也可以在 Python Table API 中直接使用 Python 自定义函数
-my_table.select(add(my_table.a, my_table.b))
+my_table.select(add(col('a'), col('b')))
 ```
 
 <a name="table-functions"></a>
@@ -143,6 +146,10 @@ my_table.select(add(my_table.a, my_table.b))
 以下示例说明了如何定义自己的 Python 自定义表值函数,将其注册到 TableEnvironment 中,并在作业中使用它。
 
 ```python
+from pyflink.table.expressions import col
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.udf import TableFunction, udtf
+
 class Split(TableFunction):
     def eval(self, string):
         for s in string.split(" "):
@@ -156,8 +163,8 @@ my_table = ...  # type: Table, table schema: [a: String]
 split = udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()])
 
 # 在 Python Table API 中使用 Python 表值函数
-my_table.join_lateral(split(my_table.a).alias("word, length"))
-my_table.left_outer_join_lateral(split(my_table.a).alias("word, length"))
+my_table.join_lateral(split(col("a")).alias("word", "length"))
+my_table.left_outer_join_lateral(split(col("a")).alias("word", "length"))
 
 # 在 SQL API 中使用 Python 表值函数
 table_env.create_temporary_function("split", udtf(Split(), 
result_types=[DataTypes.STRING(), DataTypes.INT()]))
@@ -183,7 +190,8 @@ public class Split extends TableFunction<Tuple2<String, Integer>> {
     }
 }
 '''
-from pyflink.table.expressions import call
+from pyflink.table.expressions import call, col
+from pyflink.table import TableEnvironment, EnvironmentSettings
 
 env_settings = EnvironmentSettings.in_streaming_mode()
 table_env = TableEnvironment.create(env_settings)
@@ -193,8 +201,8 @@ my_table = ...  # type: Table, table schema: [a: String]
 table_env.create_java_temporary_function("split", "my.java.function.Split")
 
 # 在 Python Table API 中使用表值函数。 "alias"指定表的字段名称。
-my_table.join_lateral(call('split', my_table.a).alias("word, length")).select(my_table.a, col('word'), col('length'))
-my_table.left_outer_join_lateral(call('split', my_table.a).alias("word, length")).select(my_table.a, col('word'), col('length'))
+my_table.join_lateral(call('split', col('a')).alias("word", "length")).select(col('a'), col('word'), col('length'))
+my_table.left_outer_join_lateral(call('split', col('a')).alias("word", "length")).select(col('a'), col('word'), col('length'))
 
 # 注册 Python 函数。
 
@@ -311,14 +319,14 @@ t = table_env.from_elements([(1, 2, "Lee"),
                              (7, 8, "Lee")]).alias("value", "count", "name")
 
 # call function "inline" without registration in Table API
-result = t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")).execute()
+result = t.group_by(col("name")).select(weighted_avg(col("value"), col("count")).alias("avg")).execute()
 result.print()
 
 # register function
 table_env.create_temporary_function("weighted_avg", WeightedAvg())
 
 # call registered function in Table API
-result = t.group_by(t.name).select(call("weighted_avg", t.value, t.count).alias("avg")).execute()
+result = t.group_by(col("name")).select(call("weighted_avg", col("value"), col("count")).alias("avg")).execute()
 result.print()
 
 # register table
@@ -487,7 +495,7 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'),
                             ['a', 'b', 'c'])
 
 # call function "inline" without registration in Table API
-t.group_by(t.b).flat_aggregate(top2).select(col('*')).execute().print()
+t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print()
 
 # the result is:
 +----+--------------------------------+----------------------+
diff --git a/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md b/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
index af6c7f60c15..5454af1057d 100644
--- a/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
+++ b/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
@@ -49,6 +49,10 @@ under the License.
 以下示例显示了如何定义自己的向量化 Python 标量函数,该函数计算两列的总和,并在查询中使用它:
 
 ```python
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.expressions import col
+from pyflink.table.udf import udf
+
 @udf(result_type=DataTypes.BIGINT(), func_type="pandas")
 def add(i, j):
   return i + j
@@ -57,7 +61,7 @@ settings = EnvironmentSettings.in_batch_mode()
 table_env = TableEnvironment.create(settings)
 
 # use the vectorized Python scalar function in Python Table API
-my_table.select(add(my_table.bigint, my_table.bigint))
+my_table.select(add(col("bigint"), col("bigint")))
 
 # 在SQL API中使用Python向量化标量函数
 table_env.create_temporary_function("add", add)
@@ -81,6 +85,11 @@ table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")
 and `Over Window Aggregation` 使用它:
 
 ```python
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.expressions import col, lit
+from pyflink.table.udf import udaf
+from pyflink.table.window import Tumble
+
 @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
 def mean_udaf(v):
     return v.mean()
@@ -91,18 +100,17 @@ table_env = TableEnvironment.create(settings)
 my_table = ...  # type: Table, table schema: [a: String, b: BigInt, c: BigInt]
 
 # 在 GroupBy Aggregation 中使用向量化聚合函数
-my_table.group_by(my_table.a).select(my_table.a, mean_udaf(add(my_table.b)))
+my_table.group_by(col('a')).select(col('a'), mean_udaf(col('b')))
 
 
 # 在 GroupBy Window Aggregation 中使用向量化聚合函数
-tumble_window = Tumble.over(expr.lit(1).hours) \
-            .on(expr.col("rowtime")) \
+tumble_window = Tumble.over(lit(1).hours) \
+            .on(col("rowtime")) \
             .alias("w")
 
 my_table.window(tumble_window) \
-    .group_by("w") \
-    .select("w.start, w.end, mean_udaf(b)")
-
+    .group_by(col("w")) \
+    .select(col('w').start, col('w').end, mean_udaf(col('b')))
 
 # 在 Over Window Aggregation 中使用向量化聚合函数
 table_env.create_temporary_function("mean_udaf", mean_udaf)
@@ -118,6 +126,9 @@ table_env.sql_query("""
 以下示例显示了多种定义向量化 Python 聚合函数的方式。该函数需要两个类型为 bigint 的参数作为输入参数,并返回它们的最大值的和作为结果。
 
 ```python
+from pyflink.table import DataTypes
+from pyflink.table.udf import AggregateFunction, udaf
+
 # 方式一:扩展基类 `AggregateFunction`
 class MaxAdd(AggregateFunction):
 
diff --git a/docs/content.zh/docs/dev/table/catalogs.md b/docs/content.zh/docs/dev/table/catalogs.md
index 0b86fbd6769..05722d87f65 100644
--- a/docs/content.zh/docs/dev/table/catalogs.md
+++ b/docs/content.zh/docs/dev/table/catalogs.md
@@ -241,7 +241,7 @@ schema = Schema.new_builder() \
     
 catalog_table = t_env.create_table("myhive.mydb.mytable", TableDescriptor.for_connector("kafka")
     .schema(schema)
-    // …
+    # …
     .build())
 
 # tables should contain "mytable"
diff --git a/docs/content.zh/docs/dev/table/common.md b/docs/content.zh/docs/dev/table/common.md
index 02f06056a7f..83d0ddacdcc 100644
--- a/docs/content.zh/docs/dev/table/common.md
+++ b/docs/content.zh/docs/dev/table/common.md
@@ -475,9 +475,9 @@ table_env = # see "Create a TableEnvironment" section
 orders = table_env.from_path("Orders")
 # compute revenue for all customers from France
 revenue = orders \
-    .filter(orders.cCountry == 'FRANCE') \
-    .group_by(orders.cID, orders.cName) \
-    .select(orders.cID, orders.cName, orders.revenue.sum.alias('revSum'))
+    .filter(col('cCountry') == 'FRANCE') \
+    .group_by(col('cID'), col('cName')) \
+    .select(col('cID'), col('cName'), col('revenue').sum.alias('revSum'))
 
 # emit or convert Table
 # execute query
@@ -856,7 +856,7 @@ t_env = StreamTableEnvironment.create(env)
 table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
 table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
 table = table1 \
-    .where(table1.word.like('F%')) \
+    .where(col('word').like('F%')) \
     .union_all(table2)
 print(table.explain())
 
diff --git a/docs/content.zh/docs/dev/table/tableApi.md b/docs/content.zh/docs/dev/table/tableApi.md
index 65a9f9d87a2..e5961db83d5 100644
--- a/docs/content.zh/docs/dev/table/tableApi.md
+++ b/docs/content.zh/docs/dev/table/tableApi.md
@@ -113,6 +113,7 @@ val result = orders
 
 ```python
 from pyflink.table import *
+from pyflink.table.expressions import col
 
 # 环境配置
 t_env = TableEnvironment.create(
@@ -151,8 +152,7 @@ t_env.execute_sql(sink_ddl)
 # 指定表程序
 orders = t_env.from_path("Orders")  # schema (a, b, c, rowtime)
 
-orders.group_by("a").select(orders.a, 
orders.b.count.alias('cnt')).execute_insert("result").wait()
-
+orders.group_by(col("a")).select(col("a"), 
col("b").count.alias('cnt')).execute_insert("result").wait()
 ```
 
 {{< /tab >}}
@@ -207,14 +207,15 @@ val result: Table = orders
 ```python
 # 指定表程序
 from pyflink.table.expressions import col, lit
+from pyflink.table.window import Tumble
 
 orders = t_env.from_path("Orders")  # schema (a, b, c, rowtime)
 
-result = orders.filter(orders.a.is_not_null & orders.b.is_not_null & orders.c.is_not_null) \
-               .select(orders.a.lower_case.alias('a'), orders.b, orders.rowtime) \
-               .window(Tumble.over(lit(1).hour).on(orders.rowtime).alias("hourly_window")) \
+result = orders.filter(col("a").is_not_null & col("b").is_not_null & col("c").is_not_null) \
+               .select(col("a").lower_case.alias('a'), col("b"), col("rowtime")) \
+               .window(Tumble.over(lit(1).hour).on(col("rowtime")).alias("hourly_window")) \
                .group_by(col('hourly_window'), col('a')) \
-               .select(col('a'), col('hourly_window').end.alias('hour'), b.avg.alias('avg_billing_amount'))
+               .select(col('a'), col('hourly_window').end.alias('hour'), col("b").avg.alias('avg_billing_amount'))
 ```
 
 {{< /tab >}}
@@ -369,7 +370,7 @@ Table result = orders.select($"a", $"c" as "d")
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.select(orders.a, orders.c.alias('d'))
+result = orders.select(col("a"), col("c").alias('d'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -417,7 +418,7 @@ val orders: Table = tableEnv.from("Orders").as("x", "y", "z", "t")
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.alias("x, y, z, t")
+result = orders.alias("x", "y", "z", "t")
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -446,7 +447,7 @@ val result = orders.filter($"a" % 2 === 0)
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.where(orders.a == 'red')
+result = orders.where(col("a") == 'red')
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -469,7 +470,7 @@ val result = orders.filter($"a" % 2 === 0)
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.filter(orders.a == 'red')
+result = orders.filter(col("a") == 'red')
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -504,7 +505,7 @@ val result = orders.addColumns(concat($"c", "Sunny"))
 from pyflink.table.expressions import concat
 
 orders = t_env.from_path("Orders")
-result = orders.add_columns(concat(orders.c, 'sunny'))
+result = orders.add_columns(concat(col("c"), 'sunny'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -535,7 +536,7 @@ val result = orders.addOrReplaceColumns(concat($"c", "Sunny") as "desc")
 from pyflink.table.expressions import concat
 
 orders = t_env.from_path("Orders")
-result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc'))
+result = orders.add_or_replace_columns(concat(col("c"), 'sunny').alias('desc'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -560,7 +561,7 @@ val result = orders.dropColumns($"b", $"c")
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.drop_columns(orders.b, orders.c)
+result = orders.drop_columns(col("b"), col("c"))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -588,7 +589,7 @@ val result = orders.renameColumns($"b" as "b2", $"c" as "c2")
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2'))
+result = orders.rename_columns(col("b").alias('b2'), col("c").alias('c2'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -621,7 +622,7 @@ val result = orders.groupBy($"a").select($"a", $"b".sum().as("d"))
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d'))
+result = orders.group_by(col("a")).select(col("a"), col("b").sum.alias('d'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -666,9 +667,9 @@ from pyflink.table.window import Tumble
 from pyflink.table.expressions import lit, col
 
 orders = t_env.from_path("Orders")
-result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \
-               .group_by(orders.a, col('w')) \
-               .select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d'))
+result = orders.window(Tumble.over(lit(5).minutes).on(col('rowtime')).alias("w")) \
+               .group_by(col('a'), col('w')) \
+               .select(col('a'), col('w').start, col('w').end, col('b').sum.alias('d'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -721,10 +722,10 @@ from pyflink.table.window import Over
 from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE
 
 orders = t_env.from_path("Orders")
-result = orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime)
+result = orders.over_window(Over.partition_by(col("a")).order_by(col("rowtime"))
                             .preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE)
                             .alias("w")) \
-               .select(orders.a, orders.b.avg.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w')))
+               .select(col("a"), col("b").avg.over(col('w')), col("b").max.over(col('w')), col("b").min.over(col('w')))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -795,22 +796,23 @@ val result = orders
 {{< tab "Python" >}}
 ```python
 from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE
+from pyflink.table.window import Over, Tumble
 
 orders = t_env.from_path("Orders")
 # 按属性分组后的互异(互不相同、去重)聚合
-group_by_distinct_result = orders.group_by(orders.a) \
-                                 .select(orders.a, orders.b.sum.distinct.alias('d'))
+group_by_distinct_result = orders.group_by(col("a")) \
+                                 .select(col("a"), col("b").sum.distinct.alias('d'))
 # 按属性、时间窗口分组后的互异(互不相同、去重)聚合
-group_by_window_distinct_result = orders.window(
-    Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(orders.a, col('w')) \
-    .select(orders.a, orders.b.sum.distinct.alias('d'))
+group_by_window_distinct_result = orders.window(Tumble.over(lit(5).minutes).on(col("rowtime")).alias("w")) \
+    .group_by(col("a"), col('w')) \
+    .select(col("a"), col("b").sum.distinct.alias('d'))
 # over window 上的互异(互不相同、去重)聚合
 result = orders.over_window(Over
-                       .partition_by(orders.a)
-                       .order_by(orders.rowtime)
-                       .preceding(UNBOUNDED_RANGE)
-                       .alias("w")) \
-                       .select(orders.a, orders.b.avg.distinct.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w')))
+                            .partition_by(col("a"))
+                            .order_by(col("rowtime"))
+                            .preceding(UNBOUNDED_RANGE)
+                            .alias("w")) \
+    .select(col("a"), col("b").avg.distinct.over(col('w')), col("b").max.over(col('w')), col("b").min.over(col('w')))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -911,7 +913,7 @@ from pyflink.table.expressions import col
 
 left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
 right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
-result = left.join(right).where(left.a == right.d).select(left.a, left.b, right.e)
+result = left.join(right).where(col('a') == col('d')).select(col('a'), col('b'), col('e'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -957,9 +959,9 @@ from pyflink.table.expressions import col
 left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
 right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
 
-left_outer_result = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
-right_outer_result = left.right_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
-full_outer_result = left.full_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
+left_outer_result = left.left_outer_join(right, col('a') == col('d')).select(col('a'), col('b'), col('e'))
+right_outer_result = left.right_outer_join(right, col('a') == col('d')).select(col('a'), col('b'), col('e'))
+full_outer_result = left.full_outer_join(right, col('a') == col('d')).select(col('a'), col('b'), col('e'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1006,9 +1008,9 @@ from pyflink.table.expressions import col
 
 left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'), 
col('rowtime1'))
 right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'), 
col('rowtime2'))
-  
-joined_table = left.join(right).where((left.a == right.d) & (left.rowtime1 >= 
right.rowtime2 - lit(1).second) & (left.rowtime1 <= right.rowtime2 + 
lit(2).seconds))
-result = joined_table.select(joined_table.a, joined_table.b, joined_table.e, 
joined_table.rowtime1)
+
+joined_table = left.join(right).where((col('a') == col('d')) & 
(col('rowtime1') >= col('rowtime2') - lit(1).second) & (col('rowtime1') <= 
col('rowtime2') + lit(2).seconds))
+result = joined_table.select(col('a'), col('b'), col('e'), col('rowtime1'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1054,8 +1056,8 @@ def split(x):
 
 # join
 orders = t_env.from_path("Orders")
-joined_table = orders.join_lateral(split(orders.c).alias("s, t, v"))
-result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, joined_table.t, joined_table.v)
+joined_table = orders.join_lateral(split(col('c')).alias("s", "t", "v"))
+result = joined_table.select(col('a'), col('b'), col('s'), col('t'), col('v'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1103,8 +1105,8 @@ def split(x):
 
 # join
 orders = t_env.from_path("Orders")
-joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s, t, v"))
-result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, joined_table.t, joined_table.v)
+joined_table = orders.left_outer_join_lateral(split(col('c')).alias("s", "t", "v"))
+result = joined_table.select(col('a'), col('b'), col('s'), col('t'), col('v'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1370,7 +1372,7 @@ val result = left.select($"a", $"b", $"c").where($"a".in(right))
 left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
 right = t_env.from_path("Source2").select(col('a'))
 
-result = left.select(left.a, left.b, left.c).where(left.a.in_(right))
+result = left.select(col('a'), col('b'), col('c')).where(col('a').in_(right))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1390,17 +1392,17 @@ result = left.select(left.a, left.b, left.c).where(left.a.in_(right))
 {{< tabs "orderby" >}}
 {{< tab "Java" >}}
 ```java
-Table result = in.orderBy($("a").asc());
+Table result = tab.orderBy($("a").asc());
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val result = in.orderBy($"a".asc)
+val result = tab.orderBy($"a".asc)
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-result = in.order_by(in.a.asc)
+result = tab.order_by(col('a').asc)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1440,13 +1442,13 @@ val result3: Table = in.orderBy($"a".asc).offset(10).fetch(5)
 {{< tab "Python" >}}
 ```python
 # 从已排序的结果集中返回前5条记录
-result1 = table.order_by(table.a.asc).fetch(5)
+result1 = table.order_by(col('a').asc).fetch(5)
 
 # 从已排序的结果集中返回跳过3条记录之后的所有记录
-result2 = table.order_by(table.a.asc).offset(3)
+result2 = table.order_by(col('a').asc).offset(3)
 
 # 从已排序的结果集中返回跳过10条记录之后的前5条记录
-result3 = table.order_by(table.a.asc).offset(10).fetch(5)
+result3 = table.order_by(col('a').asc).offset(10).fetch(5)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1518,7 +1520,7 @@ val table = input
 ```python
 # 定义窗口并指定别名为 w,以窗口 w 对表进行分组,然后再聚合
 table = input.window([w: GroupWindow].alias("w")) \
-             .group_by(col('w')).select(input.b.sum)
+             .group_by(col('w')).select(col('b').sum)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1554,7 +1556,7 @@ val table = input
 # 定义窗口并指定别名为 w,以属性 a 和窗口 w 对表进行分组,
 # 然后再聚合
 table = input.window([w: GroupWindow].alias("w")) \
-             .group_by(col('w'), input.a).select(input.b.sum)
+             .group_by(col('w'), col('a')).select(col('b').sum)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1583,8 +1585,8 @@ val table = input
 # 定义窗口并指定别名为 w,以属性 a 和窗口 w 对表进行分组,
 # 然后再聚合并添加窗口开始、结束和 rowtime 时间戳
 table = input.window([w: GroupWindow].alias("w")) \
-             .group_by(col('w'), input.a) \
-             .select(input.a, col('w').start, col('w').end, col('w').rowtime, input.b.count)
+             .group_by(col('w'), col('a')) \
+             .select(col('a'), col('w').start, col('w').end, col('w').rowtime, col('b').count)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1989,8 +1991,8 @@ val table = input
 {{< tab "Python" >}}
 ```python
 # define over window with alias w and aggregate over the over window w
-table = input.over_window([w: OverWindow].alias("w")) \
-    .select(input.a, input.b.sum.over(col('w')), input.c.min.over(col('w')))
+table = input.over_window([w: OverWindow].alias("w")) \
+    .select(col('a'), col('b').sum.over(col('w')), col('c').min.over(col('w')))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -2217,9 +2219,9 @@ func = udf(map_function, result_type=DataTypes.ROW(
 table = input.map(func).alias('a', 'b')
 
 # 使用 python 向量化标量函数进行 map 操作
-pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW(
-                                                    [DataTypes.FIELD("a", DataTypes.BIGINT()),
-                                                    DataTypes.FIELD("b", DataTypes.BIGINT()))]),
+pandas_func = udf(lambda x: x * 2,
+                  result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT()),
+                                             DataTypes.FIELD("b", DataTypes.BIGINT())]),
                   func_type='pandas')
 
 table = input.map(pandas_func).alias('a', 'b')
@@ -2451,9 +2453,9 @@ agg = udaf(function,
            name=str(function.__class__.__name__))
 
 # 使用 python 通用聚合函数进行聚合
-result = t.group_by(t.a) \
+result = t.group_by(col('a')) \
     .aggregate(agg.alias("c", "d")) \
-    .select("a, c, d")
+    .select(col('a'), col('c'), col('d'))
     
 # 使用 python 向量化聚合函数进行聚合
 pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
@@ -2462,8 +2464,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                         DataTypes.FIELD("b", DataTypes.INT())]),
                    func_type="pandas")
 t.aggregate(pandas_udaf.alias("a", "b")) \
-    .select("a, b")
-
+ .select(col('a'), col('b'))
 ```
 
 {{< /tab >}}
@@ -2504,20 +2505,22 @@ val table = input
 ```python
 from pyflink.table import DataTypes
 from pyflink.table.udf import AggregateFunction, udaf
+from pyflink.table.expressions import col, lit
+from pyflink.table.window import Tumble
 
 pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                    result_type=DataTypes.ROW(
                        [DataTypes.FIELD("a", DataTypes.FLOAT()),
                         DataTypes.FIELD("b", DataTypes.INT())]),
                    func_type="pandas")
-tumble_window = Tumble.over(expr.lit(1).hours) \
-    .on(expr.col("rowtime")) \
+tumble_window = Tumble.over(lit(1).hours) \
+    .on(col("rowtime")) \
     .alias("w")
-t.select(t.b, t.rowtime) \
+t.select(col('b'), col('rowtime')) \
     .window(tumble_window) \
-    .group_by("w") \
+    .group_by(col("w")) \
     .aggregate(pandas_udaf.alias("d", "e")) \
-    .select("w.rowtime, d, e")
+    .select(col('w').rowtime, col('d'), col('e'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -2667,6 +2670,7 @@ val result = orders
 from pyflink.common import Row
 from pyflink.table.udf import TableAggregateFunction, udtaf
 from pyflink.table import DataTypes
+from pyflink.table.expressions import col
 
 class Top2(TableAggregateFunction):
 
@@ -2702,13 +2706,13 @@ t = t_env.from_elements([(1, 'Hi', 'Hello'),
                               (3, 'Hi', 'hi'),
                               (5, 'Hi2', 'hi'),
                               (7, 'Hi', 'Hello'),
-                              (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
-result = t.select(t.a, t.c) \
-    .group_by(t.c) \
+                              (2, 'Hi', 'Hello')],
+                        ['a', 'b', 'c'])
+result = t.select(col('a'), col('c')) \
+    .group_by(col('c')) \
     .flat_aggregate(mytop) \
-    .select(t.a) \
+    .select(col('a')) \
     .flat_aggregate(mytop.alias("b"))
-
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/docs/content/docs/dev/python/debugging.md b/docs/content/docs/dev/python/debugging.md
index a1723e2a55e..d0597032513 100644
--- a/docs/content/docs/dev/python/debugging.md
+++ b/docs/content/docs/dev/python/debugging.md
@@ -60,10 +60,14 @@ You can log contextual and debug information via `print` or standard Python logg
 The logging messages will be printed in the log files of the `TaskManagers` during job execution.
 
 ```python
+from pyflink.table import DataTypes
+from pyflink.table.udf import udf
+
+import logging
+
 @udf(result_type=DataTypes.BIGINT())
 def add(i, j):
     # use logging modules
-    import logging
     logging.info("debug")
     # use print function
     print('debug')
diff --git a/docs/content/docs/dev/python/table/conversion_of_pandas.md b/docs/content/docs/dev/python/table/conversion_of_pandas.md
index 36a43e1c91b..46e90bdd331 100644
--- a/docs/content/docs/dev/python/table/conversion_of_pandas.md
+++ b/docs/content/docs/dev/python/table/conversion_of_pandas.md
@@ -39,6 +39,8 @@ provide exactly-once guarantees.
 The following example shows how to create a PyFlink Table from a Pandas DataFrame:
 
 ```python
+from pyflink.table import DataTypes
+
 import pandas as pd
 import numpy as np
 
@@ -57,7 +59,7 @@ table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])
 # Create a PyFlink Table from a Pandas DataFrame with the specified row type
 table = t_env.from_pandas(pdf,
                           DataTypes.ROW([DataTypes.FIELD("f0", 
DataTypes.DOUBLE()),
-                                         DataTypes.FIELD("f1", 
DataTypes.DOUBLE())])
+                                         DataTypes.FIELD("f1", 
DataTypes.DOUBLE())]))
 ```
 
 ## Convert PyFlink Table to Pandas DataFrame
@@ -72,6 +74,8 @@ You can limit the number of rows collected to client side via {{< pythondoc file
 The following example shows how to convert a PyFlink Table to a Pandas DataFrame:
 
 ```python
+from pyflink.table.expressions import col
+
 import pandas as pd
 import numpy as np
 
diff --git a/docs/content/docs/dev/python/table/intro_to_table_api.md b/docs/content/docs/dev/python/table/intro_to_table_api.md
index 938e0379955..13bb85736c0 100644
--- a/docs/content/docs/dev/python/table/intro_to_table_api.md
+++ b/docs/content/docs/dev/python/table/intro_to_table_api.md
@@ -36,6 +36,7 @@ All Table API and SQL programs, both batch and streaming, follow the same patter
 
 ```python
 from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
 
 # 1. create a TableEnvironment
 env_settings = EnvironmentSettings.in_streaming_mode()
@@ -70,7 +71,7 @@ source_table = table_env.from_path("datagen")
 # or create a Table from a SQL query:
 # source_table = table_env.sql_query("SELECT * FROM datagen")
 
-result_table = source_table.select(source_table.id + 1, source_table.data)
+result_table = source_table.select(col("id") + 1, col("data"))
 
 # 5. emit query result to sink table
 # emit a Table API result Table to a sink table:
@@ -319,6 +320,7 @@ The following example shows a simple Table API aggregation query:
 
 ```python
 from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
 
 # using batch table environment to execute the queries
 env_settings = EnvironmentSettings.in_batch_mode()
@@ -329,11 +331,11 @@ orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30
 
 # compute revenue for all customers from France
 revenue = orders \
-    .select(orders.name, orders.country, orders.revenue) \
-    .where(orders.country == 'FRANCE') \
-    .group_by(orders.name) \
-    .select(orders.name, orders.revenue.sum.alias('rev_sum'))
-    
+    .select(col("name"), col("country"), col("revenue")) \
+    .where(col("country") == 'FRANCE') \
+    .group_by(col("name")) \
+    .select(col("name"), col("country").sum.alias('rev_sum'))
+
 revenue.execute().print()
 ```
 
@@ -687,6 +689,7 @@ The following code shows how to use the `Table.explain()` method:
 ```python
 # using a stream TableEnvironment
 from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
 
 env_settings = EnvironmentSettings.in_streaming_mode()
 table_env = TableEnvironment.create(env_settings)
@@ -694,7 +697,7 @@ table_env = TableEnvironment.create(env_settings)
 table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
 table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
 table = table1 \
-    .where(table1.data.like('H%')) \
+    .where(col("data").like('H%')) \
     .union_all(table2)
 print(table.explain())
 ```
@@ -740,6 +743,7 @@ The following code shows how to use the `StatementSet.explain()` method:
 ```python
 # using a stream TableEnvironment
 from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
 
 env_settings = EnvironmentSettings.in_streaming_mode()
 table_env = TableEnvironment.create(env_settings)
@@ -765,7 +769,7 @@ table_env.execute_sql("""
 
 statement_set = table_env.create_statement_set()
 
-statement_set.add_insert("print_sink_table", table1.where(table1.data.like('H%')))
+statement_set.add_insert("print_sink_table", table1.where(col("data").like('H%')))
 statement_set.add_insert("black_hole_sink_table", table2)
 
 print(statement_set.explain())
diff --git a/docs/content/docs/dev/python/table/metrics.md b/docs/content/docs/dev/python/table/metrics.md
index 56441980de8..2767c654ecf 100644
--- a/docs/content/docs/dev/python/table/metrics.md
+++ b/docs/content/docs/dev/python/table/metrics.md
@@ -60,7 +60,6 @@ class MyUDF(ScalarFunction):
     def eval(self, i):
         self.counter.inc(i)
         return i
-
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -137,7 +136,6 @@ class MyUDF(ScalarFunction):
         self.meter = None
 
     def open(self, function_context):
-        super().open(function_context)
         # an average rate of events per second over 120s, default is 60s.
         self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120)
 
@@ -162,17 +160,15 @@ group's sub-groups. In this case, the value group will be returned, and a user v
 {{< tabs "a3040b2d-bf2d-4ce4-be2f-2896f48334c8" >}}
 {{< tab "Python" >}}
 ```python
-
-function_context
-    .get_metric_group()
-    .add_group("my_metrics")
+function_context \
+    .get_metric_group() \
+    .add_group("my_metrics") \
     .counter("my_counter")
 
-function_context
-    .get_metric_group()
-    .add_group("my_metrics_key", "my_metrics_value")
+function_context \
+    .get_metric_group() \
+    .add_group("my_metrics_key", "my_metrics_value") \
     .counter("my_counter")
-
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -195,9 +191,9 @@ specifying the value parameter.
 {{< tabs "d27cbda0-da5f-4a77-a02e-2e54e3156e31" >}}
 {{< tab "Python" >}}
 ```python
-function_context
-    .get_metric_group()
-    .add_group("my_metrics_key", "my_metrics_value")
+function_context \
+    .get_metric_group() \
+    .add_group("my_metrics_key", "my_metrics_value") \
     .counter("my_counter")
 ```
 {{< /tab >}}
diff --git a/docs/content/docs/dev/python/table/operations/row_based_operations.md b/docs/content/docs/dev/python/table/operations/row_based_operations.md
index 3e313b274bc..640d8a05ddd 100644
--- a/docs/content/docs/dev/python/table/operations/row_based_operations.md
+++ b/docs/content/docs/dev/python/table/operations/row_based_operations.md
@@ -44,7 +44,7 @@ table_env = TableEnvironment.create(env_settings)
 table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
 
 @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
-                               DataTypes.FIELD("data", DataTypes.STRING())]))
+                                DataTypes.FIELD("data", DataTypes.STRING())]))
 def func1(id: int, data: str) -> Row:
     return Row(id, data * 2)
 
@@ -63,7 +63,7 @@ It also supports to take a Row object (containing all the columns of the input t
 
 ```python
 @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
-                               DataTypes.FIELD("data", DataTypes.STRING())]))
+                                DataTypes.FIELD("data", DataTypes.STRING())]))
 def func2(data: Row) -> Row:
     return Row(data.id, data.data * 2)
 
@@ -86,7 +86,7 @@ It should be noted that the input type and output type should be pandas.DataFram
 ```python
 import pandas as pd
 @udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
-                               DataTypes.FIELD("data", DataTypes.STRING())]),
+                                DataTypes.FIELD("data", DataTypes.STRING())]),
      func_type='pandas')
 def func3(data: pd.DataFrame) -> pd.DataFrame:
     res = pd.concat([data.id, data.data * 2], axis=1)
@@ -226,7 +226,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                         DataTypes.FIELD("b", DataTypes.INT())]),
                    func_type="pandas")
 t.aggregate(pandas_udaf.alias("a", "b")) \
-    .select(col('a'), col('b')).execute().print()
+ .select(col('a'), col('b')).execute().print()
 
 # the result is
 #+--------------------------------+-------------+
@@ -288,10 +288,11 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'),
                              (3, 'Hi', 'hi'),
                              (5, 'Hi2', 'hi'),
                              (7, 'Hi', 'Hello'),
-                             (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
+                             (2, 'Hi', 'Hello')],
+                            ['a', 'b', 'c'])
 
 # call function "inline" without registration in Table API
-result = t.group_by(t.b).flat_aggregate(top2).select(col('*')).execute().print()
+result = t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print()
 
 # the result is:
 #+----+--------------------------------+----------------------+
diff --git a/docs/content/docs/dev/python/table/python_table_api_connectors.md b/docs/content/docs/dev/python/table/python_table_api_connectors.md
index a7f31d30cfb..2d85d5c4024 100644
--- a/docs/content/docs/dev/python/table/python_table_api_connectors.md
+++ b/docs/content/docs/dev/python/table/python_table_api_connectors.md
@@ -138,12 +138,14 @@ The predefined data sinks support writing to Pandas DataFrame.
 PyFlink Tables support conversion to and from Pandas DataFrame.
 
 ```python
+from pyflink.table.expressions import col
+
 import pandas as pd
 import numpy as np
 
 # Create a PyFlink Table
 pdf = pd.DataFrame(np.random.rand(1000, 2))
-table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")
+table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)
 
 # Convert the PyFlink Table to a Pandas DataFrame
 pdf = table.to_pandas()
@@ -155,6 +157,8 @@ pdf = table.to_pandas()
 be acceptable atomic types or acceptable composite types.
 
 ```python
+from pyflink.table import DataTypes
+
 table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
 
 # use the second parameter to specify custom field names
diff --git a/docs/content/docs/dev/python/table/udfs/python_udfs.md b/docs/content/docs/dev/python/table/udfs/python_udfs.md
index 1cbf1a6ee49..d4301a26576 100644
--- a/docs/content/docs/dev/python/table/udfs/python_udfs.md
+++ b/docs/content/docs/dev/python/table/udfs/python_udfs.md
@@ -41,7 +41,9 @@ The following example shows how to define your own Python hash code function, re
 Note that you can configure your scalar function via a constructor before it is registered:
 
 ```python
-from pyflink.table.expressions import call 
+from pyflink.table.expressions import call, col
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.udf import ScalarFunction, udf
 
 class HashCode(ScalarFunction):
   def __init__(self):
@@ -56,7 +58,7 @@ table_env = TableEnvironment.create(settings)
 hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
 
 # use the Python function in Python Table API
-my_table.select(my_table.string, my_table.bigint, hash_code(my_table.bigint), call(hash_code, my_table.bigint))
+my_table.select(col("string"), col("bigint"), hash_code(col("bigint")), call(hash_code, col("bigint")))
 
 # use the Python function in SQL API
 table_env.create_temporary_function("hash_code", udf(HashCode(), 
result_type=DataTypes.BIGINT()))
@@ -78,7 +80,8 @@ public class HashCode extends ScalarFunction {
   }
 }
 '''
-from pyflink.table.expressions import call
+from pyflink.table.expressions import call, col
+from pyflink.table import TableEnvironment, EnvironmentSettings
 
 settings = EnvironmentSettings.in_batch_mode()
 table_env = TableEnvironment.create(settings)
@@ -87,7 +90,7 @@ table_env = TableEnvironment.create(settings)
 table_env.create_java_temporary_function("hash_code", "my.java.function.HashCode")
 
 # use the Java function in Python Table API
-my_table.select(call('hash_code', my_table.string))
+my_table.select(call('hash_code', col("string")))
 
 # use the Java function in SQL API
 table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable")
@@ -129,10 +132,10 @@ add = udf(functools.partial(partial_add, k=1), result_type=DataTypes.BIGINT())
 # register the Python function
 table_env.create_temporary_function("add", add)
 # use the function in Python Table API
-my_table.select("add(a, b)")
+my_table.select(call('add', col('a'), col('b')))
 
 # You can also use the Python function in Python Table API directly
-my_table.select(add(my_table.a, my_table.b))
+my_table.select(add(col('a'), col('b')))
 ```
 
 ## Table Functions
@@ -145,6 +148,10 @@ The following example shows how to define your own Python multi emit function, r
 TableEnvironment, and call it in a query.
 
 ```python
+from pyflink.table.expressions import col
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.udf import TableFunction, udtf
+
 class Split(TableFunction):
     def eval(self, string):
         for s in string.split(" "):
@@ -158,8 +165,8 @@ my_table = ...  # type: Table, table schema: [a: String]
 split = udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()])
 
 # use the Python Table Function in Python Table API
-my_table.join_lateral(split(my_table.a).alias("word, length"))
-my_table.left_outer_join_lateral(split(my_table.a).alias("word, length"))
+my_table.join_lateral(split(col("a")).alias("word", "length"))
+my_table.left_outer_join_lateral(split(col("a")).alias("word", "length"))
 
 # use the Python Table function in SQL API
 table_env.create_temporary_function("split", udtf(Split(), 
result_types=[DataTypes.STRING(), DataTypes.INT()]))
@@ -186,7 +193,8 @@ public class Split extends TableFunction<Tuple2<String, Integer>> {
     }
 }
 '''
-from pyflink.table.expressions import call
+from pyflink.table.expressions import call, col
+from pyflink.table import TableEnvironment, EnvironmentSettings
 
 env_settings = EnvironmentSettings.in_streaming_mode()
 table_env = TableEnvironment.create(env_settings)
@@ -196,8 +204,8 @@ my_table = ...  # type: Table, table schema: [a: String]
 table_env.create_java_temporary_function("split", "my.java.function.Split")
 
 # Use the table function in the Python Table API. "alias" specifies the field names of the table.
-my_table.join_lateral(call('split', my_table.a).alias("word, length")).select(my_table.a, col('word'), col('length'))
-my_table.left_outer_join_lateral(call('split', my_table.a).alias("word, length")).select(my_table.a, col('word'), col('length'))
+my_table.join_lateral(call('split', col('a')).alias("word", "length")).select(col('a'), col('word'), col('length'))
+my_table.left_outer_join_lateral(call('split', col('a')).alias("word", "length")).select(col('a'), col('word'), col('length'))
 
 # Register the python function.
 
@@ -313,14 +321,14 @@ t = table_env.from_elements([(1, 2, "Lee"),
                              (7, 8, "Lee")]).alias("value", "count", "name")
 
 # call function "inline" without registration in Table API
-result = t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")).execute()
+result = t.group_by(col("name")).select(weighted_avg(col("value"), col("count")).alias("avg")).execute()
 result.print()
 
 # register function
 table_env.create_temporary_function("weighted_avg", WeightedAvg())
 
 # call registered function in Table API
-result = t.group_by(t.name).select(call("weighted_avg", t.value, t.count).alias("avg")).execute()
+result = t.group_by(col("name")).select(call("weighted_avg", col("value"), col("count")).alias("avg")).execute()
 result.print()
 
 # register table
@@ -489,7 +497,7 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'),
                             ['a', 'b', 'c'])
 
 # call function "inline" without registration in Table API
-t.group_by(t.b).flat_aggregate(top2).select(col('*')).execute().print()
+t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print()
 
 # the result is:
 #      b    a
diff --git a/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md b/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
index 7b79eabab8e..e31d79b54ed 100644
--- a/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
+++ b/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
@@ -49,6 +49,10 @@ The following example shows how to define your own vectorized Python scalar func
 and use it in a query:
 
 ```python
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.expressions import col
+from pyflink.table.udf import udf
+
 @udf(result_type=DataTypes.BIGINT(), func_type="pandas")
 def add(i, j):
   return i + j
@@ -57,7 +61,7 @@ settings = EnvironmentSettings.in_batch_mode()
 table_env = TableEnvironment.create(settings)
 
 # use the vectorized Python scalar function in Python Table API
-my_table.select(add(my_table.bigint, my_table.bigint))
+my_table.select(add(col("bigint"), col("bigint")))
 
 # use the vectorized Python scalar function in SQL API
 table_env.create_temporary_function("add", add)
@@ -80,6 +84,11 @@ The following example shows how to define your own 
vectorized Python aggregate f
 and use it in `GroupBy Aggregation`, `GroupBy Window Aggregation` and `Over 
Window Aggregation`:
 
 ```python
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.expressions import col, lit
+from pyflink.table.udf import udaf
+from pyflink.table.window import Tumble
+
 @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
 def mean_udaf(v):
     return v.mean()
@@ -90,18 +99,17 @@ table_env = TableEnvironment.create(settings)
 my_table = ...  # type: Table, table schema: [a: String, b: BigInt, c: BigInt]
 
 # use the vectorized Python aggregate function in GroupBy Aggregation
-my_table.group_by(my_table.a).select(my_table.a, mean_udaf(add(my_table.b)))
+my_table.group_by(col('a')).select(col('a'), mean_udaf(col('b')))
 
 
 # use the vectorized Python aggregate function in GroupBy Window Aggregation
-tumble_window = Tumble.over(expr.lit(1).hours) \
-            .on(expr.col("rowtime")) \
+tumble_window = Tumble.over(lit(1).hours) \
+            .on(col("rowtime")) \
             .alias("w")
 
 my_table.window(tumble_window) \
-    .group_by("w") \
-    .select("w.start, w.end, mean_udaf(b)")
-
+    .group_by(col("w")) \
+    .select(col('w').start, col('w').end, mean_udaf(col('b')))
 
 # use the vectorized Python aggregate function in Over Window Aggregation
 table_env.create_temporary_function("mean_udaf", mean_udaf)
@@ -111,7 +119,6 @@ table_env.sql_query("""
         over (PARTITION BY a ORDER BY rowtime
         ROWS BETWEEN UNBOUNDED preceding AND UNBOUNDED FOLLOWING)
     FROM MyTable""")
-
 ```
 
 There are many ways to define a vectorized Python aggregate function.
@@ -119,6 +126,8 @@ The following examples show the different ways to define a 
vectorized Python agg
 which takes two columns of bigint as the inputs and returns the sum of the 
maximum of them as the result.
 
 ```python
+from pyflink.table import DataTypes
+from pyflink.table.udf import AggregateFunction, udaf
 
 # option 1: extending the base class `AggregateFunction`
 class MaxAdd(AggregateFunction):
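 
     # A sketch of how this example continues, assuming the semantics the
     # surrounding doc describes (each input arrives as a pandas.Series and the
     # result is the sum of the columns' maxima); the body below is illustrative:
 
     def get_value(self, accumulator):
         return accumulator[0]
 
     def create_accumulator(self):
         return []
 
     def accumulate(self, accumulator, *args):
         result = 0
         for arg in args:
             # each arg is a pandas.Series holding one column of the batch
             result += arg.max()
         accumulator.append(result)
 
 max_add = udaf(MaxAdd(), result_type=DataTypes.BIGINT(), func_type="pandas")
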
diff --git a/docs/content/docs/dev/table/catalogs.md 
b/docs/content/docs/dev/table/catalogs.md
index d0b7ab6be64..f166ab5f61b 100644
--- a/docs/content/docs/dev/table/catalogs.md
+++ b/docs/content/docs/dev/table/catalogs.md
@@ -245,7 +245,7 @@ schema = Schema.new_builder() \
     
 catalog_table = t_env.create_table("myhive.mydb.mytable", 
TableDescriptor.for_connector("kafka")
     .schema(schema)
-    // …
+    # …
     .build())
 
 # tables should contain "mytable"
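
For context, a minimal sketch of the descriptor pattern this hunk touches; the columns and the `topic` option are placeholders rather than values from the original snippet:

```python
from pyflink.table import DataTypes, Schema, TableDescriptor

schema = Schema.new_builder() \
    .column("name", DataTypes.STRING()) \
    .column("age", DataTypes.INT()) \
    .build()

# The "topic" option is a placeholder; real connector options depend on the setup.
descriptor = TableDescriptor.for_connector("kafka") \
    .schema(schema) \
    .option("topic", "mytopic") \
    .build()

# t_env.create_table("myhive.mydb.mytable", descriptor)
```
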
diff --git a/docs/content/docs/dev/table/common.md 
b/docs/content/docs/dev/table/common.md
index 5fa3ead0509..3aad29f7e01 100644
--- a/docs/content/docs/dev/table/common.md
+++ b/docs/content/docs/dev/table/common.md
@@ -480,9 +480,9 @@ table_env = # see "Create a TableEnvironment" section
 orders = table_env.from_path("Orders")
 # compute revenue for all customers from France
 revenue = orders \
-    .filter(orders.cCountry == 'FRANCE') \
-    .group_by(orders.cID, orders.cName) \
-    .select(orders.cID, orders.cName, orders.revenue.sum.alias('revSum'))
+    .filter(col('cCountry') == 'FRANCE') \
+    .group_by(col('cID'), col('cName')) \
+    .select(col('cID'), col('cName'), col('revenue').sum.alias('revSum'))
 
 # emit or convert Table
 # execute query
@@ -849,7 +849,7 @@ t_env = StreamTableEnvironment.create(env)
 table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
 table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
 table = table1 \
-    .where(table1.word.like('F%')) \
+    .where(col('word').like('F%')) \
     .union_all(table2)
 print(table.explain())
 
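
Fleshed out with the imports and environment setup it assumes, the `where`/`union_all` snippet above runs as-is:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.table.expressions import col

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
table2 = t_env.from_elements([(1, "hello")], ["count", "word"])

# keep only words starting with 'F', then append table2 unchanged
table = table1 \
    .where(col("word").like("F%")) \
    .union_all(table2)
print(table.explain())
```
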
diff --git a/docs/content/docs/dev/table/data_stream_api.md 
b/docs/content/docs/dev/table/data_stream_api.md
index 28950e2e942..6dd85151de2 100644
--- a/docs/content/docs/dev/table/data_stream_api.md
+++ b/docs/content/docs/dev/table/data_stream_api.md
@@ -2883,7 +2883,7 @@ t_env = ...
 
 table = t_env.from_elements([("john", 35), ("sarah", 32)], 
               DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),
-                            DataTypes.FIELD("age", DataTypes.INT())]))
+                             DataTypes.FIELD("age", DataTypes.INT())]))
 
 # Convert the Table into an append DataStream of Row by specifying the type 
information
 ds_row = t_env.to_append_stream(table, Types.ROW([Types.STRING(), 
Types.INT()]))
diff --git a/docs/content/docs/dev/table/tableApi.md 
b/docs/content/docs/dev/table/tableApi.md
index d4f132c5afb..08e3223b9e9 100644
--- a/docs/content/docs/dev/table/tableApi.md
+++ b/docs/content/docs/dev/table/tableApi.md
@@ -114,6 +114,7 @@ The following example shows how a Python Table API program 
is constructed and ho
 
 ```python
 from pyflink.table import *
+from pyflink.table.expressions import col
 
 # environment configuration
 t_env = TableEnvironment.create(
@@ -152,8 +153,7 @@ t_env.execute_sql(sink_ddl)
 # specify table program
 orders = t_env.from_path("Orders")  # schema (a, b, c, rowtime)
 
-orders.group_by("a").select(orders.a, 
orders.b.count.alias('cnt')).execute_insert("result").wait()
-
+orders.group_by(col("a")).select(col("a"), 
col("b").count.alias('cnt')).execute_insert("result").wait()
 ```
 
 {{< /tab >}}
@@ -208,14 +208,15 @@ val result: Table = orders
 ```python
 # specify table program
 from pyflink.table.expressions import col, lit
+from pyflink.table.window import Tumble
 
 orders = t_env.from_path("Orders")  # schema (a, b, c, rowtime)
 
-result = orders.filter(orders.a.is_not_null & orders.b.is_not_null & 
orders.c.is_not_null) \
-               .select(orders.a.lower_case.alias('a'), orders.b, 
orders.rowtime) \
-               
.window(Tumble.over(lit(1).hour).on(orders.rowtime).alias("hourly_window")) \
+result = orders.filter(col("a").is_not_null & col("b").is_not_null & 
col("c").is_not_null) \
+               .select(col("a").lower_case.alias('a'), col("b"), 
col("rowtime")) \
+               
.window(Tumble.over(lit(1).hour).on(col("rowtime")).alias("hourly_window")) \
                .group_by(col('hourly_window'), col('a')) \
-               .select(col('a'), col('hourly_window').end.alias('hour'), 
b.avg.alias('avg_billing_amount'))
+               .select(col('a'), col('hourly_window').end.alias('hour'), 
col("b").avg.alias('avg_billing_amount'))
 ```
 
 {{< /tab >}}
@@ -370,7 +371,7 @@ Table result = orders.select($"a", $"c" as "d")
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.select(orders.a, orders.c.alias('d'))
+result = orders.select(col("a"), col("c").alias('d'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -418,7 +419,7 @@ val orders: Table = tableEnv.from("Orders").as("x", "y", 
"z", "t")
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.alias("x, y, z, t")
+result = orders.alias("x", "y", "z", "t")
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -446,7 +447,7 @@ val result = orders.filter($"a" % 2 === 0)
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.where(orders.a == 'red')
+result = orders.where(col("a") == 'red')
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -469,7 +470,7 @@ val result = orders.filter($"a" % 2 === 0)
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.filter(orders.a == 'red')
+result = orders.filter(col("a") == 'red')
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -502,7 +503,7 @@ val result = orders.addColumns(concat($"c", "Sunny"))
 from pyflink.table.expressions import concat
 
 orders = t_env.from_path("Orders")
-result = orders.add_columns(concat(orders.c, 'sunny'))
+result = orders.add_columns(concat(col("c"), 'sunny'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -533,7 +534,7 @@ val result = orders.addOrReplaceColumns(concat($"c", 
"Sunny") as "desc")
 from pyflink.table.expressions import concat
 
 orders = t_env.from_path("Orders")
-result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc'))
+result = orders.add_or_replace_columns(concat(col("c"), 'sunny').alias('desc'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -558,7 +559,7 @@ val result = orders.dropColumns($"b", $"c")
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.drop_columns(orders.b, orders.c)
+result = orders.drop_columns(col("b"), col("c"))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -586,7 +587,7 @@ val result = orders.renameColumns($"b" as "b2", $"c" as 
"c2")
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2'))
+result = orders.rename_columns(col("b").alias('b2'), col("c").alias('c2'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -619,7 +620,7 @@ val result = orders.groupBy($"a").select($"a", 
$"b".sum().as("d"))
 {{< tab "Python" >}}
 ```python
 orders = t_env.from_path("Orders")
-result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d'))
+result = orders.group_by(col("a")).select(col("a"), col("b").sum.alias('d'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -664,9 +665,9 @@ from pyflink.table.window import Tumble
 from pyflink.table.expressions import lit, col
 
 orders = t_env.from_path("Orders")
-result = 
orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \ 
-               .group_by(orders.a, col('w')) \
-               .select(orders.a, col('w').start, col('w').end, 
orders.b.sum.alias('d'))
+result = 
orders.window(Tumble.over(lit(5).minutes).on(col('rowtime')).alias("w")) \ 
+               .group_by(col('a'), col('w')) \
+               .select(col('a'), col('w').start, col('w').end, 
col('b').sum.alias('d'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -720,10 +721,10 @@ from pyflink.table.window import Over
 from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE
 
 orders = t_env.from_path("Orders")
-result = 
orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime)
+result = 
orders.over_window(Over.partition_by(col("a")).order_by(col("rowtime"))
                             
.preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE)
                             .alias("w")) \
-               .select(orders.a, orders.b.avg.over(col('w')), 
orders.b.max.over(col('w')), orders.b.min.over(col('w')))
+               .select(col("a"), col("b").avg.over(col('w')), 
col("b").max.over(col('w')), col("b").min.over(col('w')))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -794,22 +795,23 @@ val result = orders
 {{< tab "Python" >}}
 ```python
 from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE
+from pyflink.table.window import Over, Tumble
 
 orders = t_env.from_path("Orders")
 # Distinct aggregation on group by
-group_by_distinct_result = orders.group_by(orders.a) \
-                                 .select(orders.a, 
orders.b.sum.distinct.alias('d'))
+group_by_distinct_result = orders.group_by(col("a")) \
+                                 .select(col("a"), 
col("b").sum.distinct.alias('d'))
 # Distinct aggregation on time window group by
-group_by_window_distinct_result = orders.window(
-    
Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(orders.a, 
col('w')) \
-    .select(orders.a, orders.b.sum.distinct.alias('d'))
+group_by_window_distinct_result = 
orders.window(Tumble.over(lit(5).minutes).on(col("rowtime")).alias("w")) \
+    .group_by(col("a"), col('w')) \
+    .select(col("a"), col("b").sum.distinct.alias('d'))
 # Distinct aggregation on over window
 result = orders.over_window(Over
-                       .partition_by(orders.a)
-                       .order_by(orders.rowtime)
+                       .partition_by(col("a"))
+                       .order_by(col("rowtime"))
                        .preceding(UNBOUNDED_RANGE)
                        .alias("w")) \
-                       .select(orders.a, orders.b.avg.distinct.over(col('w')), 
orders.b.max.over(col('w')), orders.b.min.over(col('w')))
+    .select(col("a"), col("b").avg.distinct.over(col('w')), 
col("b").max.over(col('w')), col("b").min.over(col('w')))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -910,7 +912,7 @@ from pyflink.table.expressions import col
 
 left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
 right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
-result = left.join(right).where(left.a == right.d).select(left.a, left.b, 
right.e)
+result = left.join(right).where(col('a') == col('d')).select(col('a'), 
col('b'), col('e'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -956,9 +958,9 @@ from pyflink.table.expressions import col
 left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
 right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
 
-left_outer_result = left.left_outer_join(right, left.a == 
right.d).select(left.a, left.b, right.e)
-right_outer_result = left.right_outer_join(right, left.a == 
right.d).select(left.a, left.b, right.e)
-full_outer_result = left.full_outer_join(right, left.a == 
right.d).select(left.a, left.b, right.e)
+left_outer_result = left.left_outer_join(right, col('a') == 
col('d')).select(col('a'), col('b'), col('e'))
+right_outer_result = left.right_outer_join(right, col('a') == 
col('d')).select(col('a'), col('b'), col('e'))
+full_outer_result = left.full_outer_join(right, col('a') == 
col('d')).select(col('a'), col('b'), col('e'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1006,8 +1008,8 @@ from pyflink.table.expressions import col
 left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'), 
col('rowtime1'))
 right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'), 
col('rowtime2'))
   
-joined_table = left.join(right).where((left.a == right.d) & (left.rowtime1 >= 
right.rowtime2 - lit(1).second) & (left.rowtime1 <= right.rowtime2 + 
lit(2).seconds))
-result = joined_table.select(joined_table.a, joined_table.b, joined_table.e, 
joined_table.rowtime1)
+joined_table = left.join(right).where((col('a') == col('d')) & 
(col('rowtime1') >= col('rowtime2') - lit(1).second) & (col('rowtime1') <= 
col('rowtime2') + lit(2).seconds))
+result = joined_table.select(col('a'), col('b'), col('e'), col('rowtime1'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1053,8 +1055,8 @@ def split(x):
 
 # join
 orders = t_env.from_path("Orders")
-joined_table = orders.join_lateral(split(orders.c).alias("s, t, v"))
-result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, 
joined_table.t, joined_table.v)
+joined_table = orders.join_lateral(split(col('c')).alias("s", "t", "v"))
+result = joined_table.select(col('a'), col('b'), col('s'), col('t'), col('v'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1102,8 +1104,8 @@ def split(x):
 
 # join
 orders = t_env.from_path("Orders")
-joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s, t, v"))
-result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, 
joined_table.t, joined_table.v)
+joined_table = orders.left_outer_join_lateral(split(col('c')).alias("s", "t", 
"v"))
+result = joined_table.select(col('a'), col('b'), col('s'), col('t'), col('v'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1369,7 +1371,7 @@ val result = left.select($"a", $"b", 
$"c").where($"a".in(right))
 left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
 right = t_env.from_path("Source2").select(col('a'))
 
-result = left.select(left.a, left.b, left.c).where(left.a.in_(right))
+result = left.select(col('a'), col('b'), col('c')).where(col('a').in_(right))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1389,17 +1391,17 @@ Similar to a SQL `ORDER BY` clause. Returns records 
globally sorted across all p
 {{< tabs "orderby" >}}
 {{< tab "Java" >}}
 ```java
-Table result = in.orderBy($("a").asc());
+Table result = tab.orderBy($("a").asc());
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val result = in.orderBy($"a".asc)
+val result = tab.orderBy($"a".asc)
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-result = in.order_by(in.a.asc)
+result = tab.order_by(col('a').asc)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1439,13 +1441,13 @@ val result3: Table = 
in.orderBy($"a".asc).offset(10).fetch(5)
 {{< tab "Python" >}}
 ```python
 # returns the first 5 records from the sorted result
-result1 = table.order_by(table.a.asc).fetch(5)
+result1 = table.order_by(col('a').asc).fetch(5)
 
 # skips the first 3 records and returns all following records from the sorted 
result
-result2 = table.order_by(table.a.asc).offset(3)
+result2 = table.order_by(col('a').asc).offset(3)
 
 # skips the first 10 records and returns the next 5 records from the sorted 
result
-result3 = table.order_by(table.a.asc).offset(10).fetch(5)
+result3 = table.order_by(col('a').asc).offset(10).fetch(5)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1517,7 +1519,7 @@ The following example shows how to define a window 
aggregation on a table.
 ```python
 # define window with alias w, group the table by window w, then aggregate
 table = input.window([w: GroupWindow].alias("w")) \
-             .group_by(col('w')).select(input.b.sum)
+             .group_by(col('w')).select(col('b').sum)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1553,7 +1555,7 @@ The following example shows how to define a window 
aggregation with additional g
 # define window with alias w, group the table by attribute a and window w,
 # then aggregate
 table = input.window([w: GroupWindow].alias("w")) \
-             .group_by(col('w'), input.a).select(input.b.sum)
+             .group_by(col('w'), col('a')).select(col('b').sum)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1582,8 +1584,8 @@ val table = input
 # define window with alias w, group the table by attribute a and window w,
 # then aggregate and add window start, end, and rowtime timestamps
 table = input.window([w: GroupWindow].alias("w")) \
-             .group_by(col('w'), input.a) \
-             .select(input.a, col('w').start, col('w').end, col('w').rowtime, 
input.b.count)
+             .group_by(col('w'), col('a')) \
+             .select(col('a'), col('w').start, col('w').end, col('w').rowtime, 
col('b').count)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -1989,7 +1991,7 @@ val table = input
 ```python
 # define over window with alias w and aggregate over the over window w
 table = input.over_window([w: OverWindow].alias("w")) \
-    .select(input.a, input.b.sum.over(col('w')), input.c.min.over(col('w')))
+    .select(col('a'), col('b').sum.over(col('w')), col('c').min.over(col('w')))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -2216,9 +2218,9 @@ func = udf(map_function, result_type=DataTypes.ROW(
 table = input.map(func).alias('a', 'b')
 
 # map operation with a python vectorized scalar function
-pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW(
-                                                    [DataTypes.FIELD("a", 
DataTypes.BIGINT()),
-                                                    DataTypes.FIELD("b", 
DataTypes.BIGINT()))]),
+pandas_func = udf(lambda x: x * 2,
+                  result_type=DataTypes.ROW([DataTypes.FIELD("a", 
DataTypes.BIGINT()),
+                                             DataTypes.FIELD("b", 
DataTypes.BIGINT())]),
                   func_type='pandas')
 
 table = input.map(pandas_func).alias('a', 'b')
@@ -2450,9 +2452,9 @@ agg = udaf(function,
            name=str(function.__class__.__name__))
 
 # aggregate with a python general aggregate function
-result = t.group_by(t.a) \
+result = t.group_by(col('a')) \
     .aggregate(agg.alias("c", "d")) \
-    .select("a, c, d")
+    .select(col('a'), col('c'), col('d'))
     
 # aggregate with a python vectorized aggregate function
 pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
@@ -2461,8 +2463,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                         DataTypes.FIELD("b", DataTypes.INT())]),
                    func_type="pandas")
 t.aggregate(pandas_udaf.alias("a", "b")) \
-    .select("a, b")
-
+ .select(col('a'), col('b'))
 ```
 
 {{< /tab >}}
@@ -2504,20 +2505,22 @@ val table = input
 ```python
 from pyflink.table import DataTypes
 from pyflink.table.udf import AggregateFunction, udaf
+from pyflink.table.expressions import col, lit
+from pyflink.table.window import Tumble
 
 pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                    result_type=DataTypes.ROW(
                        [DataTypes.FIELD("a", DataTypes.FLOAT()),
                         DataTypes.FIELD("b", DataTypes.INT())]),
                    func_type="pandas")
-tumble_window = Tumble.over(expr.lit(1).hours) \
-    .on(expr.col("rowtime")) \
+tumble_window = Tumble.over(lit(1).hours) \
+    .on(col("rowtime")) \
     .alias("w")
-t.select(t.b, t.rowtime) \
+t.select(col('b'), col('rowtime')) \
     .window(tumble_window) \
-    .group_by("w") \
+    .group_by(col("w")) \
     .aggregate(pandas_udaf.alias("d", "e")) \
-    .select("w.rowtime, d, e")
+    .select(col('w').rowtime, col('d'), col('e'))
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -2667,6 +2670,7 @@ Similar to a **GroupBy Aggregation**. Groups the rows on 
the grouping keys with
 from pyflink.common import Row
 from pyflink.table.udf import TableAggregateFunction, udtaf
 from pyflink.table import DataTypes
+from pyflink.table.expressions import col
 
 class Top2(TableAggregateFunction):
 
@@ -2702,13 +2706,13 @@ t = t_env.from_elements([(1, 'Hi', 'Hello'),
                               (3, 'Hi', 'hi'),
                               (5, 'Hi2', 'hi'),
                               (7, 'Hi', 'Hello'),
-                              (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
-result = t.select(t.a, t.c) \
-    .group_by(t.c) \
+                              (2, 'Hi', 'Hello')],
+                        ['a', 'b', 'c'])
+result = t.select(col('a'), col('c')) \
+    .group_by(col('c')) \
     .flat_aggregate(mytop) \
-    .select(t.a) \
+    .select(col('a')) \
     .flat_aggregate(mytop.alias("b"))
-
 ```
 {{< /tab >}}
 {{< /tabs >}}
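
The `Top2` hunk above elides the class body; below is a self-contained sketch of the whole flow, with the accumulator logic reconstructed along the lines of the docstring examples elsewhere in this diff (details such as the None handling are illustrative):

```python
from pyflink.common import Row
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import TableAggregateFunction, udtaf

class Top2(TableAggregateFunction):

    def emit_value(self, accumulator):
        # emit the two largest values seen for the group
        yield Row(accumulator[0])
        yield Row(accumulator[1])

    def create_accumulator(self):
        return [None, None]

    def accumulate(self, accumulator, row):
        if row[0] is not None:
            if accumulator[0] is None or row[0] > accumulator[0]:
                accumulator[1] = accumulator[0]
                accumulator[0] = row[0]
            elif accumulator[1] is None or row[0] > accumulator[1]:
                accumulator[1] = row[0]

    def get_accumulator_type(self):
        return DataTypes.ARRAY(DataTypes.BIGINT())

    def get_result_type(self):
        return DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())])

mytop = udtaf(Top2())
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t = t_env.from_elements([(1, 'Hi', 'Hello'),
                         (3, 'Hi', 'hi'),
                         (5, 'Hi2', 'hi'),
                         (7, 'Hi', 'Hello'),
                         (2, 'Hi', 'Hello')],
                        ['a', 'b', 'c'])

t.select(col('a'), col('c')) \
 .group_by(col('c')) \
 .flat_aggregate(mytop) \
 .select(col('*')) \
 .execute().print()
```
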
diff --git 
a/flink-python/pyflink/examples/table/mixing_use_of_datastream_and_table.py 
b/flink-python/pyflink/examples/table/mixing_use_of_datastream_and_table.py
index dc518f4e3d2..3dc9b7d82dd 100644
--- a/flink-python/pyflink/examples/table/mixing_use_of_datastream_and_table.py
+++ b/flink-python/pyflink/examples/table/mixing_use_of_datastream_and_table.py
@@ -21,6 +21,7 @@ import sys
 from pyflink.common import Types
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.table import (DataTypes, TableDescriptor, Schema, 
StreamTableEnvironment)
+from pyflink.table.expressions import col
 from pyflink.table.udf import udf
 
 
@@ -55,7 +56,7 @@ def mixing_use_of_datastream_and_table():
 
     # perform table api operations
     table = t_env.from_path("source")
-    table = table.select(table.id, length(table.data))
+    table = table.select(col('id'), length(col('data')))
 
     # convert table to datastream and perform datastream api operations
     ds = t_env.to_data_stream(table)
diff --git a/flink-python/pyflink/examples/table/pandas/pandas_udaf.py 
b/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
index e4e8b9ea522..72febe590e7 100644
--- a/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
+++ b/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
@@ -55,7 +55,7 @@ def pandas_udaf():
               .column("f2", DataTypes.FLOAT())
               .watermark("ts", "ts - INTERVAL '3' SECOND")
               .build()
-    ).alias("ts, name, price")
+    ).alias("ts", "name", "price")
 
     # define the sink
     t_env.create_temporary_table(
@@ -75,8 +75,8 @@ def pandas_udaf():
 
     # define the tumble window operation
     table = table.window(Tumble.over(lit(5).seconds).on(col("ts")).alias("w")) 
\
-                 .group_by(table.name, col('w')) \
-                 .select(table.name, mean_udaf(table.price), col("w").start, 
col("w").end)
+                 .group_by(col('name'), col('w')) \
+                 .select(col('name'), mean_udaf(col('price')), col("w").start, 
col("w").end)
 
     # submit for execution
     table.execute_insert('sink') \
diff --git a/flink-python/pyflink/examples/table/process_json_data.py 
b/flink-python/pyflink/examples/table/process_json_data.py
index eeb6f7c88cd..b410d11c58e 100644
--- a/flink-python/pyflink/examples/table/process_json_data.py
+++ b/flink-python/pyflink/examples/table/process_json_data.py
@@ -20,6 +20,7 @@ import sys
 
 from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes, 
TableDescriptor,
                            Schema)
+from pyflink.table.expressions import col
 
 
 def process_json_data():
@@ -45,7 +46,7 @@ def process_json_data():
                                .build())
                        .build())
 
-    table = table.select(table.id, table.data.json_value('$.addr.country', 
DataTypes.STRING()))
+    table = table.select(col('id'), col('data').json_value('$.addr.country', 
DataTypes.STRING()))
 
     # execute
     table.execute_insert('sink') \
diff --git a/flink-python/pyflink/examples/table/process_json_data_with_udf.py 
b/flink-python/pyflink/examples/table/process_json_data_with_udf.py
index 31969e2f675..934e73a68d8 100644
--- a/flink-python/pyflink/examples/table/process_json_data_with_udf.py
+++ b/flink-python/pyflink/examples/table/process_json_data_with_udf.py
@@ -21,6 +21,7 @@ import sys
 
 from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes, 
TableDescriptor,
                            Schema)
+from pyflink.table.expressions import col
 from pyflink.table.udf import udf
 
 
@@ -54,7 +55,7 @@ def process_json_data_with_udf():
         json_data['tel'] += 1
         return json.dumps(json_data)
 
-    table = table.select(table.id, update_tel(table.data))
+    table = table.select(col('id'), update_tel(col('data')))
 
     # execute
     table.execute_insert('sink') \
diff --git a/flink-python/pyflink/examples/table/windowing/over_window.py 
b/flink-python/pyflink/examples/table/windowing/over_window.py
index 982d6b860d4..d958e75ddc1 100644
--- a/flink-python/pyflink/examples/table/windowing/over_window.py
+++ b/flink-python/pyflink/examples/table/windowing/over_window.py
@@ -54,7 +54,7 @@ def tumble_window_demo():
               .column("f2", DataTypes.FLOAT())
               .watermark("ts", "ts - INTERVAL '3' SECOND")
               .build()
-    ).alias("ts, name, price")
+    ).alias("ts", "name", "price")
 
     # define the sink
     t_env.create_temporary_table(
@@ -68,12 +68,12 @@ def tumble_window_demo():
 
     # define the over window operation
     table = table.over_window(
-        Over.partition_by("name")
-            .order_by("ts")
+        Over.partition_by(col("name"))
+            .order_by(col("ts"))
             .preceding(row_interval(2))
             .following(CURRENT_ROW)
             .alias('w')) \
-        .select(table.name, table.price.max.over(col('w')))
+        .select(col('name'), col('price').max.over(col('w')))
 
     # submit for execution
     table.execute_insert('sink') \
diff --git a/flink-python/pyflink/examples/table/windowing/session_window.py 
b/flink-python/pyflink/examples/table/windowing/session_window.py
index 5b40a7b9af8..e7506b0f8b3 100644
--- a/flink-python/pyflink/examples/table/windowing/session_window.py
+++ b/flink-python/pyflink/examples/table/windowing/session_window.py
@@ -52,7 +52,7 @@ def session_window_demo():
               .column("f2", DataTypes.FLOAT())
               .watermark("ts", "ts - INTERVAL '3' SECOND")
               .build()
-    ).alias("ts, name, price")
+    ).alias("ts", "name", "price")
 
     # define the sink
     t_env.create_temporary_table(
@@ -68,8 +68,8 @@ def session_window_demo():
 
     # define the session window operation
     table = 
table.window(Session.with_gap(lit(5).seconds).on(col("ts")).alias("w")) \
-                 .group_by(table.name, col('w')) \
-                 .select(table.name, table.price.sum, col("w").start, 
col("w").end)
+                 .group_by(col('name'), col('w')) \
+                 .select(col('name'), col('price').sum, col("w").start, 
col("w").end)
 
     # submit for execution
     table.execute_insert('sink') \
diff --git a/flink-python/pyflink/examples/table/windowing/sliding_window.py 
b/flink-python/pyflink/examples/table/windowing/sliding_window.py
index 1b8bb150fd2..10ba068b1a5 100644
--- a/flink-python/pyflink/examples/table/windowing/sliding_window.py
+++ b/flink-python/pyflink/examples/table/windowing/sliding_window.py
@@ -54,7 +54,7 @@ def sliding_window_demo():
               .column("f2", DataTypes.FLOAT())
               .watermark("ts", "ts - INTERVAL '3' SECOND")
               .build()
-    ).alias("ts, name, price")
+    ).alias("ts", "name", "price")
 
     # define the sink
     t_env.create_temporary_table(
@@ -70,8 +70,8 @@ def sliding_window_demo():
 
     # define the sliding window operation
     table = 
table.window(Slide.over(lit(5).seconds).every(lit(2).seconds).on(col("ts")).alias("w"))\
-                 .group_by(table.name, col('w')) \
-                 .select(table.name, table.price.sum, col("w").start, 
col("w").end)
+                 .group_by(col('name'), col('w')) \
+                 .select(col('name'), col('price').sum, col("w").start, 
col("w").end)
 
     # submit for execution
     table.execute_insert('sink') \
diff --git a/flink-python/pyflink/examples/table/windowing/tumble_window.py 
b/flink-python/pyflink/examples/table/windowing/tumble_window.py
index dd3ba2ea408..ff5bb99516e 100644
--- a/flink-python/pyflink/examples/table/windowing/tumble_window.py
+++ b/flink-python/pyflink/examples/table/windowing/tumble_window.py
@@ -54,7 +54,7 @@ def tumble_window_demo():
               .column("f2", DataTypes.FLOAT())
               .watermark("ts", "ts - INTERVAL '3' SECOND")
               .build()
-    ).alias("ts, name, price")
+    ).alias("ts", "name", "price")
 
     # define the sink
     t_env.create_temporary_table(
@@ -70,8 +70,8 @@ def tumble_window_demo():
 
     # define the tumble window operation
     table = table.window(Tumble.over(lit(5).seconds).on(col("ts")).alias("w")) 
\
-                 .group_by(table.name, col('w')) \
-                 .select(table.name, table.price.sum, col("w").start, 
col("w").end)
+                 .group_by(col('name'), col('w')) \
+                 .select(col('name'), col('price').sum, col("w").start, 
col("w").end)
 
     # submit for execution
     table.execute_insert('sink') \
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 89e11e08f11..6cf95e441ea 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -911,7 +911,7 @@ class Expression(Generic[T]):
         ::
 
             >>> tab.where(col("a").in_(1, 2, 3))
-            >>> table_a.where(col("x").in_(table_b.select("y")))
+            >>> table_a.where(col("x").in_(table_b.select(col("y"))))
         """
         from pyflink.table import Table
         if isinstance(first_element_or_table, Table):
diff --git a/flink-python/pyflink/table/schema.py 
b/flink-python/pyflink/table/schema.py
index 5903557783e..d8856bf1669 100644
--- a/flink-python/pyflink/table/schema.py
+++ b/flink-python/pyflink/table/schema.py
@@ -139,9 +139,9 @@ class Schema(object):
             Example:
             ::
 
-                >>> Schema.new_builder().
-                ...  column_by_expression("ts", "orig_ts - INTERVAL '60' 
MINUTE").
-                ...  column_by_metadata("orig_ts", DataTypes.TIMESTAMP(3), 
"timestamp")
+                >>> Schema.new_builder() \\
+                ...     .column_by_expression("ts", "orig_ts - INTERVAL '60' 
MINUTE") \\
+                ...     .column_by_metadata("orig_ts", DataTypes.TIMESTAMP(3), 
"timestamp")
 
             :param column_name: Column name
             :param expr: Computation of the column
diff --git a/flink-python/pyflink/table/table.py 
b/flink-python/pyflink/table/table.py
index 200cafa2342..26b57c805e6 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -84,7 +84,7 @@ class Table(object):
     Example:
     ::
 
-        >>> from pyflink.table import TableEnvironment
+        >>> from pyflink.table import EnvironmentSettings, TableEnvironment
         >>> from pyflink.table.expressions import *
         >>> env_settings = EnvironmentSettings.in_streaming_mode()
         >>> t_env = TableEnvironment.create(env_settings)
@@ -122,11 +122,9 @@ class Table(object):
         Example:
         ::
 
-            >>> from pyflink.table import expressions as expr
-            >>> tab.select(tab.key, expr.concat(tab.value, 'hello'))
-            >>> tab.select(expr.col('key'), expr.concat(expr.col('value'), 
'hello'))
-
-            >>> tab.select("key, value + 'hello'")
+            >>> from pyflink.table.expressions import col, concat
+            >>> tab.select(tab.key, concat(tab.value, 'hello'))
+            >>> tab.select(col('key'), concat(col('value'), 'hello'))
 
         :return: The result table.
         """
@@ -146,7 +144,6 @@ class Table(object):
         ::
 
             >>> tab.alias("a", "b", "c")
-            >>> tab.alias("a, b, c")
 
         :param field: Field alias.
         :param fields: Additional field aliases.
@@ -164,8 +161,7 @@ class Table(object):
         Example:
         ::
 
-            >>> tab.filter(tab.name == 'Fred')
-            >>> tab.filter("name = 'Fred'")
+            >>> tab.filter(col('name') == 'Fred')
 
         :param predicate: Predicate expression string.
         :return: The result table.
@@ -180,8 +176,7 @@ class Table(object):
         Example:
         ::
 
-            >>> tab.where(tab.name == 'Fred')
-            >>> tab.where("name = 'Fred'")
+            >>> tab.where(col('name') == 'Fred')
 
         :param predicate: Predicate expression string.
         :return: The result table.
@@ -196,8 +191,7 @@ class Table(object):
         Example:
         ::
 
-            >>> tab.group_by(tab.key).select(tab.key, tab.value.avg)
-            >>> tab.group_by("key").select("key, value.avg")
+            >>> tab.group_by(col('key')).select(col('key'), col('value').avg)
 
         :param fields: Group keys.
         :return: The grouped table.
@@ -216,7 +210,7 @@ class Table(object):
         Example:
         ::
 
-            >>> tab.select(tab.key, tab.value).distinct()
+            >>> tab.select(col('key'), col('value')).distinct()
 
         :return: The result table.
         """
@@ -236,9 +230,8 @@ class Table(object):
         Example:
         ::
 
-            >>> left.join(right).where((left.a == right.b) && (left.c > 3))
-            >>> left.join(right).where("a = b && c > 3")
-            >>> left.join(right, left.a == right.b)
+            >>> left.join(right).where((col('a') == col('b')) & (col('c') > 3))
+            >>> left.join(right, col('a') == col('b'))
 
         :param right: Right table.
         :param join_predicate: Optional, the join predicate expression string.
@@ -267,8 +260,7 @@ class Table(object):
         ::
 
             >>> left.left_outer_join(right)
-            >>> left.left_outer_join(right, left.a == right.b)
-            >>> left.left_outer_join(right, "a = b")
+            >>> left.left_outer_join(right, col('a') == col('b'))
 
         :param right: Right table.
         :param join_predicate: Optional, the join predicate expression string.
@@ -296,8 +288,7 @@ class Table(object):
         Example:
         ::
 
-            >>> left.right_outer_join(right, left.a == right.b)
-            >>> left.right_outer_join(right, "a = b")
+            >>> left.right_outer_join(right, col('a') == col('b'))
 
         :param right: Right table.
         :param join_predicate: The join predicate expression string.
@@ -322,8 +313,7 @@ class Table(object):
         Example:
         ::
 
-            >>> left.full_outer_join(right, left.a == right.b)
-            >>> left.full_outer_join(right, "a = b")
+            >>> left.full_outer_join(right, col('a') == col('b'))
 
         :param right: Right table.
         :param join_predicate: The join predicate expression string.
@@ -343,12 +333,10 @@ class Table(object):
         Example:
         ::
 
+            >>> from pyflink.table.expressions import *
             >>> t_env.create_java_temporary_system_function("split",
-           ...     "java.table.function.class.name")
-            >>> tab.join_lateral("split(text, ' ') as (b)", "a = b")
-
-            >>> from pyflink.table import expressions as expr
-            >>> tab.join_lateral(expr.call('split', ' ').alias('b'), 
expr.col('a') == expr.col('b'))
+            ...     "java.table.function.class.name")
+            >>> tab.join_lateral(call('split', col('text'), ' ').alias('b'), col('a') == col('b'))
             >>> # take all the columns as inputs
             >>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
             ... def split_row(row: Row):
@@ -394,9 +382,9 @@ class Table(object):
 
             >>> t_env.create_java_temporary_system_function("split",
             ...     "java.table.function.class.name")
-            >>> tab.left_outer_join_lateral("split(text, ' ') as (b)")
-            >>> from pyflink.table import expressions as expr
-            >>> tab.left_outer_join_lateral(expr.call('split', ' ').alias('b'))
+            >>> from pyflink.table.expressions import *
+            >>> tab.left_outer_join_lateral(call('split', col('text'), ' ').alias('b'))
+
             >>> # take all the columns as inputs
             >>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
             ... def split_row(row: Row):
@@ -560,8 +548,7 @@ class Table(object):
         Example:
         ::
 
-            >>> tab.order_by(tab.name.desc)
-            >>> tab.order_by("name.desc")
+            >>> tab.order_by(col('name').desc)
 
         For unbounded tables, this operation requires a sorting on a time 
attribute or a subsequent
         fetch operation.
@@ -588,10 +575,9 @@ class Table(object):
         ::
 
             # skips the first 3 rows and returns all following rows.
-            >>> tab.order_by(tab.name.desc).offset(3)
-            >>> tab.order_by("name.desc").offset(3)
+            >>> tab.order_by(col('name').desc).offset(3)
             # skips the first 10 rows and returns the next 5 rows.
-            >>> tab.order_by(tab.name.desc).offset(10).fetch(5)
+            >>> tab.order_by(col('name').desc).offset(10).fetch(5)
 
         For unbounded tables, this operation requires a subsequent fetch 
operation.
 
@@ -613,13 +599,12 @@ class Table(object):
         Returns the first 3 records.
         ::
 
-            >>> tab.order_by(tab.name.desc).fetch(3)
-            >>> tab.order_by("name.desc").fetch(3)
+            >>> tab.order_by(col('name').desc).fetch(3)
 
         Skips the first 10 rows and returns the next 5 rows.
         ::
 
-            >>> tab.order_by(tab.name.desc).offset(10).fetch(5)
+            >>> tab.order_by(col('name').desc).offset(10).fetch(5)
 
         :param fetch: The number of records to return. Fetch must be >= 0.
         :return: The result table.
@@ -675,10 +660,10 @@ class Table(object):
         Example:
         ::
 
-            >>> from pyflink.table import expressions as expr
-            >>> 
tab.window(Tumble.over(expr.lit(10).minutes).on(tab.rowtime).alias('w')) \\
+            >>> from pyflink.table.expressions import col, lit
+            >>> 
tab.window(Tumble.over(lit(10).minutes).on(col('rowtime')).alias('w')) \\
             ...     .group_by(col('w')) \\
-            ...     .select(tab.a.sum.alias('a'),
+            ...     .select(col('a').sum.alias('a'),
             ...             col('w').start.alias('b'),
             ...             col('w').end.alias('c'),
             ...             col('w').rowtime.alias('d'))
@@ -700,10 +685,10 @@ class Table(object):
         Example:
         ::
 
-            >>> from pyflink.table import expressions as expr
-            >>> tab.over_window(Over.partition_by(tab.c).order_by(tab.rowtime) 
\\
+            >>> from pyflink.table.expressions import col, lit
+            >>> 
tab.over_window(Over.partition_by(col('c')).order_by(col('rowtime')) \\
             ...     .preceding(lit(10).seconds).alias("ow")) \\
-            ...     .select(tab.c, tab.b.count.over(col('ow'), 
tab.e.sum.over(col('ow'))))
+            ...     .select(col('c'), col('b').count.over(col('ow')), col('e').sum.over(col('ow')))
 
         .. note::
 
@@ -732,9 +717,8 @@ class Table(object):
         Example:
         ::
 
-            >>> from pyflink.table import expressions as expr
-            >>> tab.add_columns((tab.a + 1).alias('a1'), expr.concat(tab.b, 
'sunny').alias('b1'))
-            >>> tab.add_columns("a + 1 as a1, concat(b, 'sunny') as b1")
+            >>> from pyflink.table.expressions import col, concat
+            >>> tab.add_columns((col('a') + 1).alias('a1'), concat(col('b'), 
'sunny').alias('b1'))
 
         :param fields: Column list string.
         :return: The result table.
@@ -756,10 +740,9 @@ class Table(object):
         Example:
         ::
 
-            >>> from pyflink.table import expressions as expr
-            >>> tab.add_or_replace_columns((tab.a + 1).alias('a1'),
-            ...                            expr.concat(tab.b, 
'sunny').alias('b1'))
-            >>> tab.add_or_replace_columns("a + 1 as a1, concat(b, 'sunny') as 
b1")
+            >>> from pyflink.table.expressions import col, concat
+            >>> tab.add_or_replace_columns((col('a') + 1).alias('a1'),
+            ...                            concat(col('b'), 
'sunny').alias('b1'))
 
         :param fields: Column list string.
         :return: The result table.
@@ -780,8 +763,7 @@ class Table(object):
         Example:
         ::
 
-            >>> tab.rename_columns(tab.a.alias('a1'), tab.b.alias('b1'))
-            >>> tab.rename_columns("a as a1, b as b1")
+            >>> tab.rename_columns(col('a').alias('a1'), col('b').alias('b1'))
 
         :param fields: Column list string.
         :return: The result table.
@@ -801,8 +783,7 @@ class Table(object):
         Example:
         ::
 
-            >>> tab.drop_columns(tab.a, tab.b)
-            >>> tab.drop_columns("a, b")
+            >>> tab.drop_columns(col('a'), col('b'))
 
         :param fields: Column list string.
         :return: The result table.
@@ -824,7 +805,7 @@ class Table(object):
 
             >>> add = udf(lambda x: Row(x + 1, x * x), 
result_type=DataTypes.ROW(
             ... [DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b", 
DataTypes.INT())]))
-            >>> tab.map(add(tab.a)).alias("a, b")
+            >>> tab.map(add(col('a'))).alias("a", "b")
             >>> # take all the columns as inputs
             >>> identity = udf(lambda row: row, result_type=DataTypes.ROW(
             ... [DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b", 
DataTypes.INT())]))
@@ -854,7 +835,7 @@ class Table(object):
             ... def split(x, string):
             ...     for s in string.split(","):
             ...         yield x, s
-            >>> tab.flat_map(split(tab.a, table.b))
+            >>> tab.flat_map(split(col('a'), col('b')))
             >>> # take all the columns as inputs
             >>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
             ... def split_row(row: Row):
@@ -889,7 +870,7 @@ class Table(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> tab.aggregate(agg(tab.a).alias("a", "b")).select("a, b")
+            >>> tab.aggregate(agg(col('a')).alias("a", "b")).select(col('a'), 
col('b'))
             >>> # take all the columns as inputs
             >>> # pd is a Pandas.DataFrame
             >>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.a.max()),
@@ -897,7 +878,7 @@ class Table(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> tab.aggregate(agg.alias("a, b")).select("a, b")
+            >>> tab.aggregate(agg_row.alias("a", "b")).select(col("a"), col("b"))
 
         :param func: user-defined aggregate function.
         :return: The result table.
@@ -928,7 +909,7 @@ class Table(object):
         ::
 
             >>> table_agg = udtaf(MyTableAggregateFunction())
-            >>> tab.flat_aggregate(table_agg(tab.a).alias("a", 
"b")).select("a, b")
+            >>> tab.flat_aggregate(table_agg(col('a')).alias("a", 
"b")).select(col('a'), col('b'))
             >>> # take all the columns as inputs
             >>> class Top2(TableAggregateFunction):
             ...     def emit_value(self, accumulator):
@@ -954,7 +935,7 @@ class Table(object):
             ...         return DataTypes.ROW(
             ...             [DataTypes.FIELD("a", DataTypes.BIGINT())])
             >>> top2 = udtaf(Top2())
-            >>> tab.flat_aggregate(top2.alias("a", "b")).select("a, b")
+            >>> tab.flat_aggregate(top2.alias("a", "b")).select(col("a"), 
col("b"))
 
         :param func: user-defined table aggregate function.
         :return: The result table.
@@ -985,7 +966,7 @@ class Table(object):
 
             >>> pdf = pd.DataFrame(np.random.rand(1000, 2))
             >>> table = table_env.from_pandas(pdf, ["a", "b"])
-            >>> table.filter(table.a > 0.5).to_pandas()
+            >>> table.filter(col('a') > 0.5).to_pandas()
 
         :return: the result pandas DataFrame.
 
@@ -1156,9 +1137,7 @@ class GroupedTable(object):
         Example:
         ::
 
-            >>> tab.group_by(tab.key).select(tab.key, 
tab.value.avg.alias('average'))
-            >>> tab.group_by("key").select("key, value.avg as average")
-
+            >>> tab.group_by(col('key')).select(col('key'), 
col('value').avg.alias('average'))
 
         :param fields: Expression string that contains group keys and 
aggregate function calls.
         :return: The result table.
@@ -1184,7 +1163,8 @@ class GroupedTable(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> tab.group_by(tab.a).aggregate(agg(tab.b).alias("c", 
"d")).select("a, c, d")
+            >>> tab.group_by(col('a')).aggregate(agg(col('b')).alias("c", 
"d")).select(
+            ...     col('a'), col('c'), col('d'))
             >>> # take all the columns as inputs
             >>> # pd is a Pandas.DataFrame
             >>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
@@ -1192,7 +1172,7 @@ class GroupedTable(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> tab.group_by(tab.a).aggregate(agg.alias("a, b")).select("a, b")
+            >>> tab.group_by(col('a')).aggregate(agg_row.alias("a", "b")).select(col('a'), col('b'))
 
         :param func: user-defined aggregate function.
         :return: The result table.
@@ -1223,7 +1203,8 @@ class GroupedTable(object):
         ::
 
             >>> table_agg = udtaf(MyTableAggregateFunction())
-            >>> 
tab.group_by(tab.c).flat_aggregate(table_agg(tab.a).alias("a")).select("c, a")
+            >>> 
tab.group_by(col('c')).flat_aggregate(table_agg(col('a')).alias("a")).select(
+            ...     col('c'), col('a'))
             >>> # take all the columns as inputs
             >>> class Top2(TableAggregateFunction):
             ...     def emit_value(self, accumulator):
@@ -1249,7 +1230,9 @@ class GroupedTable(object):
             ...         return DataTypes.ROW(
             ...             [DataTypes.FIELD("a", DataTypes.BIGINT())])
             >>> top2 = udtaf(Top2())
-            >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a", 
"b")).select("a, b")
+            >>> tab.group_by(col('c')) \\
+            ...    .flat_aggregate(top2.alias("a", "b")) \\
+            ...    .select(col('a'), col('b'))
 
         :param func: user-defined table aggregate function.
         :return: The result table.
@@ -1294,10 +1277,10 @@ class GroupWindowedTable(object):
         Example:
         ::
 
-            >>> from pyflink.table import expressions as expr
-            >>> 
tab.window(Tumble.over(expr.lit(10).minutes).on(tab.rowtime).alias('w')) \\
+            >>> from pyflink.table.expressions import col, lit
+            >>> 
tab.window(Tumble.over(lit(10).minutes).on(col('rowtime')).alias('w')) \\
             ...     .group_by(col('w')) \\
-            ...     .select(tab.a.sum.alias('a'),
+            ...     .select(col('a').sum.alias('a'),
             ...             col('w').start.alias('b'),
             ...             col('w').end.alias('c'),
             ...             col('w').rowtime.alias('d'))
@@ -1335,7 +1318,6 @@ class WindowGroupedTable(object):
             >>> window_grouped_table.select(col('key'),
             ...                             col('window').start,
             ...                             col('value').avg.alias('valavg'))
-            >>> window_grouped_table.select("key, window.start, value.avg as 
valavg")
 
         :param fields: Expression string.
         :return: The result table.
@@ -1361,10 +1343,10 @@ class WindowGroupedTable(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> window_grouped_table.group_by("w") \
-            ...     .aggregate(agg(window_grouped_table.b) \
-            ...     .alias("c", "d")) \
-            ...     .select("c, d")
+            >>> window_grouped_table.group_by(col("w")) \\
+            ...     .aggregate(agg(col('b')).alias("c", "d")) \\
+            ...     .select(col('c'), col('d'))
             >>> # take all the columns as inputs
             >>> # pd is a Pandas.DataFrame
             >>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
@@ -1372,7 +1354,7 @@ class WindowGroupedTable(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> window_grouped_table.group_by("w, a").aggregate(agg_row)
+            >>> window_grouped_table.group_by(col("w"), 
col("a")).aggregate(agg_row)
 
         :param func: user-defined aggregate function.
         :return: The result table.
@@ -1427,7 +1409,6 @@ class OverWindowedTable(object):
             >>> over_windowed_table.select(col('c'),
             ...                            col('b').count.over(col('ow')),
             ...                            col('e').sum.over(col('ow')))
-            >>> over_windowed_table.select("c, b.count over ow, e.sum over ow")
 
         :param fields: Expression string.
         :return: The result table.
@@ -1462,7 +1443,7 @@ class AggregatedTable(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> tab.aggregate(agg(tab.a).alias("a", "b")).select("a, b")
+            >>> tab.aggregate(agg(col('a')).alias("a", "b")).select(col('a'), 
col('b'))
             >>> # take all the columns as inputs
             >>> # pd is a Pandas.DataFrame
             >>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
@@ -1470,7 +1451,7 @@ class AggregatedTable(object):
             ...                   [DataTypes.FIELD("a", DataTypes.FLOAT()),
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
-            >>> tab.group_by(tab.a).aggregate(agg.alias("a, b")).select("a, b")
+            >>> tab.group_by(col('a')).aggregate(agg_row.alias("a", "b")).select(col('a'), col('b'))
 
         :param fields: Expression string.
         :return: The result table.
@@ -1502,7 +1483,7 @@ class FlatAggregateTable(object):
         ::
 
             >>> table_agg = udtaf(MyTableAggregateFunction())
-            >>> tab.flat_aggregate(table_agg(tab.a).alias("a", 
"b")).select("a, b")
+            >>> tab.flat_aggregate(table_agg(col('a')).alias("a", 
"b")).select(col('a'), col('b'))
             >>> # take all the columns as inputs
             >>> class Top2(TableAggregateFunction):
             ...     def emit_value(self, accumulator):
@@ -1528,7 +1509,9 @@ class FlatAggregateTable(object):
             ...         return DataTypes.ROW(
             ...             [DataTypes.FIELD("a", DataTypes.BIGINT())])
             >>> top2 = udtaf(Top2())
-            >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a", 
"b")).select("a, b")
+            >>> tab.group_by(col('c')) \\
+            ...    .flat_aggregate(top2.alias("a", "b")) \\
+            ...    .select(col('a'), col('b'))
 
         :param fields: Expression string.
         :return: The result table.
diff --git a/flink-python/pyflink/table/table_config.py 
b/flink-python/pyflink/table/table_config.py
index 5fa9896a566..630e0f2f927 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -236,8 +236,7 @@ class TableConfig(object):
         Example:
         ::
 
-            >>> table_config = TableConfig() \\
-            ...     .set_idle_state_retention(datetime.timedelta(days=1))
+            >>> 
table_config.set_idle_state_retention(datetime.timedelta(days=1))
 
         .. note::
 
diff --git a/flink-python/pyflink/table/table_environment.py 
b/flink-python/pyflink/table/table_environment.py
index 2e38ad9006a..2d13f2ea544 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1940,7 +1940,7 @@ class StreamTableEnvironment(TableEnvironment):
             ...         .column("id", DataTypes.BIGINT())
             ...         .column("payload", DataTypes.ROW(
             ...                                     [DataTypes.FIELD("name", 
DataTypes.STRING()),
-            ...                                     DataTypes.FIELD("age", 
DataTypes.INT())]))
+            ...                                      DataTypes.FIELD("age", 
DataTypes.INT())]))
             ...         .build())
 
         Note that the type system of the table ecosystem is richer than the 
one of the DataStream
diff --git a/flink-python/pyflink/table/window.py 
b/flink-python/pyflink/table/window.py
index 282660c6481..9dcd54998c7 100644
--- a/flink-python/pyflink/table/window.py
+++ b/flink-python/pyflink/table/window.py
@@ -61,12 +61,10 @@ class Tumble(object):
     Example:
     ::
 
-        >>> from pyflink.table import expressions as expr
-        >>> Tumble.over(expr.lit(10).minutes)
-        ...       .on(expr.col("rowtime"))
+        >>> from pyflink.table.expressions import col, lit
+        >>> Tumble.over(lit(10).minutes) \\
+        ...       .on(col("rowtime")) \\
         ...       .alias("w")
-
-        >>> Tumble.over("10.minutes").on("rowtime").alias("w")
     """
 
     @classmethod
@@ -140,13 +138,10 @@ class Session(object):
     Example:
     ::
 
-        >>> from pyflink.table import expressions as expr
-        >>> Session.with_gap(expr.lit(10).minutes)
-        ...        .on(expr.col("rowtime"))
+        >>> from pyflink.table.expressions import col, lit
+        >>> Session.with_gap(lit(10).minutes) \\
+        ...        .on(col("rowtime")) \\
         ...        .alias("w")
-
-        >>> Session.with_gap("10.minutes").on("rowtime").alias("w")
-
     """
 
     @classmethod
@@ -225,13 +220,11 @@ class Slide(object):
     Example:
     ::
 
-        >>> from pyflink.table import expressions as expr
-        >>> Slide.over(expr.lit(10).minutes)
-        ...      .every(expr.lit(5).minutes)
-        ...      .on(expr.col("rowtime"))
+        >>> from pyflink.table.expressions import col, lit
+        >>> Slide.over(lit(10).minutes) \\
+        ...      .every(lit(5).minutes) \\
+        ...      .on(col("rowtime")) \\
         ...      .alias("w")
-
-        >>> 
Slide.over("10.minutes").every("5.minutes").on("rowtime").alias("w")
     """
 
     @classmethod
@@ -333,13 +326,11 @@ class Over(object):
     Example:
     ::
 
-        >>> from pyflink.table import expressions as expr
+        >>> from pyflink.table.expressions import col, UNBOUNDED_RANGE
         >>> Over.partition_by(col("a")) \\
         ...     .order_by(col("rowtime")) \\
-        ...     .preceding(expr.UNBOUNDED_RANGE) \\
+        ...     .preceding(UNBOUNDED_RANGE) \\
         ...     .alias("w")
-
-        >>> 
Over.partition_by("a").order_by("rowtime").preceding("unbounded_range").alias("w")
     """
 
     @classmethod
