This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 3d1c11e [FLINK-20562][sql-client][table] Support explain details for EXPLAIN statement 3d1c11e is described below commit 3d1c11e433f28817f1bb70d5ed028c81db705f27 Author: zhaown <51357674+chao...@users.noreply.github.com> AuthorDate: Wed Jul 21 18:41:14 2021 +0800 [FLINK-20562][sql-client][table] Support explain details for EXPLAIN statement This closes #15317 --- .../flink/table/client/cli/CliClientITCase.java | 7 +- .../src/test/resources/sql/table.q | 238 +++++++++++++++++++++ .../src/main/codegen/data/Parser.tdd | 10 +- .../src/main/codegen/includes/parserImpls.ftl | 36 +++- .../flink/sql/parser/dql/SqlRichExplain.java | 22 +- .../flink/sql/parser/utils/ParserResource.java | 3 + .../ParserResource.properties | 1 + .../flink/sql/parser/FlinkSqlParserImplTest.java | 60 +++++- .../table/api/internal/TableEnvironmentImpl.java | 8 +- .../flink/table/operations/ExplainOperation.java | 29 ++- .../operations/SqlToOperationConverter.java | 7 +- .../operations/SqlToOperationConverterTest.java | 34 +++ .../testExecuteSqlWithExplainDetailsAndUnion.out | 55 +++++ .../testExecuteSqlWithExplainDetailsInsert.out | 70 ++++++ .../testExecuteSqlWithExplainDetailsSelect.out | 45 ++++ .../flink/table/api/TableEnvironmentTest.scala | 122 ++++++++--- 16 files changed, 703 insertions(+), 44 deletions(-) diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java index be8d277..e576d09 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java @@ -302,13 +302,18 @@ public class CliClientITCase extends AbstractTestBase { out.append(sqlScript.comment).append(sqlScript.sql); if (i < results.size()) { Result result = results.get(i); - out.append(result.content).append(result.highestTag.tag).append("\n"); + String content = removeStreamNodeId(result.content); + out.append(content).append(result.highestTag.tag).append("\n"); } } return out.toString(); } + private static String removeStreamNodeId(String s) { + return s.replaceAll("\"id\" : \\d+", "\"id\" : "); + } + private static final class Result { final String content; final Tag highestTag; diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q index 157808e..31797f3 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/table.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q @@ -447,3 +447,241 @@ Sink(table=[default_catalog.default_database.orders2], fields=[user, product, am +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]) !ok + +# test explain insert with json format +explain json_execution_plan insert into orders2 select `user`, product, amount, ts from orders; +== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts]) ++- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3]) + +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, orders]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts]) ++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts]) ++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])", + "pact" : "Data Source", + "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])", + "parallelism" : 1 + }, { + "id" : , + "type" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])", + "pact" : "Operator", + "contents" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "NotNullEnforcer(fields=[user])", + "pact" : "Operator", + "contents" : "NotNullEnforcer(fields=[user])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Sink: Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])", + "pact" : "Data Sink", + "contents" : "Sink: Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +} +!ok + +# test explain select with json format +explain json_execution_plan select `user`, product from orders; +== Abstract Syntax Tree == +LogicalProject(user=[$0], product=[$1]) ++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, orders]]) + +== Optimized Physical Plan == +Calc(select=[user, product]) ++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)]) + +- Calc(select=[user, product, ts]) + +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]) + +== Optimized Execution Plan == +Calc(select=[user, product]) ++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)]) + +- Calc(select=[user, product, ts]) + +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])", + "pact" : "Data Source", + "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])", + "parallelism" : 1 + }, { + "id" : , + "type" : "Calc(select=[user, product, ts])", + "pact" : "Operator", + "contents" : "Calc(select=[user, product, ts])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])", + "pact" : "Operator", + "contents" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Calc(select=[user, product])", + "pact" : "Operator", + "contents" : "Calc(select=[user, product])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +} +!ok + +# test explain select with ESTIMATED_COST +explain estimated_cost select `user`, product from orders; +== Abstract Syntax Tree == +LogicalProject(user=[$0], product=[$1]) ++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, orders]]) + +== Optimized Physical Plan == +Calc(select=[user, product]): rowcount = 1.0E8, cumulative cost = {4.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} ++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} + +- Calc(select=[user, product, ts]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} + +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} + +== Optimized Execution Plan == +Calc(select=[user, product]) ++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)]) + +- Calc(select=[user, product, ts]) + +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]) + +!ok + +# test explain select with CHANGELOG_MODE +explain changelog_mode select `user`, product from orders; +== Abstract Syntax Tree == +LogicalProject(user=[$0], product=[$1]) ++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, orders]]) + +== Optimized Physical Plan == +Calc(select=[user, product], changelogMode=[I]) ++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I]) + +- Calc(select=[user, product, ts], changelogMode=[I]) + +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts], changelogMode=[I]) + +== Optimized Execution Plan == +Calc(select=[user, product]) ++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)]) + +- Calc(select=[user, product, ts]) + +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]) + +!ok + +# test explain select with all details +explain changelog_mode, estimated_cost, json_execution_plan select `user`, product from orders; +== Abstract Syntax Tree == +LogicalProject(user=[$0], product=[$1]) ++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, orders]]) + +== Optimized Physical Plan == +Calc(select=[user, product], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {4.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} ++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} + +- Calc(select=[user, product, ts], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} + +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory} + +== Optimized Execution Plan == +Calc(select=[user, product]) ++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)]) + +- Calc(select=[user, product, ts]) + +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])", + "pact" : "Data Source", + "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])", + "parallelism" : 1 + }, { + "id" : , + "type" : "Calc(select=[user, product, ts])", + "pact" : "Operator", + "contents" : "Calc(select=[user, product, ts])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])", + "pact" : "Operator", + "contents" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Calc(select=[user, product])", + "pact" : "Operator", + "contents" : "Calc(select=[user, product])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +} +!ok diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 86c2857..95df090 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -93,8 +93,10 @@ "org.apache.calcite.sql.SqlAlienSystemTypeNameSpec" "org.apache.calcite.sql.SqlCreate" "org.apache.calcite.sql.SqlDrop" - "java.util.List" "java.util.ArrayList" + "java.util.HashSet" + "java.util.List" + "java.util.Set" ] # List of new keywords. Example: "DATABASES", "TABLES". If the keyword is not a reserved @@ -103,12 +105,15 @@ keywords: [ "BYTES" "CATALOGS" + "CHANGELOG_MODE" "COMMENT" "DATABASES" "ENFORCED" + "ESTIMATED_COST" "EXTENDED" "FUNCTIONS" "IF" + "JSON_EXECUTION_PLAN" "JAR" "JARS" "LOAD" @@ -460,8 +465,11 @@ # Please keep the keyword in alphabetical order if new keyword is added. nonReservedKeywordsToAdd: [ # not in core, added in Flink + "CHANGELOG_MODE" "ENFORCED" + "ESTIMATED_COST" "IF" + "JSON_EXECUTION_PLAN" "METADATA" "OVERWRITE" "OVERWRITING" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 980d763..f8c0a09 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -1659,16 +1659,48 @@ SqlEndStatementSet SqlEndStatementSet() : SqlNode SqlRichExplain() : { SqlNode stmt; + Set<String> explainDetails = new HashSet<String>(); } { - <EXPLAIN> [ <PLAN> <FOR> ] + <EXPLAIN> + [ + <PLAN> <FOR> + | + ParseExplainDetail(explainDetails) + ( + <COMMA> + ParseExplainDetail(explainDetails) + )* + ] ( stmt = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) | stmt = RichSqlInsert() ) { - return new SqlRichExplain(getPos(), stmt); + return new SqlRichExplain(getPos(), stmt, explainDetails); + } +} + +void ParseExplainDetail(Set<String> explainDetails): +{ +} +{ + ( + <ESTIMATED_COST> + | + <CHANGELOG_MODE> + | + <JSON_EXECUTION_PLAN> + ) + { + if (explainDetails.contains(token.image.toUpperCase())) { + throw SqlUtil.newContextException( + getPos(), + ParserResource.RESOURCE.explainDetailIsDuplicate()); + } else { + explainDetails.add(token.image.toUpperCase()); + } } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java index 716c37f..bc8988b 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java @@ -27,19 +27,30 @@ import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; -/** EXPLAIN (PLAN FOR)* STATEMENT sql call. */ +/** + * EXPLAIN [PLAN FOR | (ESTIMATED_COST | CHANGELOG_MODE | JSON_EXECUTION_PLAN) (,(ESTIMATED_COST | + * CHANGELOG_MODE | JSON_EXECUTION_PLAN))*] STATEMENT sql call. + */ public class SqlRichExplain extends SqlCall { public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN); private SqlNode statement; + private final Set<String> explainDetails; public SqlRichExplain(SqlParserPos pos, SqlNode statement) { + this(pos, statement, new HashSet<>()); + } + + public SqlRichExplain(SqlParserPos pos, SqlNode statement, Set<String> explainDetails) { super(pos); this.statement = statement; + this.explainDetails = explainDetails; } public SqlNode getStatement() { @@ -56,9 +67,16 @@ public class SqlRichExplain extends SqlCall { return Collections.singletonList(statement); } + public Set<String> getExplainDetails() { + return explainDetails; + } + @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("EXPLAIN"); + if (!explainDetails.isEmpty()) { + writer.keyword(String.join(", ", explainDetails)); + } statement.unparse(writer, leftPrec, rightPrec); } @@ -68,7 +86,7 @@ public class SqlRichExplain extends SqlCall { statement = operand; } else { throw new UnsupportedOperationException( - "SqlRichExplain SqlNode only support index equals 1"); + "SqlRichExplain SqlNode only support index equals 0"); } } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java index 34dbde5..433a688 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java @@ -37,4 +37,7 @@ public interface ParserResource { @Resources.BaseMessage( "CREATE SYSTEM FUNCTION is not supported, system functions can only be registered as temporary function, you can use CREATE TEMPORARY SYSTEM FUNCTION instead.") Resources.ExInst<ParseException> createSystemFunctionOnlySupportTemporary(); + + @Resources.BaseMessage("Duplicate EXPLAIN DETAIL is not allowed.") + Resources.ExInst<ParseException> explainDetailIsDuplicate(); } diff --git a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties index d77af88..c43d94f 100644 --- a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties +++ b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties @@ -19,3 +19,4 @@ MultipleWatermarksUnsupported=Multiple WATERMARK statements is not supported yet. OverwriteIsOnlyUsedWithInsert=OVERWRITE expression is only used with INSERT statement. createSystemFunctionOnlySupportTemporary=CREATE SYSTEM FUNCTION is not supported, system functions can only be registered as temporary function, you can use CREATE TEMPORARY SYSTEM FUNCTION instead. +explainDetailIsDuplicate=Duplicate EXPLAIN DETAIL is not allowed. diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 552f7d2..5daa7fc 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -1290,8 +1290,41 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testExplain() { + String sql = "explain select * from emps"; + String expected = "EXPLAIN SELECT *\nFROM `EMPS`"; + this.sql(sql).ok(expected); + } + + @Test + public void testExplainPlanFor() { String sql = "explain plan for select * from emps"; - String expected = "EXPLAIN SELECT *\n" + "FROM `EMPS`"; + String expected = "EXPLAIN SELECT *\nFROM `EMPS`"; + this.sql(sql).ok(expected); + } + + @Test + public void testExplainChangelogMode() { + String sql = "explain changelog_mode select * from emps"; + String expected = "EXPLAIN CHANGELOG_MODE SELECT *\nFROM `EMPS`"; + this.sql(sql).ok(expected); + } + + @Test + public void testExplainEstimatedCost() { + String sql = "explain estimated_cost select * from emps"; + String expected = "EXPLAIN ESTIMATED_COST SELECT *\nFROM `EMPS`"; + this.sql(sql).ok(expected); + } + + @Test + public void testExplainUnion() { + String sql = "explain estimated_cost select * from emps union all select * from emps"; + String expected = + "EXPLAIN ESTIMATED_COST (SELECT *\n" + + "FROM `EMPS`\n" + + "UNION ALL\n" + + "SELECT *\n" + + "FROM `EMPS`)"; this.sql(sql).ok(expected); } @@ -1327,7 +1360,18 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testExplainAsJson() { - // TODO: FLINK-20562 + String sql = "explain json_execution_plan select * from emps"; + String expected = "EXPLAIN JSON_EXECUTION_PLAN SELECT *\n" + "FROM `EMPS`"; + this.sql(sql).ok(expected); + } + + @Test + public void testExplainAllDetails() { + String sql = "explain changelog_mode,json_execution_plan,estimated_cost select * from emps"; + String expected = + "EXPLAIN JSON_EXECUTION_PLAN, CHANGELOG_MODE, ESTIMATED_COST SELECT *\n" + + "FROM `EMPS`"; + this.sql(sql).ok(expected); } @Test @@ -1344,6 +1388,18 @@ public class FlinkSqlParserImplTest extends SqlParserTest { } @Test + public void testExplainPlanForWithExplainDetails() { + String sql = "explain plan for ^json_execution_plan^ upsert into emps1 values (1, 2)"; + this.sql(sql).fails("Non-query expression encountered in illegal context"); + } + + @Test + public void testExplainDuplicateExplainDetails() { + String sql = "explain changelog_mode,^changelog_mode^ select * from emps"; + this.sql(sql).fails("Duplicate EXPLAIN DETAIL is not allowed."); + } + + @Test public void testAddJar() { sql("add Jar './test.sql'").ok("ADD JAR './test.sql'"); sql("add JAR 'file:///path/to/\nwhatever'").ok("ADD JAR 'file:///path/to/\nwhatever'"); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index daaf564..6c5e21a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -1288,9 +1288,15 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { throw new TableException(exMsg, e); } } else if (operation instanceof ExplainOperation) { + ExplainOperation explainOperation = (ExplainOperation) operation; + ExplainDetail[] explainDetails = + explainOperation.getExplainDetails().stream() + .map(ExplainDetail::valueOf) + .toArray(ExplainDetail[]::new); String explanation = explainInternal( - Collections.singletonList(((ExplainOperation) operation).getChild())); + Collections.singletonList(((ExplainOperation) operation).getChild()), + explainDetails); return TableResultImpl.builder() .resultKind(ResultKind.SUCCESS_WITH_CONTENT) .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java index 21241a3..b6e9781 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java @@ -19,16 +19,22 @@ package org.apache.flink.table.operations; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; -/** - * Operation to describe an EXPLAIN statement. NOTES: currently, only default behavior (EXPLAIN - * [PLAN FOR] xx) is supported. - */ +/** Operation to describe an EXPLAIN statement. */ public class ExplainOperation implements Operation { private final Operation child; + private final Set<String> explainDetails; public ExplainOperation(Operation child) { + this(child, new HashSet<>()); + } + + public ExplainOperation(Operation child, Set<String> explainDetails) { this.child = child; + this.explainDetails = explainDetails; } public Operation getChild() { @@ -37,10 +43,23 @@ public class ExplainOperation implements Operation { @Override public String asSummaryString() { + String operationName = "EXPLAIN"; + if (!explainDetails.isEmpty()) { + operationName = + String.format( + "EXPLAIN %s", + explainDetails.stream() + .map(String::toUpperCase) + .collect(Collectors.joining(", "))); + } return OperationUtils.formatWithChildren( - "EXPLAIN", + operationName, Collections.emptyMap(), Collections.singletonList(child), Operation::asSummaryString); } + + public Set<String> getExplainDetails() { + return explainDetails; + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 92c6fe2..4dbff13 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -158,7 +158,6 @@ import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.dialect.CalciteSqlDialect; import org.apache.calcite.sql.parser.SqlParser; @@ -946,16 +945,18 @@ public class SqlToOperationConverter { private Operation convertRichExplain(SqlRichExplain sqlExplain) { Operation operation; SqlNode sqlNode = sqlExplain.getStatement(); + Set<String> explainDetails = sqlExplain.getExplainDetails(); + if (sqlNode instanceof RichSqlInsert) { operation = convertSqlInsert((RichSqlInsert) sqlNode); - } else if (sqlNode instanceof SqlSelect) { + } else if (sqlNode.getKind().belongsTo(SqlKind.QUERY)) { operation = convertSqlQuery(sqlExplain.getStatement()); } else { throw new ValidationException( String.format( "EXPLAIN statement doesn't support %s", sqlNode.getKind().toString())); } - return new ExplainOperation(operation); + return new ExplainOperation(operation, explainDetails); } /** Convert DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier. */ diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 1d1e4ab..7f00ea3 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.operations; import org.apache.flink.sql.parser.ddl.SqlCreateTable; +import org.apache.flink.sql.parser.dql.SqlRichExplain; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.SqlDialect; @@ -576,6 +577,39 @@ public class SqlToOperationConverterTest { } @Test + public void testExplainWithSelect() { + final String sql = "explain select * from t1"; + checkExplainSql(sql); + } + + @Test + public void testExplainWithInsert() { + final String sql = "explain insert into t2 select * from t1"; + checkExplainSql(sql); + } + + @Test + public void testExplainWithUnion() { + final String sql = "explain select * from t1 union select * from t2"; + checkExplainSql(sql); + } + + @Test + public void testExplainWithExplainDetails() { + String sql = "explain changelog_mode, estimated_cost, json_execution_plan select * from t1"; + checkExplainSql(sql); + } + + private void checkExplainSql(String sql) { + final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); + final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); + SqlNode node = parser.parse(sql); + assert node instanceof SqlRichExplain; + Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get(); + assert operation instanceof ExplainOperation; + } + + @Test public void testCreateTableWithWatermark() throws FunctionAlreadyExistException, DatabaseNotExistException { CatalogFunction cf = diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsAndUnion.out b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsAndUnion.out new file mode 100644 index 0000000..f787da2 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsAndUnion.out @@ -0,0 +1,55 @@ +== Abstract Syntax Tree == +LogicalUnion(all=[true]) +:- LogicalProject(a=[$0], b=[$1], c=[$2]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]]) ++- LogicalProject(a=[$0], b=[$1], c=[$2]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [CollectionTableSource(a, b, c)]]]) + +== Optimized Physical Plan == +Union(all=[true], union=[a, b, c], changelogMode=[I]): rowcount = 2.0E8, cumulative cost = {4.0E8 rows, 4.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory} +:- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory} ++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory} + +== Optimized Execution Plan == +Union(all=[true], union=[a, b, c]) +:- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c]) ++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: Custom Source", + "pact" : "Data Source", + "contents" : "Source: Custom Source", + "parallelism" : 1 + }, { + "id" : , + "type" : "SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])", + "pact" : "Operator", + "contents" : "SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Source: Custom Source", + "pact" : "Data Source", + "contents" : "Source: Custom Source", + "parallelism" : 1 + }, { + "id" : , + "type" : "SourceConversion(table=[default_catalog.default_database.MyTable2, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])", + "pact" : "Operator", + "contents" : "SourceConversion(table=[default_catalog.default_database.MyTable2, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsInsert.out b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsInsert.out new file mode 100644 index 0000000..3c2be6d --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsInsert.out @@ -0,0 +1,70 @@ +== Abstract Syntax Tree == +LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e]) ++- LogicalProject(a=[$0], b=[$1]) + +- LogicalFilter(condition=[>($0, 10)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]]) + +== Optimized Physical Plan == +LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e], changelogMode=[NONE]): rowcount = 5.0E7, cumulative cost = {2.0E8 rows, 1.5E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory} ++- Calc(select=[a, b], where=[>(a, 10)], changelogMode=[I]): rowcount = 5.0E7, cumulative cost = {1.5E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory} + +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory} + +== Optimized Execution Plan == +LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e]) ++- Calc(select=[a, b], where=[(a > 10)]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: Custom Source", + "pact" : "Data Source", + "contents" : "Source: Custom Source", + "parallelism" : 1 + }, { + "id" : , + "type" : "SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])", + "pact" : "Operator", + "contents" : "SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Calc(select=[a, b], where=[(a > 10)])", + "pact" : "Operator", + "contents" : "Calc(select=[a, b], where=[(a > 10)])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "SinkConversionToRow", + "pact" : "Operator", + "contents" : "SinkConversionToRow", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Sink: Unnamed", + "pact" : "Data Sink", + "contents" : "Sink: Unnamed", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsSelect.out b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsSelect.out new file mode 100644 index 0000000..385f2f5 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsSelect.out @@ -0,0 +1,45 @@ +== Abstract Syntax Tree == +LogicalProject(a=[$0], b=[$1], c=[$2]) ++- LogicalFilter(condition=[>($0, 10)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]]) + +== Optimized Physical Plan == +Calc(select=[a, b, c], where=[>(a, 10)], changelogMode=[I]): rowcount = 5.0E7, cumulative cost = {1.5E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory} ++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory} + +== Optimized Execution Plan == +Calc(select=[a, b, c], where=[(a > 10)]) ++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : , + "type" : "Source: Custom Source", + "pact" : "Data Source", + "contents" : "Source: Custom Source", + "parallelism" : 1 + }, { + "id" : , + "type" : "SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])", + "pact" : "Operator", + "contents" : "SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : , + "type" : "Calc(select=[a, b, c], where=[(a > 10)])", + "pact" : "Operator", + "contents" : "Calc(select=[a, b, c], where=[(a > 10)])", + "parallelism" : 1, + "predecessors" : [ { + "id" : , + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + } ] +} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 7409749..7b9f0c8 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -33,7 +33,7 @@ import org.apache.flink.table.module.ModuleEntry import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory._ import org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase.TestUDF import org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.SimpleScalarFunction -import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId +import org.apache.flink.table.planner.utils.TableTestUtil.{replaceStageId, replaceStreamNodeId} import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks} import org.apache.flink.table.types.DataType import org.apache.flink.types.Row @@ -1217,16 +1217,8 @@ class TableEnvironmentTest { val tableResult1 = tableEnv.executeSql(createTableStmt) assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) - val tableResult2 = tableEnv.executeSql("explain plan for select * from MyTable where a > 10") - assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind) - val it = tableResult2.collect() - assertTrue(it.hasNext) - val row = it.next() - assertEquals(1, row.getArity) - val actual = row.getField(0).toString - val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainSelect.out") - assertEquals(replaceStageId(expected), replaceStageId(actual)) - assertFalse(it.hasNext) + checkExplain("explain plan for select * from MyTable where a > 10", + "/explain/testExecuteSqlWithExplainSelect.out") } @Test @@ -1258,17 +1250,8 @@ class TableEnvironmentTest { val tableResult2 = tableEnv.executeSql(createTableStmt2) assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind) - val tableResult3 = tableEnv.executeSql( - "explain plan for insert into MySink select a, b from MyTable where a > 10") - assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind) - val it = tableResult3.collect() - assertTrue(it.hasNext) - val row = it.next() - assertEquals(1, row.getArity) - val actual = row.getField(0).toString - val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainInsert.out") - assertEquals(replaceStageId(expected), replaceStageId(actual)) - assertFalse(it.hasNext) + checkExplain("explain plan for insert into MySink select a, b from MyTable where a > 10", + "/explain/testExecuteSqlWithExplainInsert.out") } @Test @@ -1354,7 +1337,7 @@ class TableEnvironmentTest { } @Test - def testTableExplain(): Unit = { + def testExecuteSqlWithExplainDetailsSelect(): Unit = { val createTableStmt = """ |CREATE TABLE MyTable ( @@ -1369,10 +1352,81 @@ class TableEnvironmentTest { val tableResult1 = tableEnv.executeSql(createTableStmt) assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) - val actual = tableEnv.sqlQuery("select * from MyTable where a > 10") - .explain(ExplainDetail.CHANGELOG_MODE) - val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out") - assertEquals(replaceStageId(expected), replaceStageId(actual)) + checkExplain( + "explain changelog_mode, estimated_cost, json_execution_plan " + + "select * from MyTable where a > 10", + "/explain/testExecuteSqlWithExplainDetailsSelect.out"); + } + + @Test + def testExecuteSqlWithExplainDetailsAndUnion(): Unit = { + val createTableStmt = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = tableEnv.executeSql(createTableStmt) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val createTableStmt2 = + """ + |CREATE TABLE MyTable2 ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult3 = tableEnv.executeSql(createTableStmt2) + assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind) + + checkExplain( + "explain changelog_mode, estimated_cost, json_execution_plan " + + "select * from MyTable union all select * from MyTable2", + "/explain/testExecuteSqlWithExplainDetailsAndUnion.out") + } + + @Test + def testExecuteSqlWithExplainDetailsInsert(): Unit = { + val createTableStmt1 = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = tableEnv.executeSql(createTableStmt1) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val createTableStmt2 = + """ + |CREATE TABLE MySink ( + | d bigint, + | e int + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult2 = tableEnv.executeSql(createTableStmt2) + assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind) + + checkExplain( + "explain changelog_mode, estimated_cost, json_execution_plan " + + "insert into MySink select a, b from MyTable where a > 10", + "/explain/testExecuteSqlWithExplainDetailsInsert.out") } private def testUnsupportedExplain(explain: String): Unit = { @@ -1576,4 +1630,18 @@ class TableEnvironmentTest { assertTrue(source.isInstanceOf[CollectionTableSource]) assertEquals(expectToBeBounded, source.asInstanceOf[CollectionTableSource].isBounded) } + + private def checkExplain(sql: String, resultPath: String): Unit = { + val tableResult2 = tableEnv.executeSql(sql) + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind) + val it = tableResult2.collect() + assertTrue(it.hasNext) + val row = it.next() + assertEquals(1, row.getArity) + val actual = replaceStreamNodeId(row.getField(0).toString.trim) + val expected = replaceStreamNodeId(TableTestUtil + .readFromResource(resultPath).trim) + assertEquals(replaceStageId(expected), replaceStageId(actual)) + assertFalse(it.hasNext) + } }