This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 4497072  [FLINK-13263] [python] Supports explain DAG plan in flink-python
4497072 is described below

commit 4497072f7baae228c506385fb9f1c2c0450ef55c
Author: godfreyhe <godfre...@163.com>
AuthorDate: Mon Jul 15 18:09:31 2019 +0800

    [FLINK-13263] [python] Supports explain DAG plan in flink-python

    This closes #9114
---
 flink-python/pyflink/table/table_environment.py    | 14 +++--
 .../table/tests/test_table_environment_api.py      | 69 +++++++++++++++++++++-
 flink-python/pyflink/util/exceptions.py            |  8 +++
 3 files changed, 85 insertions(+), 6 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index b5619e8..071c1f5 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -224,15 +224,21 @@ class TableEnvironment(object):
         j_table_name_array = self._j_tenv.listTables()
         return [item for item in j_table_name_array]
 
-    def explain(self, table):
+    def explain(self, table=None, extended=False):
         """
         Returns the AST of the specified Table API and SQL queries and the execution plan to compute
-        the result of the given :class:`Table`.
+        the result of the given :class:`Table` or multi-sinks plan.
 
-        :param table: The table to be explained.
+        :param table: The table to be explained. If table is None, explain for multi-sinks plan,
+                      else for given table.
+        :param extended: If the plan should contain additional properties.
+                         e.g. estimated cost, traits
         :return: The table for which the AST and execution plan will be returned.
         """
-        return self._j_tenv.explain(table._j_table)
+        if table is None:
+            return self._j_tenv.explain(extended)
+        else:
+            return self._j_tenv.explain(table._j_table, extended)
 
     def sql_query(self, query):
         """
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 54188ff..ef787f8 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -22,11 +22,13 @@ from py4j.compat import unicode
 
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table.table_environment import BatchTableEnvironment, StreamTableEnvironment
+from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment
 from pyflink.table.table_config import TableConfig
-from pyflink.table.types import DataTypes, RowType
+from pyflink.table.table_environment import BatchTableEnvironment
+from pyflink.table.types import RowType
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase
+from pyflink.util.exceptions import TableException
 
 
 class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
@@ -103,6 +105,38 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
 
         assert isinstance(actual, str) or isinstance(actual, unicode)
 
+    def test_explain_with_extended(self):
+        schema = RowType() \
+            .add('a', DataTypes.INT()) \
+            .add('b', DataTypes.STRING()) \
+            .add('c', DataTypes.STRING())
+        t_env = self.t_env
+        t = t_env.from_elements([], schema)
+        result = t.select("1 + a, b, c")
+
+        actual = t_env.explain(result, True)
+
+        assert isinstance(actual, str) or isinstance(actual, unicode)
+
+    def test_explain_with_multi_sinks(self):
+        t_env = self.t_env
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sink1",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+        t_env.register_table_sink(
+            "sink2",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+
+        t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
+        t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
+
+        actual = t_env.explain(extended=True)
+
+        assert isinstance(actual, str) or isinstance(actual, unicode)
+
     def test_sql_query(self):
         t_env = self.t_env
         source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
@@ -195,6 +229,37 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
 
         self.assertIsInstance(actual, (str, unicode))
 
+    def test_explain_with_extended(self):
+        schema = RowType() \
+            .add('a', DataTypes.INT()) \
+            .add('b', DataTypes.STRING()) \
+            .add('c', DataTypes.STRING())
+        t_env = self.t_env
+        t = t_env.from_elements([], schema)
+        result = t.select("1 + a, b, c")
+
+        actual = t_env.explain(result, True)
+
+        assert isinstance(actual, str) or isinstance(actual, unicode)
+
+    def test_explain_with_multi_sinks(self):
+        t_env = self.t_env
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sink1",
+            CsvTableSink(field_names, field_types, "path1"))
+        t_env.register_table_sink(
+            "sink2",
+            CsvTableSink(field_names, field_types, "path2"))
+
+        t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
+        t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
+
+        with self.assertRaises(TableException):
+            t_env.explain(extended=True)
+
     def test_table_config(self):
 
         table_config = TableConfig()
diff --git a/flink-python/pyflink/util/exceptions.py b/flink-python/pyflink/util/exceptions.py
index 57d17ad..2a28936 100644
--- a/flink-python/pyflink/util/exceptions.py
+++ b/flink-python/pyflink/util/exceptions.py
@@ -28,6 +28,12 @@ class JavaException(Exception):
         return repr(self.msg)
 
 
+class TableException(JavaException):
+    """
+    General Exception for all errors during table handling.
+    """
+
+
 class CatalogException(JavaException):
     """
     A catalog-related exception.
@@ -106,6 +112,8 @@ class TableNotPartitionedException(JavaException):
 
 # Mapping from JavaException to PythonException
 exception_mapping = {
+    "org.apache.flink.table.api.TableException":
+        TableException,
     "org.apache.flink.table.catalog.exceptions.CatalogException":
         CatalogException,
     "org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException":