This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1a5fc5f28a1c3569da8de953f6cd1fad5371f6a4 Author: godfreyhe <godfre...@163.com> AuthorDate: Wed Apr 29 10:57:32 2020 +0800 [FLINK-17267] [table] Introduce TableEnvironment#explainSql api --- flink-python/pyflink/table/__init__.py | 4 +- flink-python/pyflink/table/explain_detail.py | 34 ++++++++++++ flink-python/pyflink/table/table_environment.py | 20 ++++++- .../table/tests/test_table_environment_api.py | 29 +++++++++- flink-python/pyflink/util/utils.py | 20 +++++++ .../org/apache/flink/table/api/ExplainDetail.java} | 45 +++++----------- .../apache/flink/table/api/TableEnvironment.java | 15 +++++- .../table/api/internal/TableEnvironmentImpl.java | 35 +++++++++++-- .../org/apache/flink/table/delegation/Planner.java | 7 +-- .../org/apache/flink/table/utils/PlannerMock.java | 3 +- .../table/planner/delegation/BatchPlanner.scala | 8 +-- .../table/planner/delegation/StreamPlanner.scala | 11 ++-- .../resources/explain/testExplainSqlWithInsert.out | 31 +++++++++++ .../resources/explain/testExplainSqlWithSelect.out | 21 ++++++++ .../flink/table/api/TableEnvironmentTest.scala | 57 ++++++++++++++++++++ .../flink/table/planner/utils/TableTestBase.scala | 2 +- .../table/api/internal/BatchTableEnvImpl.scala | 20 +++++-- .../flink/table/api/internal/TableEnvImpl.scala | 18 +++++-- .../apache/flink/table/planner/StreamPlanner.scala | 2 +- .../api/batch/BatchTableEnvironmentTest.scala | 61 +++++++++++++++++++++- .../api/stream/StreamTableEnvironmentTest.scala | 58 ++++++++++++++++++++ .../flink/table/utils/MockTableEnvironment.scala | 4 +- .../scala/resources/testExplainSqlWithInsert0.out | 31 +++++++++++ .../scala/resources/testExplainSqlWithInsert1.out | 43 +++++++++++++++ .../scala/resources/testExplainSqlWithSelect0.out | 21 ++++++++ .../scala/resources/testExplainSqlWithSelect1.out | 27 ++++++++++ 26 files changed, 562 insertions(+), 65 deletions(-) diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py index 140c6b3..1e367f3 100644 --- a/flink-python/pyflink/table/__init__.py +++ b/flink-python/pyflink/table/__init__.py @@ -70,6 +70,7 @@ from pyflink.table.sources import TableSource, CsvTableSource from pyflink.table.types import DataTypes, UserDefinedType, Row from pyflink.table.table_schema import TableSchema from pyflink.table.udf import FunctionContext, ScalarFunction +from pyflink.table.explain_detail import ExplainDetail __all__ = [ 'TableEnvironment', @@ -93,5 +94,6 @@ __all__ = [ 'TableSchema', 'FunctionContext', 'ScalarFunction', - 'SqlDialect' + 'SqlDialect', + 'ExplainDetail' ] diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py new file mode 100644 index 0000000..48e7ce9 --- /dev/null +++ b/flink-python/pyflink/table/explain_detail.py @@ -0,0 +1,34 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +__all__ = ['ExplainDetail'] + + +class ExplainDetail(object): + """ + ExplainDetail defines the types of details for explain result. + """ + + # The cost information on physical rel node estimated by optimizer. + # e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, + # 0.0 memory} + ESTIMATED_COST = 0 + + # The changelog traits produced by a physical rel node. + # e.g. GroupAggregate(..., changelogMode=[I,UA,D]) + CHANGELOG_TRAITS = 1 diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index d8e8c51..94ff785 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -36,7 +36,8 @@ from pyflink.table import Table from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \ _infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema from pyflink.util import utils -from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class +from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class, \ + to_j_explain_detail_arr __all__ = [ 'BatchTableEnvironment', @@ -468,6 +469,23 @@ class TableEnvironment(object): else: return self._j_tenv.explain(table._j_table, extended) + def explain_sql(self, stmt, *extra_details): + """ + Returns the AST of the specified statement and the execution plan to compute + the result of the given statement. + + :param stmt: The statement for which the AST and execution plan will be returned. + :type stmt: str + :param extra_details: The extra explain details which the explain result should include, + e.g. estimated cost, change log trait for streaming + :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail) + :return: The AST and the execution plan to compute the result of the given statement. 
+ :rtype: str + """ + + j_extra_details = to_j_explain_detail_arr(extra_details) + return self._j_tenv.explainSql(stmt, j_extra_details) + def sql_query(self, query): """ Evaluates a SQL query on registered tables and retrieves the result as a diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index 87c8023..bd279af 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -35,8 +35,8 @@ from pyflink.table.types import RowType from pyflink.testing import source_sink_utils from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase, \ PyFlinkBlinkBatchTableTestCase -from pyflink.util.exceptions import TableException from pyflink.util.utils import get_j_env_configuration +from pyflink.table.explain_detail import ExplainDetail class TableEnvironmentTest(object): @@ -242,6 +242,33 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa actual = t_env.explain(extended=True) assert isinstance(actual, str) + def test_explain_sql_without_explain_detail(self): + t_env = self.t_env + source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"]) + field_names = ["a", "b", "c"] + field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()] + t_env.register_table_sink( + "sinks", + source_sink_utils.TestAppendSink(field_names, field_types)) + + result = t_env.explain_sql("select a + 1, b, c from %s" % source) + + assert isinstance(result, str) + + def test_explain_sql_with_explain_detail(self): + t_env = self.t_env + source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"]) + field_names = ["a", "b", "c"] + field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()] + t_env.register_table_sink( + "sinks", + source_sink_utils.TestAppendSink(field_names, field_types)) + + result = t_env.explain_sql( + "select a + 1, b, c from %s" % source, ExplainDetail.ESTIMATED_COST) + + assert isinstance(result, str) + def test_create_table_environment(self): table_config = TableConfig() table_config.set_max_generated_code_length(32000) diff --git a/flink-python/pyflink/util/utils.py b/flink-python/pyflink/util/utils.py index 89db742..29a20da 100644 --- a/flink-python/pyflink/util/utils.py +++ b/flink-python/pyflink/util/utils.py @@ -125,3 +125,23 @@ def add_jars_to_context_class_loader(jar_urls): new_classloader = gateway.jvm.java.net.URLClassLoader( to_jarray(gateway.jvm.java.net.URL, j_urls), context_classloader) gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader) + + +def to_j_explain_detail_arr(p_extra_details): + # sphinx will check "import loop" when generating doc, + # use local import to avoid above error + from pyflink.table.explain_detail import ExplainDetail + gateway = get_gateway() + + def to_j_explain_detail(p_extra_detail): + if p_extra_detail == ExplainDetail.CHANGELOG_TRAITS: + return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_TRAITS + else: + return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST + + _len = len(p_extra_details) if p_extra_details else 0 + j_arr = gateway.new_array(gateway.jvm.org.apache.flink.table.api.ExplainDetail, _len) + for i in range(0, _len): + j_arr[i] = to_j_explain_detail(p_extra_details[i]) + + return j_arr diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java similarity index 50% copy from flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java index 92f50ff..6e9d014 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java @@ -16,38 +16,21 @@ * limitations under the License. */ -package org.apache.flink.table.utils; - -import org.apache.flink.api.dag.Transformation; -import org.apache.flink.table.delegation.Parser; -import org.apache.flink.table.delegation.Planner; -import org.apache.flink.table.operations.ModifyOperation; -import org.apache.flink.table.operations.Operation; - -import java.util.List; +package org.apache.flink.table.api; /** - * Mocking {@link Planner} for tests. + * ExplainDetail defines the types of details for explain result. */ -public class PlannerMock implements Planner { - - @Override - public Parser getParser() { - return new ParserMock(); - } - - @Override - public List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) { - return null; - } - - @Override - public String explain(List<Operation> operations, boolean extended) { - return null; - } - - @Override - public String[] getCompletionHints(String statement, int position) { - return new String[0]; - } +public enum ExplainDetail { + /** + * The cost information on physical rel node estimated by optimizer. + * e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory} + */ + ESTIMATED_COST, + + /** + * The changelog traits produced by a physical rel node. + * e.g. GroupAggregate(..., changelogMode=[I,UA,D]) + */ + CHANGELOG_TRAITS } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java index d855b21..12d21ec 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java @@ -828,7 +828,7 @@ public interface TableEnvironment { * the result of the given {@link Table}. * * @param table The table for which the AST and execution plan will be returned. - * @param extended if the plan should contain additional properties such as + * @param extended if the plan should contain additional properties, * e.g. estimated cost, traits */ String explain(Table table, boolean extended); @@ -837,12 +837,23 @@ public interface TableEnvironment { * Returns the AST of the specified Table API and SQL queries and the execution plan to compute * the result of multiple-sinks plan. * - * @param extended if the plan should contain additional properties such as + * @param extended if the plan should contain additional properties, * e.g. estimated cost, traits */ String explain(boolean extended); /** + * Returns the AST of the specified statement and the execution plan to compute + * the result of the given statement. + * + * @param statement The statement for which the AST and execution plan will be returned. 
+ * @param extraDetails The extra explain details which the explain result should include, + * e.g. estimated cost, change log trait for streaming + * @return AST and the execution plan. + */ + String explainSql(String statement, ExplainDetail... extraDetails); + + /** * Returns completion hints for the given statement at the given cursor position. * The completion happens case insensitively. * diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 1ca045b..610627c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -27,6 +27,7 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.core.execution.JobClient; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.api.SqlParserException; import org.apache.flink.table.api.Table; @@ -136,6 +137,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { protected final FunctionCatalog functionCatalog; protected final Planner planner; protected final Parser parser; + private final boolean isStreamingMode; private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG = "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " + "INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " + @@ -179,6 +181,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { this.functionCatalog = functionCatalog; this.planner = planner; this.parser = planner.getParser(); + this.isStreamingMode = isStreamingMode; this.operationTreeBuilder = OperationTreeBuilder.create( tableConfig, functionCatalog.asLookup(parser::parseIdentifier), @@ -589,14 +592,25 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { @Override public String explain(Table table, boolean extended) { - return planner.explain(Collections.singletonList(table.getQueryOperation()), extended); + return planner.explain(Collections.singletonList(table.getQueryOperation()), getExplainDetails(extended)); } @Override public String explain(boolean extended) { List<Operation> operations = bufferedModifyOperations.stream() - .map(o -> (Operation) o).collect(Collectors.toList()); - return planner.explain(operations, extended); + .map(o -> (Operation) o).collect(Collectors.toList()); + return planner.explain(operations, getExplainDetails(extended)); + } + + @Override + public String explainSql(String statement, ExplainDetail... extraDetails) { + List<Operation> operations = parser.parse(statement); + + if (operations.size() != 1) { + throw new TableException("Unsupported SQL query! 
explainSql() only accepts a single SQL query."); + } + + return planner.explain(operations, extraDetails); } @Override @@ -854,7 +868,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } else if (operation instanceof ShowViewsOperation) { return buildShowResult(listViews()); } else if (operation instanceof ExplainOperation) { - String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()), false); + String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild())); return TableResultImpl.builder() .resultKind(ResultKind.SUCCESS_WITH_CONTENT) .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build()) @@ -979,6 +993,19 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { bufferedModifyOperations.addAll(modifyOperations); } + @VisibleForTesting + protected ExplainDetail[] getExplainDetails(boolean extended) { + if (extended) { + if (isStreamingMode) { + return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS }; + } else { + return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST }; + } + } else { + return new ExplainDetail[0]; + } + } + private void registerTableSourceInternal(String name, TableSource<?> tableSource) { validateTableSource(tableSource); ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(name)); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java index 5bb9266..d926e3a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java @@ -20,6 +20,7 @@ package org.apache.flink.table.delegation; import org.apache.flink.annotation.Internal; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; @@ -79,10 +80,10 @@ public interface Planner { * * @param operations The collection of relational queries for which the AST * and execution plan will be returned. - * @param extended if the plan should contain additional properties such as - * e.g. estimated cost, traits + * @param extraDetails The extra explain details which the explain result should include, + * e.g. estimated cost, change log trait for streaming */ - String explain(List<Operation> operations, boolean extended); + String explain(List<Operation> operations, ExplainDetail... extraDetails); /** * Returns completion hints for the given statement at the given cursor position. 
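For orientation before the planner-side changes: the sketch below shows how the new method surfaces from user code. It is a minimal sketch that reuses the DDL and query from the tests later in this commit; the wrapper class, the main method, and the environment settings are illustrative assumptions, not part of the patch.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainSqlExample {
    public static void main(String[] args) {
        // Streaming TableEnvironment on the Blink planner, matching the tests below.
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.executeSql(
            "CREATE TABLE MyTable (a bigint, b int, c varchar) WITH ("
                + "'connector' = 'COLLECTION', 'is-bounded' = 'false')");

        // Without details: AST + optimized logical plan + physical execution plan.
        System.out.println(tEnv.explainSql("select * from MyTable where a > 10"));

        // With extra details; CHANGELOG_TRAITS only has an effect in streaming mode.
        System.out.println(tEnv.explainSql(
            "select * from MyTable where a > 10",
            ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS));
    }
}

If the statement string parses to more than one operation, explainSql() throws the TableException added in TableEnvironmentImpl above; it accepts exactly one SQL statement per call.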
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java index 92f50ff..42b5403 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java @@ -19,6 +19,7 @@ package org.apache.flink.table.utils; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.operations.ModifyOperation; @@ -42,7 +43,7 @@ public class PlannerMock implements Planner { } @Override - public String explain(List<Operation> operations, boolean extended) { + public String explain(List<Operation> operations, ExplainDetail... extraDetails) { return null; } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala index 9161753..f97e015 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.delegation import org.apache.flink.api.dag.Transformation -import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier} import org.apache.flink.table.delegation.Executor import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation} @@ -78,7 +78,7 @@ class BatchPlanner( } } - override def explain(operations: util.List[Operation], extended: Boolean): String = { + override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = { require(operations.nonEmpty, "operations should not be empty") val sinkRelNodes = operations.map { case queryOperation: QueryOperation => @@ -122,10 +122,10 @@ class BatchPlanner( sb.append("== Optimized Logical Plan ==") sb.append(System.lineSeparator) - val explainLevel = if (extended) { + val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) { SqlExplainLevel.ALL_ATTRIBUTES } else { - SqlExplainLevel.DIGEST_ATTRIBUTES + SqlExplainLevel.EXPPLAN_ATTRIBUTES } sb.append(ExecNodePlanDumper.dagToString(execNodes, explainLevel)) sb.append(System.lineSeparator) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala index 7006533..959de06 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.delegation import org.apache.flink.api.dag.Transformation -import org.apache.flink.table.api.{TableConfig, TableException} +import 
org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier} import org.apache.flink.table.delegation.Executor import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation} @@ -69,7 +69,7 @@ class StreamPlanner( } } - override def explain(operations: util.List[Operation], extended: Boolean): String = { + override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = { require(operations.nonEmpty, "operations should not be empty") val sinkRelNodes = operations.map { case queryOperation: QueryOperation => @@ -109,11 +109,12 @@ class StreamPlanner( sb.append("== Optimized Logical Plan ==") sb.append(System.lineSeparator) - val (explainLevel, withChangelogTraits) = if (extended) { - (SqlExplainLevel.ALL_ATTRIBUTES, true) + val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) { + SqlExplainLevel.ALL_ATTRIBUTES } else { - (SqlExplainLevel.DIGEST_ATTRIBUTES, false) + SqlExplainLevel.DIGEST_ATTRIBUTES } + val withChangelogTraits = extraDetails.contains(ExplainDetail.CHANGELOG_TRAITS) sb.append(ExecNodePlanDumper.dagToString( execNodes, explainLevel, diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out new file mode 100644 index 0000000..870269f --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out @@ -0,0 +1,31 @@ +== Abstract Syntax Tree == +LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e]) ++- LogicalProject(a=[$0], b=[$1]) + +- LogicalFilter(condition=[>($0, 10)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]]) + +== Optimized Logical Plan == +Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e]) ++- Calc(select=[a, b], where=[>(a, 10)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c]) + +== Physical Execution Plan == + : Data Source + content : Source: Custom Source + + : Operator + content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c]) + ship_strategy : FORWARD + + : Operator + content : Calc(select=[a, b], where=[(a > 10)]) + ship_strategy : FORWARD + + : Operator + content : SinkConversionToRow + ship_strategy : FORWARD + + : Data Sink + content : Sink: Unnamed + ship_strategy : FORWARD + diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out new file mode 100644 index 0000000..0c87ae3 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out @@ -0,0 +1,21 @@ +== Abstract Syntax Tree == +LogicalProject(a=[$0], b=[$1], c=[$2]) ++- LogicalFilter(condition=[>($0, 10)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]]) + +== Optimized Logical Plan == +Calc(select=[a, b, c], where=[>(a, 10)], changelogMode=[I]) ++- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, 
b, c], changelogMode=[I]) + +== Physical Execution Plan == + : Data Source + content : Source: Custom Source + + : Operator + content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c]) + ship_strategy : FORWARD + + : Operator + content : Calc(select=[a, b, c], where=[(a > 10)]) + ship_strategy : FORWARD + diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index a27b47a..0e197ba 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -894,6 +894,63 @@ class TableEnvironmentTest { testUnsupportedExplain("explain plan as json for select * from MyTable") } + @Test + def testExplainSqlWithSelect(): Unit = { + val createTableStmt = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = tableEnv.executeSql(createTableStmt) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val actual = tableEnv.explainSql( + "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_TRAITS) + val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out") + assertEquals(replaceStageId(expected), replaceStageId(actual)) + } + + @Test + def testExplainSqlWithInsert(): Unit = { + val createTableStmt1 = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = tableEnv.executeSql(createTableStmt1) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val createTableStmt2 = + """ + |CREATE TABLE MySink ( + | d bigint, + | e int + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult2 = tableEnv.executeSql(createTableStmt2) + assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind) + + val actual = tableEnv.explainSql( + "insert into MySink select a, b from MyTable where a > 10") + val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithInsert.out") + assertEquals(replaceStageId(expected), replaceStageId(actual)) + } + private def testUnsupportedExplain(explain: String): Unit = { try { tableEnv.executeSql(explain) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 4cd0f5d..2d50f43 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -1091,7 +1091,7 @@ class TestingTableEnvironment private( } override def explain(extended: Boolean): String = { - planner.explain(bufferedOperations.toList, extended) + planner.explain(bufferedOperations.toList, getExplainDetails(extended): _*) } @throws[Exception] diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala index efc38a5..b3caf20 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala @@ -217,16 +217,20 @@ abstract class BatchTableEnvImpl( * @param extended Flag to include detailed optimizer estimates. */ private[flink] def explain(table: Table, extended: Boolean): String = { - explain(JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), extended) + explain( + JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), + getExplainDetails(extended): _*) } override def explain(table: Table): String = explain(table: Table, extended = false) override def explain(extended: Boolean): String = { - explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended) + explain( + bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, + getExplainDetails(extended): _*) } - protected def explain(operations: JList[Operation], extended: Boolean): String = { + protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String = { require(operations.asScala.nonEmpty, "operations should not be empty") val astList = operations.asScala.map { case queryOperation: QueryOperation => @@ -285,6 +289,8 @@ abstract class BatchTableEnvImpl( val env = dataSinks.head.getDataSet.getExecutionEnvironment val jasonSqlPlan = env.getExecutionPlan + // keep the behavior as before + val extended = extraDetails.contains(ExplainDetail.ESTIMATED_COST) val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended) s"== Abstract Syntax Tree ==" + @@ -597,6 +603,14 @@ abstract class BatchTableEnvImpl( TableSchema.builder().fields(originalNames, fieldTypes).build() } + private def getExplainDetails(extended: Boolean): Array[ExplainDetail] = { + if (extended) { + Array(ExplainDetail.ESTIMATED_COST) + } else { + Array.empty + } + } + protected def createDummyBatchTableEnv(): BatchTableEnvImpl } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 7c6f144..1f01186 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -762,14 +762,13 @@ abstract class TableEnvImpl( case _: ShowViewsOperation => buildShowResult(listViews()) case explainOperation: ExplainOperation => - val explanation = explain( - JCollections.singletonList(explainOperation.getChild), - extended = false) + val explanation = explain(JCollections.singletonList(explainOperation.getChild)) TableResultImpl.builder. 
resultKind(ResultKind.SUCCESS_WITH_CONTENT) .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build) .data(JCollections.singletonList(Row.of(explanation))) .build + case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG) } } @@ -1135,7 +1134,18 @@ abstract class TableEnvImpl( } } - protected def explain(operations: JList[Operation], extended: Boolean): String + override def explainSql(statement: String, extraDetails: ExplainDetail*): String = { + val operations = parser.parse(statement) + + if (operations.size != 1) { + throw new TableException( + "Unsupported SQL query! explainSql() only accepts a single SQL query.") + } + + explain(operations, extraDetails: _*) + } + + protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String override def fromValues(values: Expression*): Table = { createTable(operationTreeBuilder.values(values: _*)) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala index 756d9ca..d81ca1c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala @@ -120,7 +120,7 @@ class StreamPlanner( }.filter(Objects.nonNull).asJava } - override def explain(operations: util.List[Operation], extended: Boolean): String = { + override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = { require(operations.asScala.nonEmpty, "operations should not be empty") val astWithUpdatesAsRetractionTuples = operations.asScala.map { case queryOperation: QueryOperation => diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala index d09bd5d..c928314 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala @@ -24,8 +24,7 @@ import org.apache.flink.table.api.{ResultKind, TableException} import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath} import org.apache.flink.table.runtime.stream.sql.FunctionITCase.{SimpleScalarFunction, TestUDF} import org.apache.flink.table.utils.TableTestBase -import org.apache.flink.table.utils.TableTestUtil._ -import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId} +import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId, _} import org.apache.flink.types.Row import org.hamcrest.Matchers.containsString @@ -447,6 +446,64 @@ class BatchTableEnvironmentTest extends TableTestBase { "explain plan as json for select * from MyTable") } + @Test + def testExplainSqlWithSelect(): Unit = { + val util = batchTestUtil() + val createTableStmt = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = util.tableEnv.executeSql(createTableStmt) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val actual = util.tableEnv.explainSql("select * from MyTable where a > 10") + val expected = 
readFromResource("testExplainSqlWithSelect1.out") + assertEquals(replaceStageId(expected), replaceStageId(actual)) + } + + @Test + def testExplainSqlWithInsert(): Unit = { + val util = batchTestUtil() + val createTableStmt1 = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = util.tableEnv.executeSql(createTableStmt1) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val createTableStmt2 = + """ + |CREATE TABLE MySink ( + | d bigint, + | e int + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult2 = util.tableEnv.executeSql(createTableStmt2) + assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind) + + val actual = util.tableEnv.explainSql( + "insert into MySink select a, b from MyTable where a > 10") + val expected = readFromResource("testExplainSqlWithInsert1.out") + assertEquals(replaceStageId(expected), replaceStageId(actual)) + } + private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = { try { tableEnv.executeSql(explain) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala index 439fadb..25bb536 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala @@ -326,6 +326,64 @@ class StreamTableEnvironmentTest extends TableTestBase { } } + @Test + def testExplainSqlWithSelect(): Unit = { + val util = streamTestUtil() + val createTableStmt = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = util.tableEnv.executeSql(createTableStmt) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val actual = util.tableEnv.explainSql("select * from MyTable where a > 10") + val expected = readFromResource("testExplainSqlWithSelect0.out") + assertEquals(replaceStageId(expected), replaceStageId(actual)) + } + + @Test + def testExplainSqlWithInsert(): Unit = { + val util = streamTestUtil() + val createTableStmt1 = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = util.tableEnv.executeSql(createTableStmt1) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val createTableStmt2 = + """ + |CREATE TABLE MySink ( + | d bigint, + | e int + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult2 = util.tableEnv.executeSql(createTableStmt2) + assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind) + + val actual = util.tableEnv.explainSql( + "insert into MySink select a, b from MyTable where a > 10") + val expected = readFromResource("testExplainSqlWithInsert0.out") + assertEquals(replaceStageId(expected), replaceStageId(actual)) + } + private def prepareSchemaExpressionParser: (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = { diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala index 8a9d9c4..312d980 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.utils import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableResult} +import org.apache.flink.table.api.{ExplainDetail, Table, TableConfig, TableEnvironment, TableResult} import org.apache.flink.table.catalog.Catalog import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor} import org.apache.flink.table.expressions.Expression @@ -74,6 +74,8 @@ class MockTableEnvironment extends TableEnvironment { override def explain(extended: Boolean): String = ??? + override def explainSql(statement: String, extraDetails: ExplainDetail*): String = ??? + override def getCompletionHints(statement: String, position: Int): Array[String] = ??? override def sqlQuery(query: String): Table = ??? diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out new file mode 100644 index 0000000..bbd0f53 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out @@ -0,0 +1,31 @@ +== Abstract Syntax Tree == +LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e]) + LogicalProject(a=[$0], b=[$1]) + LogicalFilter(condition=[>($0, 10)]) + LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +== Optimized Logical Plan == +DataStreamSink(name=[default_catalog.default_database.MySink], fields=[d, e]) + DataStreamCalc(select=[a, b], where=[>(a, 10)]) + StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)]) + +== Physical Execution Plan == + : Data Source + content : collect elements with CollectionInputFormat + + : Operator + content : from: (a, b) + ship_strategy : FORWARD + + : Operator + content : where: (>(a, 10)), select: (a, b) + ship_strategy : FORWARD + + : Operator + content : to: Row + ship_strategy : FORWARD + + : Data Sink + content : Sink: Unnamed + ship_strategy : FORWARD + diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out new file mode 100644 index 0000000..b904056 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out @@ -0,0 +1,43 @@ +== Abstract Syntax Tree == +LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e]) + LogicalProject(a=[$0], b=[$1]) + LogicalFilter(condition=[>($0, 10)]) + LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +== Optimized Logical Plan == +DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e]) + DataSetCalc(select=[a, b], where=[>(a, 10)]) + BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)]) + 
+== Physical Execution Plan == + : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + + : Map + content : from: (a, b) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + + : FlatMap + content : where: (>(a, 10)), select: (a, b) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : FlatMap + Partitioning : RANDOM_PARTITIONED + + : Map + content : to: Row + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : Map + Partitioning : RANDOM_PARTITIONED + + : Data Sink + content : org.apache.flink.api.java.io.LocalCollectionOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED + diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out new file mode 100644 index 0000000..4459ad6 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out @@ -0,0 +1,21 @@ +== Abstract Syntax Tree == +LogicalProject(a=[$0], b=[$1], c=[$2]) + LogicalFilter(condition=[>($0, 10)]) + LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +== Optimized Logical Plan == +DataStreamCalc(select=[a, b, c], where=[>(a, 10)]) + StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)]) + +== Physical Execution Plan == + : Data Source + content : collect elements with CollectionInputFormat + + : Operator + content : Map + ship_strategy : FORWARD + + : Operator + content : where: (>(a, 10)), select: (a, b, c) + ship_strategy : FORWARD + diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out new file mode 100644 index 0000000..91e87ee --- /dev/null +++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out @@ -0,0 +1,27 @@ +== Abstract Syntax Tree == +LogicalProject(a=[$0], b=[$1], c=[$2]) + LogicalFilter(condition=[>($0, 10)]) + LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +== Optimized Logical Plan == +DataSetCalc(select=[a, b, c], where=[>(a, 10)]) + BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)]) + +== Physical Execution Plan == + : Data Source + content : collect elements with CollectionInputFormat + Partitioning : RANDOM_PARTITIONED + + : FlatMap + content : where: (>(a, 10)), select: (a, b, c) + ship_strategy : Forward + exchange_mode : PIPELINED + driver_strategy : FlatMap + Partitioning : RANDOM_PARTITIONED + + : Data Sink + content : org.apache.flink.api.java.io.DiscardingOutputFormat + ship_strategy : Forward + exchange_mode : PIPELINED + Partitioning : RANDOM_PARTITIONED +
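A closing note on backward compatibility: the boolean explain(extended) methods keep their old behavior because the flag is translated into the new enum before reaching the planner. The following standalone sketch restates that mapping; the class and method names here are hypothetical, but the logic mirrors TableEnvironmentImpl#getExplainDetails from this patch.

import org.apache.flink.table.api.ExplainDetail;

final class ExtendedFlagMapping {
    // Mirrors TableEnvironmentImpl#getExplainDetails: extended=true means
    // ESTIMATED_COST, plus CHANGELOG_TRAITS when running in streaming mode.
    // The legacy BatchTableEnvImpl variant only ever adds ESTIMATED_COST.
    static ExplainDetail[] fromExtendedFlag(boolean extended, boolean isStreamingMode) {
        if (!extended) {
            return new ExplainDetail[0];
        }
        return isStreamingMode
            ? new ExplainDetail[] {ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS}
            : new ExplainDetail[] {ExplainDetail.ESTIMATED_COST};
    }
}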