This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6013a62523bd2fc73b6aa7b3109f6b7c138ac51c
Author: godfreyhe <godfre...@163.com>
AuthorDate: Wed Apr 22 17:08:00 2020 +0800

    [FLINK-17267] [table] TableEnvironment#executeSql supports EXPLAIN statement
---
 .../table/api/internal/TableEnvironmentImpl.java   |   9 ++
 .../flink/table/operations/ExplainOperation.java   |  46 ++++++++
 .../operations/SqlToOperationConverter.java        |  19 ++++
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  11 +-
 .../table/planner/delegation/BatchPlanner.scala    |  22 +++-
 .../table/planner/delegation/StreamPlanner.scala   |  22 +++-
 .../explain/testExecuteSqlWithExplainInsert.out    |  31 +++++
 .../explain/testExecuteSqlWithExplainSelect.out    |  21 ++++
 .../flink/table/api/TableEnvironmentTest.scala     | 115 ++++++++++++++++++-
 .../table/sqlexec/SqlToOperationConverter.java     |  19 ++++
 .../table/api/internal/BatchTableEnvImpl.scala     |  71 ++++++++----
 .../flink/table/api/internal/TableEnvImpl.scala    |  13 ++-
 .../flink/table/calcite/FlinkPlannerImpl.scala     |  11 +-
 .../apache/flink/table/planner/StreamPlanner.scala |  17 ++-
 .../flink/table/api/TableEnvironmentITCase.scala   |  13 +--
 .../api/batch/BatchTableEnvironmentTest.scala      | 120 +++++++++++++++++++-
 .../api/stream/StreamTableEnvironmentTest.scala    | 126 ++++++++++++++++++++-
 .../resources/testExecuteSqlWithExplainInsert0.out |  31 +++++
 .../resources/testExecuteSqlWithExplainInsert1.out |  36 ++++++
 .../resources/testExecuteSqlWithExplainSelect0.out |  21 ++++
 .../resources/testExecuteSqlWithExplainSelect1.out |  27 +++++
 21 files changed, 744 insertions(+), 57 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 342ee55..1ca045b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -72,6 +72,7 @@ import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
@@ -852,6 +853,14 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 			return buildShowResult(listFunctions());
 		} else if (operation instanceof ShowViewsOperation) {
 			return buildShowResult(listViews());
+		} else if (operation instanceof ExplainOperation) {
+			String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()), false);
+			return TableResultImpl.builder()
+					.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+					.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
+					.data(Collections.singletonList(Row.of(explanation)))
+					.build();
+
 		} else {
 			throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
 		}

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
new file mode 100644
index 0000000..c78c78f
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import java.util.Collections;
+
+/**
+ * Operation to describe an EXPLAIN statement.
+ * NOTES: currently, only default behavior(EXPLAIN PLAN FOR xx) is supported.
+ */
+public class ExplainOperation implements Operation {
+	private final Operation child;
+
+	public ExplainOperation(Operation child) {
+		this.child = child;
+	}
+
+	public Operation getChild() {
+		return child;
+	}
+
+	@Override
+	public String asSummaryString() {
+		return OperationUtils.formatWithChildren(
+			"EXPLAIN PLAN FOR",
+			Collections.emptyMap(),
+			Collections.singletonList(child),
+			Operation::asSummaryString);
+	}
+}

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index b303570..9f80aa1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -62,6 +62,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
 import org.apache.flink.table.operations.ShowDatabasesOperation;
@@ -98,6 +99,9 @@ import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -195,6 +199,8 @@ public class SqlToOperationConverter {
 			return Optional.of(converter.convertDropView((SqlDropView) validated));
 		} else if (validated instanceof SqlShowViews) {
 			return Optional.of(converter.convertShowViews((SqlShowViews) validated));
+		} else if (validated instanceof SqlExplain) {
+			return Optional.of(converter.convertExplain((SqlExplain) validated));
 		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
 			return Optional.of(converter.convertSqlQuery(validated));
 		} else {
@@ -577,6 +583,19 @@ public class SqlToOperationConverter {
 		return new ShowViewsOperation();
 	}
 
+	/** Convert EXPLAIN statement. */
+	private Operation convertExplain(SqlExplain sqlExplain) {
+		Operation operation = convertSqlQuery(sqlExplain.getExplicandum());
+
+		if (sqlExplain.getDetailLevel() != SqlExplainLevel.EXPPLAN_ATTRIBUTES ||
+				sqlExplain.getDepth() != SqlExplain.Depth.PHYSICAL ||
+				sqlExplain.getFormat() != SqlExplainFormat.TEXT) {
+			throw new TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx");
+		}
+
+		return new ExplainOperation(operation);
+	}
+
 	/** Fallback method for sql query. */
 	private Operation convertSqlQuery(SqlNode node) {
 		return toQueryOperation(flinkPlanner, node);

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index c3f72b3..846f536 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -30,7 +30,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
 import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
-import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
 import java.lang.{Boolean => JBoolean}
@@ -129,7 +129,14 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowViews]) {
         return sqlNode
       }
-      validator.validate(sqlNode)
+      sqlNode match {
+        case explain: SqlExplain =>
+          val validated = validator.validate(explain.getExplicandum)
+          explain.setOperand(0, validated)
+          explain
+        case _ =>
+          validator.validate(sqlNode)
+      }
     }
     catch {
       case e: RuntimeException =>

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 2626809..9161753 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.planner.operations.PlannerQueryOperation
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
@@ -32,6 +33,7 @@ import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOp
 import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
 
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
+import org.apache.calcite.rel.logical.LogicalTableModify
 import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
 import org.apache.calcite.sql.SqlExplainLevel
 
@@ -80,7 +82,21 @@ class BatchPlanner(
     require(operations.nonEmpty, "operations should not be empty")
     val sinkRelNodes = operations.map {
       case queryOperation: QueryOperation =>
-        getRelBuilder.queryOperation(queryOperation).build()
+        val relNode = getRelBuilder.queryOperation(queryOperation).build()
+        relNode match {
+          // SQL: explain plan for insert into xx
+          case modify: LogicalTableModify =>
+            // convert LogicalTableModify to CatalogSinkModifyOperation
+            val qualifiedName = modify.getTable.getQualifiedName
+            require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+            val modifyOperation = new CatalogSinkModifyOperation(
+              ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+              new PlannerQueryOperation(modify.getInput)
+            )
+            translateToRel(modifyOperation)
+          case _ =>
+            relNode
+        }
       case modifyOperation: ModifyOperation =>
         translateToRel(modifyOperation)
       case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index e6245f2..7006533 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.planner.operations.PlannerQueryOperation
 import org.apache.flink.table.planner.plan.`trait`._
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.planner.plan.optimize.{Optimizer, StreamCommonSubGraphBasedOptimizer}
@@ -30,6 +31,7 @@ import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOp
 import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil}
 
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
+import org.apache.calcite.rel.logical.LogicalTableModify
 import org.apache.calcite.sql.SqlExplainLevel
 
 import java.util
@@ -71,7 +73,21 @@ class StreamPlanner(
     require(operations.nonEmpty, "operations should not be empty")
     val sinkRelNodes = operations.map {
       case queryOperation: QueryOperation =>
-        getRelBuilder.queryOperation(queryOperation).build()
+        val relNode = getRelBuilder.queryOperation(queryOperation).build()
+        relNode match {
+          // SQL: explain plan for insert into xx
+          case modify: LogicalTableModify =>
+            // convert LogicalTableModify to CatalogSinkModifyOperation
+            val qualifiedName = modify.getTable.getQualifiedName
+            require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+            val modifyOperation = new CatalogSinkModifyOperation(
+              ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+              new PlannerQueryOperation(modify.getInput)
+            )
+            translateToRel(modifyOperation)
+          case _ =>
+            relNode
+        }
       case modifyOperation: ModifyOperation =>
         translateToRel(modifyOperation)
       case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")

diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
new file mode 100644
index 0000000..0e5e015
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsert.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- LogicalProject(d=[$0], e=[$1])
+   +- LogicalFilter(condition=[>($0, 10)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, b], where=[>(a, 10)])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : Source: Custom Source
+
+ : Operator
+	content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+	ship_strategy : FORWARD
+
+ : Operator
+	content : Calc(select=[a, b], where=[(a > 10)])
+	ship_strategy : FORWARD
+
+ : Operator
+	content : SinkConversionToRow
+	ship_strategy : FORWARD
+
+ : Data Sink
+	content : Sink: Unnamed
+	ship_strategy : FORWARD
+

diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out
new file mode 100644
index 0000000..4865193
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainSelect.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($0, 10)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[a, b, c], where=[>(a, 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : Source: Custom Source
+
+ : Operator
+	content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+	ship_strategy : FORWARD
+
+ : Operator
+	content : Calc(select=[a, b, c], where=[(a > 10)])
+	ship_strategy : FORWARD
+

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index e289edb..a27b47a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.api
 
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.sql.SqlExplainLevel
 import org.apache.flink.api.common.typeinfo.Types.STRING
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
@@ -29,10 +27,14 @@ import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
 import org.apache.flink.table.planner.operations.SqlConversionException
 import org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase.TestUDF
 import org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.SimpleScalarFunction
+import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId
 import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks}
 import org.apache.flink.types.Row
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.sql.SqlExplainLevel
 import org.hamcrest.Matchers.containsString
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.rules.ExpectedException
 import org.junit.{Rule, Test}
@@ -797,6 +799,113 @@ class TableEnvironmentTest {
       tableResult5.collect())
   }
 
+  @Test
+  def testExecuteSqlWithExplainSelect(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val tableResult2 = tableEnv.executeSql("explain plan for select * from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    val it = tableResult2.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainSelect.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithExplainInsert(): Unit = {
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val tableResult3 = tableEnv.executeSql(
+      "explain plan for insert into MySink select a, b from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+    val it = tableResult3.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainInsert.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithUnsupportedExplain(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    // TODO we can support them later
+    testUnsupportedExplain("explain plan excluding attributes for select * from MyTable")
+    testUnsupportedExplain("explain plan including all attributes for select * from MyTable")
+    testUnsupportedExplain("explain plan with type for select * from MyTable")
+    testUnsupportedExplain("explain plan without implementation for select * from MyTable")
+    testUnsupportedExplain("explain plan as xml for select * from MyTable")
+    testUnsupportedExplain("explain plan as json for select * from MyTable")
+  }
+
+  private def testUnsupportedExplain(explain: String): Unit = {
+    try {
+      tableEnv.executeSql(explain)
+      fail("This should not happen")
+    } catch {
+      case e: TableException =>
+        assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+      case e =>
+        fail("This should not happen, " + e.getMessage)
+    }
+  }
+
   private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
     while (expected.hasNext && actual.hasNext) {
       assertEquals(expected.next(), actual.next())

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 8ebb47e..ded0d85 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -61,6 +61,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.PlannerQueryOperation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -91,6 +92,9 @@ import org.apache.flink.util.StringUtils;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -187,6 +191,8 @@ public class SqlToOperationConverter {
 			return Optional.of(converter.convertDropView((SqlDropView) validated));
 		} else if (validated instanceof SqlShowViews) {
 			return Optional.of(converter.convertShowViews((SqlShowViews) validated));
+		} else if (validated instanceof SqlExplain) {
+			return Optional.of(converter.convertExplain((SqlExplain) validated));
 		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
 			return Optional.of(converter.convertSqlQuery(validated));
 		} else {
@@ -534,6 +540,19 @@ public class SqlToOperationConverter {
 		return new ShowViewsOperation();
 	}
 
+	/** Convert EXPLAIN statement. */
+	private Operation convertExplain(SqlExplain sqlExplain) {
+		Operation operation = convertSqlQuery(sqlExplain.getExplicandum());
+
+		if (sqlExplain.getDetailLevel() != SqlExplainLevel.EXPPLAN_ATTRIBUTES ||
+				sqlExplain.getDepth() != SqlExplain.Depth.PHYSICAL ||
+				sqlExplain.getFormat() != SqlExplainFormat.TEXT) {
+			throw new TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx");
+		}
+
+		return new ExplainOperation(operation);
+	}
+
 	/**
 	 * Create a table schema from {@link SqlCreateTable}. This schema contains computed column
 	 * fields, say, we have a create table DDL statement:

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index e25b8e4..efc38a5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -31,14 +31,14 @@ import org.apache.flink.configuration.DeploymentOptions
 import org.apache.flink.core.execution.{DetachedJobExecutionResult, JobClient}
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.{CalciteConfig, FlinkTypeFactory}
-import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogManager}
+import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogManager, ObjectIdentifier}
 import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor
 import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
 import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, QueryOperation}
+import org.apache.flink.table.operations.{CatalogSinkModifyOperation, DataSetQueryOperation, ModifyOperation, Operation, PlannerQueryOperation, QueryOperation}
 import org.apache.flink.table.plan.BatchOptimizer
 import org.apache.flink.table.plan.nodes.LogicalSink
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
@@ -57,6 +57,7 @@ import org.apache.flink.util.Preconditions.checkNotNull
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalTableModify
 
 import _root_.java.util.{ArrayList => JArrayList, Collections => JCollections, List => JList}
 
@@ -225,11 +226,25 @@ abstract class BatchTableEnvImpl(
     explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
   }
 
-  private def explain(operations: JList[Operation], extended: Boolean): String = {
+  protected def explain(operations: JList[Operation], extended: Boolean): String = {
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astList = operations.asScala.map {
       case queryOperation: QueryOperation =>
-        getRelBuilder.tableOperation(queryOperation).build()
+        val relNode = getRelBuilder.tableOperation(queryOperation).build()
+        relNode match {
+          // SQL: explain plan for insert into xx
+          case modify: LogicalTableModify =>
+            // convert LogicalTableModify to CatalogSinkModifyOperation
+            val qualifiedName = modify.getTable.getQualifiedName
+            require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+            val modifyOperation = new CatalogSinkModifyOperation(
+              ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+              new PlannerQueryOperation(modify.getInput)
+            )
+            translateToRel(modifyOperation, addLogicalSink = true)
+          case _ =>
+            relNode
+        }
       case modifyOperation: ModifyOperation =>
         translateToRel(modifyOperation, addLogicalSink = true)
       case o => throw new TableException(s"Unsupported operation: ${o.asSummaryString()}")
@@ -237,27 +252,33 @@ abstract class BatchTableEnvImpl(
 
     val optimizedNodes = astList.map(optimizer.optimize)
 
-    val batchTableEnv = createDummyBatchTableEnv()
-    val dataSinks = optimizedNodes.zip(operations.asScala).map {
-      case (optimizedNode, operation) =>
-        operation match {
-          case queryOperation: QueryOperation =>
-            val dataSet = translate[Row](
-              optimizedNode,
-              getTableSchema(queryOperation.getTableSchema.getFieldNames, optimizedNode))(
-              new GenericTypeInfo(classOf[Row]))
-            dataSet.output(new DiscardingOutputFormat[Row])
-          case modifyOperation: ModifyOperation =>
-            val tableSink = getTableSink(modifyOperation)
-            translate(
-              batchTableEnv,
-              optimizedNode,
-              tableSink,
-              getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
-          case o =>
-            throw new TableException("Unsupported Operation: " + o.asSummaryString())
-        }
-    }
+    val batchTableEnv = createDummyBatchTableEnv()
+    val dataSinks = optimizedNodes.zip(operations.asScala).map {
+      case (optimizedNode, operation) =>
+        operation match {
+          case queryOperation: QueryOperation =>
+            val fieldNames = queryOperation match {
+              case o: PlannerQueryOperation if o.getCalciteTree.isInstanceOf[LogicalTableModify] =>
+                o.getCalciteTree.getInput(0).getRowType.getFieldNames.asScala.toArray[String]
+              case _ =>
+                queryOperation.getTableSchema.getFieldNames
+            }
+            val dataSet = translate[Row](
+              optimizedNode,
+              getTableSchema(fieldNames, optimizedNode))(
+              new GenericTypeInfo(classOf[Row]))
+            dataSet.output(new DiscardingOutputFormat[Row])
+          case modifyOperation: ModifyOperation =>
+            val tableSink = getTableSink(modifyOperation)
+            translate(
+              batchTableEnv,
+              optimizedNode,
+              tableSink,
+              getTableSchema(modifyOperation.getChild.getTableSchema.getFieldNames, optimizedNode))
+          case o =>
+            throw new TableException("Unsupported Operation: " + o.asSummaryString())
+        }
+    }
 
     val astPlan = astList.map(RelOptUtil.toString).mkString(System.lineSeparator)
     val optimizedPlan = optimizedNodes.map(RelOptUtil.toString).mkString(System.lineSeparator)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3a97106..7c6f144 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -545,8 +545,6 @@ abstract class TableEnvImpl(
 
   override def listFunctions(): Array[String] = functionCatalog.getFunctions
 
-  override def explain(table: Table): String
-
   override def getCompletionHints(statement: String, position: Int): Array[String] = {
     val planner = getFlinkPlanner
     planner.getCompletionHints(statement, position)
@@ -763,6 +761,15 @@ abstract class TableEnvImpl(
         TableResultImpl.TABLE_RESULT_OK
       case _: ShowViewsOperation =>
         buildShowResult(listViews())
+      case explainOperation: ExplainOperation =>
+        val explanation = explain(
+          JCollections.singletonList(explainOperation.getChild),
+          extended = false)
+        TableResultImpl.builder.
+          resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+          .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
+          .data(JCollections.singletonList(Row.of(explanation)))
+          .build
       case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
     }
   }
@@ -1128,6 +1135,8 @@ abstract class TableEnvImpl(
     }
   }
 
+  protected def explain(operations: JList[Operation], extended: Boolean): String
+
   override def fromValues(values: Expression*): Table = {
     createTable(operationTreeBuilder.values(values: _*))
   }

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index b4c6210..3d7dae4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.rel.RelRoot
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.RexBuilder
 import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
-import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
 import _root_.java.lang.{Boolean => JBoolean}
@@ -127,7 +127,14 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowViews]) {
         return sqlNode
       }
-      validator.validate(sqlNode)
+      sqlNode match {
+        case explain: SqlExplain =>
+          val validated = validator.validate(explain.getExplicandum)
+          explain.setOperand(0, validated)
+          explain
+        case _ =>
+          validator.validate(sqlNode)
+      }
     }
     catch {
       case e: RuntimeException =>

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index f549168..756d9ca 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -43,6 +43,7 @@ import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalTableModify
 
 import _root_.java.util
 import _root_.java.util.Objects
@@ -123,7 +124,21 @@ class StreamPlanner(
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astWithUpdatesAsRetractionTuples = operations.asScala.map {
       case queryOperation: QueryOperation =>
-        (getRelBuilder.tableOperation(queryOperation).build(), false)
+        val relNode = getRelBuilder.tableOperation(queryOperation).build()
+        relNode match {
+          // SQL: explain plan for insert into xx
+          case modify: LogicalTableModify =>
+            // convert LogicalTableModify to CatalogSinkModifyOperation
+            val qualifiedName = modify.getTable.getQualifiedName
+            require(qualifiedName.size() == 3, "the length of qualified name should be 3.")
+            val modifyOperation = new CatalogSinkModifyOperation(
+              ObjectIdentifier.of(qualifiedName.get(0), qualifiedName.get(1), qualifiedName.get(2)),
+              new PlannerQueryOperation(modify.getInput)
+            )
+            translateToRel(modifyOperation)
+          case _ =>
+            (relNode, false)
+        }
       case modifyOperation: ModifyOperation =>
         translateToRel(modifyOperation)
       case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index c5e90bf..dd83a3e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -24,13 +24,14 @@ import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecutionEnvironment}
-import org.apache.flink.table.api.TableEnvironmentITCase.{getPersonCsvTableSource, getPersonData, readFromResource, replaceStageId}
+import org.apache.flink.table.api.TableEnvironmentITCase.{getPersonCsvTableSource, getPersonData}
 import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.api.java.StreamTableEnvironment
 import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnvironment, _}
 import org.apache.flink.table.runtime.utils.StreamITCase
 import org.apache.flink.table.sinks.CsvTableSink
 import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
 import org.apache.flink.table.utils.TestingOverwritableTableSink
 import org.apache.flink.types.Row
 import org.apache.flink.util.FileUtils
@@ -46,7 +47,6 @@ import _root_.java.lang.{Long => JLong}
 import _root_.java.util
 
 import _root_.scala.collection.mutable
-import _root_.scala.io.Source
 
 
 @RunWith(classOf[Parameterized])
@@ -452,15 +452,6 @@ object TableEnvironmentITCase {
     )
   }
 
-  def readFromResource(file: String): String = {
-    val source = s"${getClass.getResource("/").getFile}../../src/test/scala/resources/$file"
-    Source.fromFile(source).mkString
-  }
-
-  def replaceStageId(s: String): String = {
-    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
-  }
-
   def getPersonCsvTableSource: CsvTableSource = {
     val header = "First#Id#Score#Last"
     val csvRecords = getPersonData.map {

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index 9823e08..d09bd5d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -25,10 +25,11 @@ import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
 import org.apache.flink.table.runtime.stream.sql.FunctionITCase.{SimpleScalarFunction, TestUDF}
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
 import org.apache.flink.types.Row
 
 import org.hamcrest.Matchers.containsString
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.Test
 
 import java.util
@@ -341,6 +342,123 @@ class BatchTableEnvironmentTest extends TableTestBase {
       tableResult4.collect())
   }
 
+  @Test
+  def testExecuteSqlWithExplainSelect(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val tableResult2 = util.tableEnv.executeSql(
+      "explain plan for select * from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    val it = tableResult2.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = readFromResource("testExecuteSqlWithExplainSelect1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithExplainInsert(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val tableResult3 = util.tableEnv.executeSql(
+      "explain plan for insert into MySink select a, b from MyTable where a > 10")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+    val it = tableResult3.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = row.getField(0).toString
+    val expected = readFromResource("testExecuteSqlWithExplainInsert1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
+
+  @Test
+  def testExecuteSqlWithUnsupportedExplain(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    // TODO we can support them later
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan excluding attributes for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan including all attributes for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan with type for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan without implementation for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan as xml for select * from MyTable")
+    testUnsupportedExplain(util.tableEnv,
+      "explain plan as json for select * from MyTable")
+  }
+
+  private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = {
+    try {
+      tableEnv.executeSql(explain)
+      fail("This should not happen")
+    } catch {
+      case e: TableException =>
+        assertTrue(e.getMessage.contains("Only default behavior is supported now"))
+      case e =>
+        fail("This should not happen, " + e.getMessage)
+    }
+  }
+
   private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
     while (expected.hasNext && actual.hasNext) {
       assertEquals(expected.next(), actual.next())

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 809d0aa..439fadb 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -29,20 +29,21 @@ import org.apache.flink.table.api.Expressions.$
 import org.apache.flink.table.api.java.internal.{StreamTableEnvironmentImpl => JStreamTableEnvironmentImpl}
 import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{EnvironmentSettings, Expressions, TableConfig, Types, ValidationException}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.api.{EnvironmentSettings, ResultKind, TableConfig, TableException, Types, ValidationException}
+import org.apache.flink.table.catalog.FunctionCatalog
 import org.apache.flink.table.executor.StreamExecutor
+import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.TableTestUtil.{binaryNode, readFromResource, replaceStageId, streamTableNode, term, unaryNode}
 import org.apache.flink.table.utils.{CatalogManagerMocks, TableTestBase}
-import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
 import org.apache.flink.types.Row
 
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.Test
 import org.mockito.Mockito.{mock, when}
 
 import java.lang.{Integer => JInt, Long => JLong}
-import org.apache.flink.table.module.ModuleManager
 
 
 class StreamTableEnvironmentTest extends TableTestBase {
@@ -208,6 +209,123 @@ class StreamTableEnvironmentTest extends TableTestBase {
     jTEnv.fromDataStream(ds, $("rt").rowtime(), $("b"), $("c"), $("d"), $("e"), $("pt").proctime())
   }
 
+  @Test
+  def testExecuteSqlWithExplainSelect(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val tableResult2 = util.tableEnv.executeSql(
+      "explain plan for select * from MyTable where a > 10")
where a > 10") + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind) + val it = tableResult2.collect() + assertTrue(it.hasNext) + val row = it.next() + assertEquals(1, row.getArity) + val actual = row.getField(0).toString + val expected = readFromResource("testExecuteSqlWithExplainSelect0.out") + assertEquals(replaceStageId(expected), replaceStageId(actual)) + assertFalse(it.hasNext) + } + + @Test + def testExecuteSqlWithExplainInsert(): Unit = { + val util = streamTestUtil() + val createTableStmt1 = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = util.tableEnv.executeSql(createTableStmt1) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val createTableStmt2 = + """ + |CREATE TABLE MySink ( + | d bigint, + | e int + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult2 = util.tableEnv.executeSql(createTableStmt2) + assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind) + + val tableResult3 = util.tableEnv.executeSql( + "explain plan for insert into MySink select a, b from MyTable where a > 10") + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind) + val it = tableResult3.collect() + assertTrue(it.hasNext) + val row = it.next() + assertEquals(1, row.getArity) + val actual = row.getField(0).toString + val expected = readFromResource("testExecuteSqlWithExplainInsert0.out") + assertEquals(replaceStageId(expected), replaceStageId(actual)) + assertFalse(it.hasNext) + } + + @Test + def testExecuteSqlWithUnsupportedExplain(): Unit = { + val util = streamTestUtil() + val createTableStmt = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = util.tableEnv.executeSql(createTableStmt) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + // TODO we can support them later + testUnsupportedExplain(util.tableEnv, + "explain plan excluding attributes for select * from MyTable") + testUnsupportedExplain(util.tableEnv, + "explain plan including all attributes for select * from MyTable") + testUnsupportedExplain(util.tableEnv, + "explain plan with type for select * from MyTable") + testUnsupportedExplain(util.tableEnv, + "explain plan without implementation for select * from MyTable") + testUnsupportedExplain(util.tableEnv, + "explain plan as xml for select * from MyTable") + testUnsupportedExplain(util.tableEnv, + "explain plan as json for select * from MyTable") + } + + private def testUnsupportedExplain(tableEnv: StreamTableEnvironment, explain: String): Unit = { + try { + tableEnv.executeSql(explain) + fail("This should not happen") + } catch { + case e: TableException => + assertTrue(e.getMessage.contains("Only default behavior is supported now")) + case e => + fail("This should not happen, " + e.getMessage) + } + } + private def prepareSchemaExpressionParser: (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = { diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out new file mode 100644 index 0000000..aad6387 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert0.out @@ -0,0 
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+  LogicalProject(d=[$0], e=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+  DataStreamCalc(select=[a, b], where=[>(a, 10)])
+    StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+ : Operator
+	content : from: (a, b)
+	ship_strategy : FORWARD
+
+ : Operator
+	content : where: (>(a, 10)), select: (a, b)
+	ship_strategy : FORWARD
+
+ : Operator
+	content : to: Row
+	ship_strategy : FORWARD
+
+ : Data Sink
+	content : Sink: Unnamed
+	ship_strategy : FORWARD
+

diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
new file mode 100644
index 0000000..98206ae
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainInsert1.out
@@ -0,0 +1,36 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+  LogicalProject(d=[$0], e=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+  DataSetCalc(select=[a, b], where=[>(a, 10)])
+    BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+ : Map
+	content : from: (a, b)
+	ship_strategy : Forward
+	exchange_mode : PIPELINED
+	driver_strategy : Map
+	Partitioning : RANDOM_PARTITIONED
+
+ : FlatMap
+	content : where: (>(a, 10)), select: (a, b)
+	ship_strategy : Forward
+	exchange_mode : PIPELINED
+	driver_strategy : FlatMap
+	Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+	content : org.apache.flink.api.java.io.DiscardingOutputFormat
+	ship_strategy : Forward
+	exchange_mode : PIPELINED
+	Partitioning : RANDOM_PARTITIONED
+

diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out
new file mode 100644
index 0000000..4459ad6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect0.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+  LogicalFilter(condition=[>($0, 10)])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b, c], where=[>(a, 10)])
+  StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+
+ : Operator
+	content : Map
+	ship_strategy : FORWARD
+
+ : Operator
+	content : where: (>(a, 10)), select: (a, b, c)
+	ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out
new file mode 100644
index 0000000..91e87ee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExecuteSqlWithExplainSelect1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+  LogicalFilter(condition=[>($0, 10)])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b, c], where=[>(a, 10)])
+  BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+	content : collect elements with CollectionInputFormat
+	Partitioning : RANDOM_PARTITIONED
+
+ : FlatMap
+	content : where: (>(a, 10)), select: (a, b, c)
+	ship_strategy : Forward
+	exchange_mode : PIPELINED
+	driver_strategy : FlatMap
+	Partitioning : RANDOM_PARTITIONED
+
+ : Data Sink
+	content : org.apache.flink.api.java.io.DiscardingOutputFormat
+	ship_strategy : Forward
+	exchange_mode : PIPELINED
+	Partitioning : RANDOM_PARTITIONED
+
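For context, here is a minimal usage sketch of the feature this commit adds, distilled from the tests above. The DDL and queries are copied from those tests; the planner/mode selection and the printing loop are illustrative assumptions, not part of the commit, and the COLLECTION connector is a test-only connector, so this is meant to be read alongside the test harness rather than run standalone:

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    object ExplainStatementExample {
      def main(args: Array[String]): Unit = {
        // assumption: blink planner in streaming mode, as in TableEnvironmentTest
        val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val tableEnv = TableEnvironment.create(settings)

        // source table taken from the tests in this commit (COLLECTION is a test connector)
        tableEnv.executeSql(
          """
            |CREATE TABLE MyTable (
            |  a bigint,
            |  b int,
            |  c varchar
            |) with (
            |  'connector' = 'COLLECTION',
            |  'is-bounded' = 'false'
            |)
          """.stripMargin)

        // EXPLAIN PLAN FOR <query> is routed through SqlToOperationConverter#convertExplain
        // and comes back as a single row with one STRING column named "result"
        val tableResult = tableEnv.executeSql(
          "explain plan for select * from MyTable where a > 10")
        val it = tableResult.collect()
        while (it.hasNext) {
          println(it.next().getField(0))
        }

        // non-default variants such as "explain plan as json for ..." currently throw
        // TableException("Only default behavior is supported now, EXPLAIN PLAN FOR xx")
      }
    }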