This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d1c11e  [FLINK-20562][sql-client][table] Support explain details for EXPLAIN statement
3d1c11e is described below

commit 3d1c11e433f28817f1bb70d5ed028c81db705f27
Author: zhaown <51357674+chao...@users.noreply.github.com>
AuthorDate: Wed Jul 21 18:41:14 2021 +0800

    [FLINK-20562][sql-client][table] Support explain details for EXPLAIN statement
    
    This closes #15317
---
 .../flink/table/client/cli/CliClientITCase.java    |   7 +-
 .../src/test/resources/sql/table.q                 | 238 +++++++++++++++++++++
 .../src/main/codegen/data/Parser.tdd               |  10 +-
 .../src/main/codegen/includes/parserImpls.ftl      |  36 +++-
 .../flink/sql/parser/dql/SqlRichExplain.java       |  22 +-
 .../flink/sql/parser/utils/ParserResource.java     |   3 +
 .../ParserResource.properties                      |   1 +
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  60 +++++-
 .../table/api/internal/TableEnvironmentImpl.java   |   8 +-
 .../flink/table/operations/ExplainOperation.java   |  29 ++-
 .../operations/SqlToOperationConverter.java        |   7 +-
 .../operations/SqlToOperationConverterTest.java    |  34 +++
 .../testExecuteSqlWithExplainDetailsAndUnion.out   |  55 +++++
 .../testExecuteSqlWithExplainDetailsInsert.out     |  70 ++++++
 .../testExecuteSqlWithExplainDetailsSelect.out     |  45 ++++
 .../flink/table/api/TableEnvironmentTest.scala     | 122 ++++++++---
 16 files changed, 703 insertions(+), 44 deletions(-)
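
The new syntax accepts an optional, comma-separated list of detail keywords
(ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN) between EXPLAIN and the
statement being explained, as exercised by the table.q script and the parser
tests below. A minimal Java sketch of driving the new syntax through
executeSql follows; the orders table, its schema, and the datagen connector
are illustrative placeholders and not part of this commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class ExplainDetailsExample {
    public static void main(String[] args) {
        // Streaming TableEnvironment; the settings are illustrative.
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder source table so the EXPLAIN statement has something to resolve.
        tEnv.executeSql(
                "CREATE TABLE orders (`user` BIGINT, product STRING, amount INT, ts TIMESTAMP(3)) "
                        + "WITH ('connector' = 'datagen')");

        // Any subset of the three detail keywords may be listed; listing the same
        // keyword twice fails with "Duplicate EXPLAIN DETAIL is not allowed."
        TableResult result =
                tEnv.executeSql(
                        "EXPLAIN CHANGELOG_MODE, ESTIMATED_COST, JSON_EXECUTION_PLAN "
                                + "SELECT `user`, product FROM orders");

        // The result is a single STRING column named "result", matching the
        // TableResultImpl builder in the TableEnvironmentImpl hunk below.
        result.print();
    }
}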

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index be8d277..e576d09 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -302,13 +302,18 @@ public class CliClientITCase extends AbstractTestBase {
             out.append(sqlScript.comment).append(sqlScript.sql);
             if (i < results.size()) {
                 Result result = results.get(i);
-                
out.append(result.content).append(result.highestTag.tag).append("\n");
+                String content = removeStreamNodeId(result.content);
+                out.append(content).append(result.highestTag.tag).append("\n");
             }
         }
 
         return out.toString();
     }
 
+    private static String removeStreamNodeId(String s) {
+        return s.replaceAll("\"id\" : \\d+", "\"id\" : ");
+    }
+
     private static final class Result {
         final String content;
         final Tag highestTag;
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 157808e..31797f3 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -447,3 +447,241 @@ Sink(table=[default_catalog.default_database.orders2], fields=[user, product, am
    +- TableSourceScan(table=[[default_catalog, default_database, orders]], 
fields=[user, product, amount, ts])
 
 !ok
+
+# test explain insert with json format
+explain json_execution_plan insert into orders2 select `user`, product, 
amount, ts from orders;
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, 
product, amount, ts])
++- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+   +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL 
SECOND)])
+      +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], 
ptime=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
orders]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, 
amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], 
fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, 
amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], 
fields=[user, product, amount, ts])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: TableSourceScan(table=[[default_catalog, 
default_database, orders]], fields=[user, product, amount, ts])",
+    "pact" : "Data Source",
+    "contents" : "Source: TableSourceScan(table=[[default_catalog, 
default_database, orders]], fields=[user, product, amount, ts])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL 
SECOND)])",
+    "pact" : "Operator",
+    "contents" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 
1000:INTERVAL SECOND)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "NotNullEnforcer(fields=[user])",
+    "pact" : "Operator",
+    "contents" : "NotNullEnforcer(fields=[user])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Sink(table=[default_catalog.default_database.orders2], 
fields=[user, product, amount, ts])",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Sink(table=[default_catalog.default_database.orders2], 
fields=[user, product, amount, ts])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
+!ok
+
+# test explain select with json format
+explain json_execution_plan select `user`, product from orders;
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL 
SECOND)])
+   +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], 
ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], 
fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], 
fields=[user, product, amount, ts])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: TableSourceScan(table=[[default_catalog, 
default_database, orders]], fields=[user, product, amount, ts])",
+    "pact" : "Data Source",
+    "contents" : "Source: TableSourceScan(table=[[default_catalog, 
default_database, orders]], fields=[user, product, amount, ts])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc(select=[user, product, ts])",
+    "pact" : "Operator",
+    "contents" : "Calc(select=[user, product, ts])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL 
SECOND)])",
+    "pact" : "Operator",
+    "contents" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 
1000:INTERVAL SECOND)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc(select=[user, product])",
+    "pact" : "Operator",
+    "contents" : "Calc(select=[user, product])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
+!ok
+
+# test explain select with ESTIMATED_COST
+explain estimated_cost select `user`, product from orders;
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL 
SECOND)])
+   +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], 
ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product]): rowcount = 1.0E8, cumulative cost = {4.0E8 rows, 
2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)]): 
rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 
network, 0.0 memory}
+   +- Calc(select=[user, product, ts]): rowcount = 1.0E8, cumulative cost = 
{2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], 
fields=[user, product, amount, ts]): rowcount = 1.0E8, cumulative cost = {1.0E8 
rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], 
fields=[user, product, amount, ts])
+
+!ok
+
+# test explain select with CHANGELOG_MODE
+explain changelog_mode select `user`, product from orders;
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL 
SECOND)])
+   +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], 
ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product], changelogMode=[I])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], 
changelogMode=[I])
+   +- Calc(select=[user, product, ts], changelogMode=[I])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], 
fields=[user, product, amount, ts], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], 
fields=[user, product, amount, ts])
+
+!ok
+
+# test explain select with all details
+explain changelog_mode, estimated_cost, json_execution_plan select `user`, 
product from orders;
+== Abstract Syntax Tree ==
+LogicalProject(user=[$0], product=[$1])
++- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL 
SECOND)])
+   +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], 
ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Calc(select=[user, product], changelogMode=[I]): rowcount = 1.0E8, cumulative 
cost = {4.0E8 rows, 2.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], 
changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 2.0E8 cpu, 
3.6E9 io, 0.0 network, 0.0 memory}
+   +- Calc(select=[user, product, ts], changelogMode=[I]): rowcount = 1.0E8, 
cumulative cost = {2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], 
fields=[user, product, amount, ts], changelogMode=[I]): rowcount = 1.0E8, 
cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
+
+== Optimized Execution Plan ==
+Calc(select=[user, product])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- Calc(select=[user, product, ts])
+      +- TableSourceScan(table=[[default_catalog, default_database, orders]], 
fields=[user, product, amount, ts])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: TableSourceScan(table=[[default_catalog, 
default_database, orders]], fields=[user, product, amount, ts])",
+    "pact" : "Data Source",
+    "contents" : "Source: TableSourceScan(table=[[default_catalog, 
default_database, orders]], fields=[user, product, amount, ts])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc(select=[user, product, ts])",
+    "pact" : "Operator",
+    "contents" : "Calc(select=[user, product, ts])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL 
SECOND)])",
+    "pact" : "Operator",
+    "contents" : "WatermarkAssigner(rowtime=[ts], watermark=[(ts - 
1000:INTERVAL SECOND)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc(select=[user, product])",
+    "pact" : "Operator",
+    "contents" : "Calc(select=[user, product])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
+!ok
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 86c2857..95df090 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -93,8 +93,10 @@
     "org.apache.calcite.sql.SqlAlienSystemTypeNameSpec"
     "org.apache.calcite.sql.SqlCreate"
     "org.apache.calcite.sql.SqlDrop"
-    "java.util.List"
     "java.util.ArrayList"
+    "java.util.HashSet"
+    "java.util.List"
+    "java.util.Set"
   ]
 
   # List of new keywords. Example: "DATABASES", "TABLES". If the keyword is 
not a reserved
@@ -103,12 +105,15 @@
   keywords: [
     "BYTES"
     "CATALOGS"
+    "CHANGELOG_MODE"
     "COMMENT"
     "DATABASES"
     "ENFORCED"
+    "ESTIMATED_COST"
     "EXTENDED"
     "FUNCTIONS"
     "IF"
+    "JSON_EXECUTION_PLAN"
     "JAR"
     "JARS"
     "LOAD"
@@ -460,8 +465,11 @@
   # Please keep the keyword in alphabetical order if new keyword is added.
   nonReservedKeywordsToAdd: [
     # not in core, added in Flink
+    "CHANGELOG_MODE"
     "ENFORCED"
+    "ESTIMATED_COST"
     "IF"
+    "JSON_EXECUTION_PLAN"
     "METADATA"
     "OVERWRITE"
     "OVERWRITING"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 980d763..f8c0a09 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1659,16 +1659,48 @@ SqlEndStatementSet SqlEndStatementSet() :
 SqlNode SqlRichExplain() :
 {
     SqlNode stmt;
+    Set<String> explainDetails = new HashSet<String>();
 }
 {
-    <EXPLAIN> [ <PLAN> <FOR> ]
+    <EXPLAIN> 
+    [
+        <PLAN> <FOR> 
+        |
+            ParseExplainDetail(explainDetails)
+        (
+            <COMMA>
+            ParseExplainDetail(explainDetails)
+        )*
+    ]
     (
         stmt = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
         |
         stmt = RichSqlInsert()
     )
     {
-        return new SqlRichExplain(getPos(), stmt);
+        return new SqlRichExplain(getPos(), stmt, explainDetails);
+    }
+}
+
+void ParseExplainDetail(Set<String> explainDetails):
+{
+}
+{
+    (
+        <ESTIMATED_COST> 
+        | 
+        <CHANGELOG_MODE> 
+        | 
+        <JSON_EXECUTION_PLAN>
+    ) 
+    {
+        if (explainDetails.contains(token.image.toUpperCase())) {
+            throw SqlUtil.newContextException(
+                getPos(),
+                ParserResource.RESOURCE.explainDetailIsDuplicate());
+        } else {
+            explainDetails.add(token.image.toUpperCase());
+        }
     }
 }
 
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java
index 716c37f..bc8988b 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java
@@ -27,19 +27,30 @@ import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
-/** EXPLAIN (PLAN FOR)* STATEMENT sql call. */
+/**
+ * EXPLAIN [PLAN FOR | (ESTIMATED_COST | CHANGELOG_MODE | JSON_EXECUTION_PLAN) 
(,(ESTIMATED_COST |
+ * CHANGELOG_MODE | JSON_EXECUTION_PLAN))*] STATEMENT sql call.
+ */
 public class SqlRichExplain extends SqlCall {
 
     public static final SqlSpecialOperator OPERATOR =
             new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
 
     private SqlNode statement;
+    private final Set<String> explainDetails;
 
     public SqlRichExplain(SqlParserPos pos, SqlNode statement) {
+        this(pos, statement, new HashSet<>());
+    }
+
+    public SqlRichExplain(SqlParserPos pos, SqlNode statement, Set<String> 
explainDetails) {
         super(pos);
         this.statement = statement;
+        this.explainDetails = explainDetails;
     }
 
     public SqlNode getStatement() {
@@ -56,9 +67,16 @@ public class SqlRichExplain extends SqlCall {
         return Collections.singletonList(statement);
     }
 
+    public Set<String> getExplainDetails() {
+        return explainDetails;
+    }
+
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         writer.keyword("EXPLAIN");
+        if (!explainDetails.isEmpty()) {
+            writer.keyword(String.join(", ", explainDetails));
+        }
         statement.unparse(writer, leftPrec, rightPrec);
     }
 
@@ -68,7 +86,7 @@ public class SqlRichExplain extends SqlCall {
             statement = operand;
         } else {
             throw new UnsupportedOperationException(
-                    "SqlRichExplain SqlNode only support index equals 1");
+                    "SqlRichExplain SqlNode only support index equals 0");
         }
     }
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 34dbde5..433a688 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -37,4 +37,7 @@ public interface ParserResource {
     @Resources.BaseMessage(
             "CREATE SYSTEM FUNCTION is not supported, system functions can 
only be registered as temporary function, you can use CREATE TEMPORARY SYSTEM 
FUNCTION instead.")
     Resources.ExInst<ParseException> 
createSystemFunctionOnlySupportTemporary();
+
+    @Resources.BaseMessage("Duplicate EXPLAIN DETAIL is not allowed.")
+    Resources.ExInst<ParseException> explainDetailIsDuplicate();
 }
diff --git a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
index d77af88..c43d94f 100644
--- a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
+++ b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
@@ -19,3 +19,4 @@
 MultipleWatermarksUnsupported=Multiple WATERMARK statements is not supported 
yet.
 OverwriteIsOnlyUsedWithInsert=OVERWRITE expression is only used with INSERT 
statement.
 createSystemFunctionOnlySupportTemporary=CREATE SYSTEM FUNCTION is not 
supported, system functions can only be registered as temporary function, you 
can use CREATE TEMPORARY SYSTEM FUNCTION instead.
+explainDetailIsDuplicate=Duplicate EXPLAIN DETAIL is not allowed.
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 552f7d2..5daa7fc 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1290,8 +1290,41 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     public void testExplain() {
+        String sql = "explain select * from emps";
+        String expected = "EXPLAIN SELECT *\nFROM `EMPS`";
+        this.sql(sql).ok(expected);
+    }
+
+    @Test
+    public void testExplainPlanFor() {
         String sql = "explain plan for select * from emps";
-        String expected = "EXPLAIN SELECT *\n" + "FROM `EMPS`";
+        String expected = "EXPLAIN SELECT *\nFROM `EMPS`";
+        this.sql(sql).ok(expected);
+    }
+
+    @Test
+    public void testExplainChangelogMode() {
+        String sql = "explain changelog_mode select * from emps";
+        String expected = "EXPLAIN CHANGELOG_MODE SELECT *\nFROM `EMPS`";
+        this.sql(sql).ok(expected);
+    }
+
+    @Test
+    public void testExplainEstimatedCost() {
+        String sql = "explain estimated_cost select * from emps";
+        String expected = "EXPLAIN ESTIMATED_COST SELECT *\nFROM `EMPS`";
+        this.sql(sql).ok(expected);
+    }
+
+    @Test
+    public void testExplainUnion() {
+        String sql = "explain estimated_cost select * from emps union all 
select * from emps";
+        String expected =
+                "EXPLAIN ESTIMATED_COST (SELECT *\n"
+                        + "FROM `EMPS`\n"
+                        + "UNION ALL\n"
+                        + "SELECT *\n"
+                        + "FROM `EMPS`)";
         this.sql(sql).ok(expected);
     }
 
@@ -1327,7 +1360,18 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     public void testExplainAsJson() {
-        // TODO: FLINK-20562
+        String sql = "explain json_execution_plan select * from emps";
+        String expected = "EXPLAIN JSON_EXECUTION_PLAN SELECT *\n" + "FROM 
`EMPS`";
+        this.sql(sql).ok(expected);
+    }
+
+    @Test
+    public void testExplainAllDetails() {
+        String sql = "explain 
changelog_mode,json_execution_plan,estimated_cost select * from emps";
+        String expected =
+                "EXPLAIN JSON_EXECUTION_PLAN, CHANGELOG_MODE, ESTIMATED_COST 
SELECT *\n"
+                        + "FROM `EMPS`";
+        this.sql(sql).ok(expected);
     }
 
     @Test
@@ -1344,6 +1388,18 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
     }
 
     @Test
+    public void testExplainPlanForWithExplainDetails() {
+        String sql = "explain plan for ^json_execution_plan^ upsert into emps1 
values (1, 2)";
+        this.sql(sql).fails("Non-query expression encountered in illegal 
context");
+    }
+
+    @Test
+    public void testExplainDuplicateExplainDetails() {
+        String sql = "explain changelog_mode,^changelog_mode^ select * from 
emps";
+        this.sql(sql).fails("Duplicate EXPLAIN DETAIL is not allowed.");
+    }
+
+    @Test
     public void testAddJar() {
         sql("add Jar './test.sql'").ok("ADD JAR './test.sql'");
         sql("add JAR 'file:///path/to/\nwhatever'").ok("ADD JAR 
'file:///path/to/\nwhatever'");
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index daaf564..6c5e21a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -1288,9 +1288,15 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                 throw new TableException(exMsg, e);
             }
         } else if (operation instanceof ExplainOperation) {
+            ExplainOperation explainOperation = (ExplainOperation) operation;
+            ExplainDetail[] explainDetails =
+                    explainOperation.getExplainDetails().stream()
+                            .map(ExplainDetail::valueOf)
+                            .toArray(ExplainDetail[]::new);
             String explanation =
                     explainInternal(
-                            Collections.singletonList(((ExplainOperation) 
operation).getChild()));
+                            Collections.singletonList(((ExplainOperation) 
operation).getChild()),
+                            explainDetails);
             return TableResultImpl.builder()
                     .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                     .schema(ResolvedSchema.of(Column.physical("result", 
DataTypes.STRING())))
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
index 21241a3..b6e9781 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
@@ -19,16 +19,22 @@
 package org.apache.flink.table.operations;
 
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
 
-/**
- * Operation to describe an EXPLAIN statement. NOTES: currently, only default 
behavior (EXPLAIN
- * [PLAN FOR] xx) is supported.
- */
+/** Operation to describe an EXPLAIN statement. */
 public class ExplainOperation implements Operation {
     private final Operation child;
+    private final Set<String> explainDetails;
 
     public ExplainOperation(Operation child) {
+        this(child, new HashSet<>());
+    }
+
+    public ExplainOperation(Operation child, Set<String> explainDetails) {
         this.child = child;
+        this.explainDetails = explainDetails;
     }
 
     public Operation getChild() {
@@ -37,10 +43,23 @@ public class ExplainOperation implements Operation {
 
     @Override
     public String asSummaryString() {
+        String operationName = "EXPLAIN";
+        if (!explainDetails.isEmpty()) {
+            operationName =
+                    String.format(
+                            "EXPLAIN %s",
+                            explainDetails.stream()
+                                    .map(String::toUpperCase)
+                                    .collect(Collectors.joining(", ")));
+        }
         return OperationUtils.formatWithChildren(
-                "EXPLAIN",
+                operationName,
                 Collections.emptyMap(),
                 Collections.singletonList(child),
                 Operation::asSummaryString);
     }
+
+    public Set<String> getExplainDetails() {
+        return explainDetails;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 92c6fe2..4dbff13 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -158,7 +158,6 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.dialect.CalciteSqlDialect;
 import org.apache.calcite.sql.parser.SqlParser;
@@ -946,16 +945,18 @@ public class SqlToOperationConverter {
     private Operation convertRichExplain(SqlRichExplain sqlExplain) {
         Operation operation;
         SqlNode sqlNode = sqlExplain.getStatement();
+        Set<String> explainDetails = sqlExplain.getExplainDetails();
+
         if (sqlNode instanceof RichSqlInsert) {
             operation = convertSqlInsert((RichSqlInsert) sqlNode);
-        } else if (sqlNode instanceof SqlSelect) {
+        } else if (sqlNode.getKind().belongsTo(SqlKind.QUERY)) {
             operation = convertSqlQuery(sqlExplain.getStatement());
         } else {
             throw new ValidationException(
                     String.format(
                             "EXPLAIN statement doesn't support %s", 
sqlNode.getKind().toString()));
         }
-        return new ExplainOperation(operation);
+        return new ExplainOperation(operation, explainDetails);
     }
 
     /** Convert DESCRIBE [EXTENDED] [[catalogName.] 
dataBasesName].sqlIdentifier. */
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 1d1e4ab..7f00ea3 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.operations;
 
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.dql.SqlRichExplain;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.SqlDialect;
@@ -576,6 +577,39 @@ public class SqlToOperationConverterTest {
     }
 
     @Test
+    public void testExplainWithSelect() {
+        final String sql = "explain select * from t1";
+        checkExplainSql(sql);
+    }
+
+    @Test
+    public void testExplainWithInsert() {
+        final String sql = "explain insert into t2 select * from t1";
+        checkExplainSql(sql);
+    }
+
+    @Test
+    public void testExplainWithUnion() {
+        final String sql = "explain select * from t1 union select * from t2";
+        checkExplainSql(sql);
+    }
+
+    @Test
+    public void testExplainWithExplainDetails() {
+        String sql = "explain changelog_mode, estimated_cost, 
json_execution_plan select * from t1";
+        checkExplainSql(sql);
+    }
+
+    private void checkExplainSql(String sql) {
+        final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        SqlNode node = parser.parse(sql);
+        assert node instanceof SqlRichExplain;
+        Operation operation = SqlToOperationConverter.convert(planner, 
catalogManager, node).get();
+        assert operation instanceof ExplainOperation;
+    }
+
+    @Test
     public void testCreateTableWithWatermark()
             throws FunctionAlreadyExistException, DatabaseNotExistException {
         CatalogFunction cf =
diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsAndUnion.out b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsAndUnion.out
new file mode 100644
index 0000000..f787da2
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsAndUnion.out
@@ -0,0 +1,55 @@
+== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+:- LogicalProject(a=[$0], b=[$1], c=[$2])
+:  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [CollectionTableSource(a, b, c)]]])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Physical Plan ==
+Union(all=[true], union=[a, b, c], changelogMode=[I]): rowcount = 2.0E8, 
cumulative cost = {4.0E8 rows, 4.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}
+:- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], 
changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 
2.4E9 io, 0.0 network, 0.0 memory}
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2, 
source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], 
changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 
2.4E9 io, 0.0 network, 0.0 memory}
+
+== Optimized Execution Plan ==
+Union(all=[true], union=[a, b, c])
+:- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable2, 
source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Custom Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Custom Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : 
"SourceConversion(table=[default_catalog.default_database.MyTable, source: 
[CollectionTableSource(a, b, c)]], fields=[a, b, c])",
+    "pact" : "Operator",
+    "contents" : 
"SourceConversion(table=[default_catalog.default_database.MyTable, source: 
[CollectionTableSource(a, b, c)]], fields=[a, b, c])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Source: Custom Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Custom Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : 
"SourceConversion(table=[default_catalog.default_database.MyTable2, source: 
[CollectionTableSource(a, b, c)]], fields=[a, b, c])",
+    "pact" : "Operator",
+    "contents" : 
"SourceConversion(table=[default_catalog.default_database.MyTable2, source: 
[CollectionTableSource(a, b, c)]], fields=[a, b, c])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsInsert.out b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsInsert.out
new file mode 100644
index 0000000..3c2be6d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsInsert.out
@@ -0,0 +1,70 @@
+== Abstract Syntax Tree ==
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], 
fields=[d, e])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalFilter(condition=[>($0, 10)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Physical Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, 
e], changelogMode=[NONE]): rowcount = 5.0E7, cumulative cost = {2.0E8 rows, 
1.5E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}
++- Calc(select=[a, b], where=[>(a, 10)], changelogMode=[I]): rowcount = 5.0E7, 
cumulative cost = {1.5E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], 
changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 
2.4E9 io, 0.0 network, 0.0 memory}
+
+== Optimized Execution Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, b], where=[(a > 10)])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Custom Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Custom Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : 
"SourceConversion(table=[default_catalog.default_database.MyTable, source: 
[CollectionTableSource(a, b, c)]], fields=[a, b, c])",
+    "pact" : "Operator",
+    "contents" : 
"SourceConversion(table=[default_catalog.default_database.MyTable, source: 
[CollectionTableSource(a, b, c)]], fields=[a, b, c])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc(select=[a, b], where=[(a > 10)])",
+    "pact" : "Operator",
+    "contents" : "Calc(select=[a, b], where=[(a > 10)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "SinkConversionToRow",
+    "pact" : "Operator",
+    "contents" : "SinkConversionToRow",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsSelect.out b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsSelect.out
new file mode 100644
index 0000000..385f2f5
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsSelect.out
@@ -0,0 +1,45 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($0, 10)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Physical Plan ==
+Calc(select=[a, b, c], where=[>(a, 10)], changelogMode=[I]): rowcount = 5.0E7, 
cumulative cost = {1.5E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], 
changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 
2.4E9 io, 0.0 network, 0.0 memory}
+
+== Optimized Execution Plan ==
+Calc(select=[a, b, c], where=[(a > 10)])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Custom Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Custom Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : 
"SourceConversion(table=[default_catalog.default_database.MyTable, source: 
[CollectionTableSource(a, b, c)]], fields=[a, b, c])",
+    "pact" : "Operator",
+    "contents" : 
"SourceConversion(table=[default_catalog.default_database.MyTable, source: 
[CollectionTableSource(a, b, c)]], fields=[a, b, c])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc(select=[a, b, c], where=[(a > 10)])",
+    "pact" : "Operator",
+    "contents" : "Calc(select=[a, b, c], where=[(a > 10)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 7409749..7b9f0c8 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -33,7 +33,7 @@ import org.apache.flink.table.module.ModuleEntry
 import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory._
 import org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase.TestUDF
 import 
org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.SimpleScalarFunction
-import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId
+import org.apache.flink.table.planner.utils.TableTestUtil.{replaceStageId, 
replaceStreamNodeId}
 import org.apache.flink.table.planner.utils.{TableTestUtil, 
TestTableSourceSinks}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
@@ -1217,16 +1217,8 @@ class TableEnvironmentTest {
     val tableResult1 = tableEnv.executeSql(createTableStmt)
     assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
 
-    val tableResult2 = tableEnv.executeSql("explain plan for select * from 
MyTable where a > 10")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
-    val it = tableResult2.collect()
-    assertTrue(it.hasNext)
-    val row = it.next()
-    assertEquals(1, row.getArity)
-    val actual = row.getField(0).toString
-    val expected = 
TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainSelect.out")
-    assertEquals(replaceStageId(expected), replaceStageId(actual))
-    assertFalse(it.hasNext)
+    checkExplain("explain plan for select * from MyTable where a > 10",
+      "/explain/testExecuteSqlWithExplainSelect.out")
   }
 
   @Test
@@ -1258,17 +1250,8 @@ class TableEnvironmentTest {
     val tableResult2 = tableEnv.executeSql(createTableStmt2)
     assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
 
-    val tableResult3 = tableEnv.executeSql(
-      "explain plan for insert into MySink select a, b from MyTable where a > 
10")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
-    val it = tableResult3.collect()
-    assertTrue(it.hasNext)
-    val row = it.next()
-    assertEquals(1, row.getArity)
-    val actual = row.getField(0).toString
-    val expected = 
TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainInsert.out")
-    assertEquals(replaceStageId(expected), replaceStageId(actual))
-    assertFalse(it.hasNext)
+    checkExplain("explain plan for insert into MySink select a, b from MyTable 
where a > 10",
+      "/explain/testExecuteSqlWithExplainInsert.out")
   }
 
   @Test
@@ -1354,7 +1337,7 @@ class TableEnvironmentTest {
   }
 
   @Test
-  def testTableExplain(): Unit = {
+  def testExecuteSqlWithExplainDetailsSelect(): Unit = {
     val createTableStmt =
       """
         |CREATE TABLE MyTable (
@@ -1369,10 +1352,81 @@ class TableEnvironmentTest {
     val tableResult1 = tableEnv.executeSql(createTableStmt)
     assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
 
-    val actual = tableEnv.sqlQuery("select * from MyTable where a > 10")
-      .explain(ExplainDetail.CHANGELOG_MODE)
-    val expected = 
TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
-    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    checkExplain(
+      "explain changelog_mode, estimated_cost, json_execution_plan " +
+        "select * from MyTable where a > 10",
+      "/explain/testExecuteSqlWithExplainDetailsSelect.out");
+  }
+
+  @Test
+  def testExecuteSqlWithExplainDetailsAndUnion(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MyTable2 (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult3 = tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
+
+    checkExplain(
+      "explain changelog_mode, estimated_cost, json_execution_plan " +
+        "select * from MyTable union all select * from MyTable2",
+      "/explain/testExecuteSqlWithExplainDetailsAndUnion.out")
+  }
+
+  @Test
+  def testExecuteSqlWithExplainDetailsInsert(): Unit = {
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    checkExplain(
+      "explain changelog_mode, estimated_cost, json_execution_plan " +
+        "insert into MySink select a, b from MyTable where a > 10",
+      "/explain/testExecuteSqlWithExplainDetailsInsert.out")
   }
 
   private def testUnsupportedExplain(explain: String): Unit = {
@@ -1576,4 +1630,18 @@ class TableEnvironmentTest {
     assertTrue(source.isInstanceOf[CollectionTableSource])
     assertEquals(expectToBeBounded, 
source.asInstanceOf[CollectionTableSource].isBounded)
   }
+
+  private def checkExplain(sql: String, resultPath: String): Unit = {
+    val tableResult2 = tableEnv.executeSql(sql)
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    val it = tableResult2.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = replaceStreamNodeId(row.getField(0).toString.trim)
+    val expected = replaceStreamNodeId(TableTestUtil
+      .readFromResource(resultPath).trim)
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
 }
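
As the TableEnvironmentImpl hunk above shows, the parsed detail keywords are
mapped with ExplainDetail::valueOf onto the existing ExplainDetail enum, so the
new SQL form lines up with the programmatic explain calls that were already
available. A hedged Java sketch of that equivalence; the orders table is again
an illustrative placeholder assumed to be registered elsewhere.

import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainDetailEquivalence {
    static void explainBothWays(TableEnvironment tEnv) {
        // SQL form introduced by this commit.
        tEnv.executeSql(
                        "EXPLAIN CHANGELOG_MODE, JSON_EXECUTION_PLAN "
                                + "SELECT `user`, product FROM orders")
                .print();

        // Pre-existing programmatic forms backed by the same ExplainDetail enum.
        String viaExplainSql =
                tEnv.explainSql(
                        "SELECT `user`, product FROM orders",
                        ExplainDetail.CHANGELOG_MODE,
                        ExplainDetail.JSON_EXECUTION_PLAN);
        Table table = tEnv.sqlQuery("SELECT `user`, product FROM orders");
        String viaTableApi =
                table.explain(ExplainDetail.CHANGELOG_MODE, ExplainDetail.JSON_EXECUTION_PLAN);

        System.out.println(viaExplainSql);
        System.out.println(viaTableApi);
    }
}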
