sunjincheng121 commented on a change in pull request #13304:
URL: https://github.com/apache/flink/pull/13304#discussion_r482744088



##########
File path: flink-python/pyflink/table/tests/test_column_operation.py
##########
@@ -16,36 +16,37 @@
 # limitations under the License.
 
################################################################################
 
+from pyflink.table import expressions as E
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
 
 
 class StreamTableColumnsOperationTests(PyFlinkStreamTableTestCase):
 
     def test_add_columns(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.select("a").add_columns("a + 1 as b, a + 2 as c")
+        result = t.select(t.a).add_columns((t.a + 1).alias('b'), (t.a + 
2).alias('c'))
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('[a, plus(a, 1), '
                          'plus(a, 2)]',
                          query_operation.getProjectList().toString())
 
     def test_add_or_replace_columns(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.select("a").add_or_replace_columns("a + 1 as b, a + 2 as a")
+        result = t.select("a").add_or_replace_columns((t.a + 1).alias('b'), 
(t.a + 2).alias('a'))
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('[plus(a, 2), '
                          'plus(a, 1)]',
                          query_operation.getProjectList().toString())
 
     def test_rename_columns(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.select("a, b, c").rename_columns("a as d, c as f, b as e")
+        result = t.select("a, b, c").rename_columns(t.a.alias('d'), 
t.c.alias('f'), t.b.alias('e'))
         table_schema = result._j_table.getQueryOperation().getTableSchema()
         self.assertEqual(['d', 'e', 'f'], list(table_schema.getFieldNames()))
 
     def test_drop_columns(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.select("a, b, c").drop_columns("a, c")
+        result = t.select("a, b, c").drop_columns(E.col('a'), E.col('c'))

Review comment:
       `t.select("a, b, c")` -> `t.select(t.a, t.b, t.c)`
   
   `E.col('a')` -> `t.a`?

##########
File path: flink-python/pyflink/table/examples/batch/word_count.py
##########
@@ -23,6 +23,7 @@
 
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.table import BatchTableEnvironment, TableConfig
+from pyflink.table import expressions as E

Review comment:
       How about renaming `E` -> `expr` (a lowercase shorthand), the same as pandas does?
e.g.: `import numpy as np`
   
   
https://github.com/pandas-dev/pandas/blob/master/pandas/tests/dtypes/test_dtypes.py

##########
File path: flink-python/pyflink/table/tests/test_correlate.py
##########
@@ -39,7 +40,8 @@ def test_join_lateral_with_join_predicate(self):
                                                     
"org.apache.flink.table.utils.TableFunc1")
         source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], 
["id", "words"])
 
-        result = source.join_lateral("split(words) as (word)", "id = word")
+        result = source.join_lateral(E.call('split', 
source.words).alias('word'),
+                                     E.col('id') == E.col('word'))

Review comment:
       `E.col('id') == E.col('word')` -> `source.id == source.word`

##########
File path: flink-python/pyflink/table/tests/test_dependency.py
##########
@@ -112,7 +113,7 @@ def check_requirements(i):
             ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
         self.t_env.register_table_sink("Results", table_sink)
         t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
-        exec_insert_table(t.select("check_requirements(a), a"), "Results")
+        exec_insert_table(t.select(E.call('check_requirements', E.col('a')), 
E.col('a')), "Results")

Review comment:
       `E.col('a')` -> `t.a` 

##########
File path: flink-python/pyflink/table/tests/test_calc.py
##########
@@ -31,21 +32,21 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
 
     def test_select(self):
         t = self.t_env.from_elements([(1, 'hi', 'hello')], ['a', 'b', 'c'])
-        result = t.select("a + 1, b, c")
+        result = t.select(t.a + 1, t.b, t.c)
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('[plus(a, 1), b, c]',
                          query_operation.getProjectList().toString())
 
     def test_alias(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.alias("d, e, f").select("d, e, f")
+        result = t.alias("d, e, f").select(E.col('d'), E.col('e'), E.col('f'))

Review comment:
       How about making a small change, as follows?
   ```
    t2 = t.alias("d, e, f")
    result = t2.select(t2.d, t2.e, t2.f)
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to