This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4959ebfc349bb80f88582f9ef2d60d09f1140737
Author: godfreyhe <godfre...@163.com>
AuthorDate: Fri Apr 24 23:04:33 2020 +0800

    [FLINK-17267] [table] Introduce Table#explain api
    
    This closes #11905
---
 flink-python/pyflink/table/explain_detail.py       |  4 +-
 flink-python/pyflink/table/table.py                | 14 ++++++
 flink-python/pyflink/table/table_environment.py    |  5 +--
 .../{explain_detail.py => tests/test_explain.py}   | 30 ++++++++-----
 .../table/tests/test_table_environment_api.py      |  2 +-
 flink-python/pyflink/util/utils.py                 |  4 +-
 .../org/apache/flink/table/api/ExplainDetail.java  |  4 +-
 .../java/org/apache/flink/table/api/Table.java     | 10 +++++
 .../table/api/internal/TableEnvironmentImpl.java   |  8 +++-
 .../api/internal/TableEnvironmentInternal.java     | 14 ++++++
 .../apache/flink/table/api/internal/TableImpl.java |  6 +++
 .../flink/table/api/internal/TableResultImpl.java  | 51 +++++++++++++++++++---
 .../table/planner/delegation/StreamPlanner.scala   |  2 +-
 .../flink/table/api/TableEnvironmentTest.scala     | 24 +++++++++-
 .../table/api/internal/BatchTableEnvImpl.scala     |  8 ++--
 .../flink/table/api/internal/TableEnvImpl.scala    |  8 ++--
 .../api/batch/BatchTableEnvironmentTest.scala      | 22 ++++++++++
 .../api/stream/StreamTableEnvironmentTest.scala    | 22 ++++++++++
 18 files changed, 202 insertions(+), 36 deletions(-)

diff --git a/flink-python/pyflink/table/explain_detail.py 
b/flink-python/pyflink/table/explain_detail.py
index 48e7ce9..0cbcbe9 100644
--- a/flink-python/pyflink/table/explain_detail.py
+++ b/flink-python/pyflink/table/explain_detail.py
@@ -29,6 +29,6 @@ class ExplainDetail(object):
     # 0.0 memory}
     ESTIMATED_COST = 0
 
-    # The changelog traits produced by a physical rel node.
+    # The changelog mode produced by a physical rel node.
     # e.g. GroupAggregate(..., changelogMode=[I,UA,D])
-    CHANGELOG_TRAITS = 1
+    CHANGELOG_MODE = 1
diff --git a/flink-python/pyflink/table/table.py 
b/flink-python/pyflink/table/table.py
index 728d331..b74797e 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -21,6 +21,7 @@ from pyflink.java_gateway import get_gateway
 from pyflink.table.table_schema import TableSchema
 
 from pyflink.util.utils import to_jarray
+from pyflink.util.utils import to_j_explain_detail_arr
 
 __all__ = ['Table', 'GroupedTable', 'GroupWindowedTable', 'OverWindowedTable', 
'WindowGroupedTable']
 
@@ -718,6 +719,19 @@ class Table(object):
         """
         self._j_table.executeInsert(table_path, overwrite)
 
+    def explain(self, *extra_details):
+        """
+        Returns the AST of this table and the execution plan.
+
+        :param extra_details: The extra explain details which the explain 
result should include,
+                              e.g. estimated cost, changelog mode for streaming
+        :type extra_details: tuple[ExplainDetail] (variable-length arguments 
of ExplainDetail)
+        :return: The AST of this table and the execution plan.
+        :rtype: str
+        """
+        j_extra_details = to_j_explain_detail_arr(extra_details)
+        return self._j_table.explain(j_extra_details)
+
     def __str__(self):
         return self._j_table.toString()
 
diff --git a/flink-python/pyflink/table/table_environment.py 
b/flink-python/pyflink/table/table_environment.py
index 94ff785..91073d8 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -471,13 +471,12 @@ class TableEnvironment(object):
 
     def explain_sql(self, stmt, *extra_details):
         """
-        Returns the AST of the specified statement and the execution plan to 
compute
-        the result of the given statement.
+        Returns the AST of the specified statement and the execution plan.
 
         :param stmt: The statement for which the AST and execution plan will 
be returned.
         :type stmt: str
         :param extra_details: The extra explain details which the explain 
result should include,
-                              e.g. estimated cost, change log trait for 
streaming
+                              e.g. estimated cost, changelog mode for streaming
         :type extra_details: tuple[ExplainDetail] (variable-length arguments 
of ExplainDetail)
         :return: The statement for which the AST and execution plan will be 
returned.
         :rtype: str
diff --git a/flink-python/pyflink/table/explain_detail.py 
b/flink-python/pyflink/table/tests/test_explain.py
similarity index 58%
copy from flink-python/pyflink/table/explain_detail.py
copy to flink-python/pyflink/table/tests/test_explain.py
index 48e7ce9..1dccaba 100644
--- a/flink-python/pyflink/table/explain_detail.py
+++ b/flink-python/pyflink/table/tests/test_explain.py
@@ -16,19 +16,25 @@
 # limitations under the License.
 
################################################################################
 
-__all__ = ['ExplainDetail']
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+from pyflink.table.explain_detail import ExplainDetail
 
 
-class ExplainDetail(object):
-    """
-    ExplainDetail defines the types of details for explain result.
-    """
+class StreamTableExplainTests(PyFlinkStreamTableTestCase):
 
-    # The cost information on physical rel node estimated by optimizer.
-    # e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 
2.4E9 io, 0.0 network,
-    # 0.0 memory}
-    ESTIMATED_COST = 0
+    def test_explain(self):
+        t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
+        result = t.group_by("c").select("a.sum, c as 
b").explain(ExplainDetail.CHANGELOG_MODE)
 
-    # The changelog traits produced by a physical rel node.
-    # e.g. GroupAggregate(..., changelogMode=[I,UA,D])
-    CHANGELOG_TRAITS = 1
+        assert isinstance(result, str)
+
+
+if __name__ == '__main__':
+    import unittest
+
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py 
b/flink-python/pyflink/table/tests/test_table_environment_api.py
index bd279af..96987de 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -265,7 +265,7 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, 
PyFlinkStreamTableTestCa
             source_sink_utils.TestAppendSink(field_names, field_types))
 
         result = t_env.explain_sql(
-            "select a + 1, b, c from %s" % source, 
ExplainDetail.ESTIMATED_COST)
+            "select a + 1, b, c from %s" % source, 
ExplainDetail.CHANGELOG_MODE)
 
         assert isinstance(result, str)
 
diff --git a/flink-python/pyflink/util/utils.py 
b/flink-python/pyflink/util/utils.py
index 29a20da..065b537 100644
--- a/flink-python/pyflink/util/utils.py
+++ b/flink-python/pyflink/util/utils.py
@@ -134,8 +134,8 @@ def to_j_explain_detail_arr(p_extra_details):
     gateway = get_gateway()
 
     def to_j_explain_detail(p_extra_detail):
-        if p_extra_detail == ExplainDetail.CHANGELOG_TRAITS:
-            return 
gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_TRAITS
+        if p_extra_detail == ExplainDetail.CHANGELOG_MODE:
+            return 
gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_MODE
         else:
             return 
gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
index 6e9d014..5dfddc3 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
@@ -29,8 +29,8 @@ public enum ExplainDetail {
        ESTIMATED_COST,
 
        /**
-        * The changelog traits produced by a physical rel node.
+        * The changelog mode produced by a physical rel node.
         * e.g. GroupAggregate(..., changelogMode=[I,UA,D])
         */
-       CHANGELOG_TRAITS
+       CHANGELOG_MODE
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 3362ca5..d1239e6 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1473,4 +1473,14 @@ public interface Table {
         * @return The insert operation execution result.
         */
        TableResult executeInsert(String tablePath, boolean overwrite);
+
+       /**
+        * Returns the AST of this table and the execution plan to compute
+        * the result of this table.
+        *
+        * @param extraDetails The extra explain details which the explain 
result should include,
+        *   e.g. estimated cost, changelog mode for streaming
+        * @return AST and the execution plan.
+        */
+       String explain(ExplainDetail... extraDetails);
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 610627c..490e416 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -614,6 +614,11 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
        }
 
        @Override
+       public String explainInternal(List<Operation> operations, 
ExplainDetail... extraDetails) {
+               return planner.explain(operations, extraDetails);
+       }
+
+       @Override
        public String[] getCompletionHints(String statement, int position) {
                return planner.getCompletionHints(statement, position);
        }
@@ -873,6 +878,7 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                                        
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                                        
.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
                                        
.data(Collections.singletonList(Row.of(explanation)))
+                                       
.setPrintStyle(TableResultImpl.PrintStyle.RAW_CONTENT)
                                        .build();
 
                } else {
@@ -997,7 +1003,7 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
        protected ExplainDetail[] getExplainDetails(boolean extended) {
                if (extended) {
                        if (isStreamingMode) {
-                               return new ExplainDetail[] { 
ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS };
+                               return new ExplainDetail[] { 
ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE };
                        } else {
                                return new ExplainDetail[] { 
ExplainDetail.ESTIMATED_COST };
                        }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
index 2319538..228713c 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
@@ -19,11 +19,13 @@
 package org.apache.flink.table.api.internal;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
 
 import java.util.List;
 
@@ -56,4 +58,16 @@ interface TableEnvironmentInternal extends TableEnvironment {
         * @return the affected row counts (-1 means unknown).
         */
        TableResult executeInternal(List<ModifyOperation> operations);
+
+       /**
+        * Returns the AST of the given operations and the execution plan to compute
+        * the result of the given operations.
+        *
+        * @param operations The operations to be explained.
+        * @param extraDetails The extra explain details which the explain 
result should include,
+        *   e.g. estimated cost, changelog mode for streaming
+        * @return AST and the execution plan.
+        */
+       String explainInternal(List<Operation> operations, ExplainDetail... 
extraDetails);
+
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 9b2e319..d3a63aa 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.internal;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.AggregatedTable;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.FlatAggregateTable;
 import org.apache.flink.table.api.GroupWindow;
 import org.apache.flink.table.api.GroupWindowedTable;
@@ -565,6 +566,11 @@ public class TableImpl implements Table {
        }
 
        @Override
+       public String explain(ExplainDetail... extraDetails) {
+               return 
tableEnvironment.explainInternal(Collections.singletonList(getQueryOperation()),
 extraDetails);
+       }
+
+       @Override
        public String toString() {
                if (tableName == null) {
                        tableName = "UnnamedTable$" + 
uniqueId.getAndIncrement();
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
index 791ee89..a783976 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.utils.PrintUtils;
@@ -51,16 +52,19 @@ class TableResultImpl implements TableResult {
        private final TableSchema tableSchema;
        private final ResultKind resultKind;
        private final Iterator<Row> data;
+       private final PrintStyle printStyle;
 
        private TableResultImpl(
                        @Nullable JobClient jobClient,
                        TableSchema tableSchema,
                        ResultKind resultKind,
-                       Iterator<Row> data) {
+                       Iterator<Row> data,
+                       PrintStyle printStyle) {
                this.jobClient = jobClient;
                this.tableSchema = Preconditions.checkNotNull(tableSchema, 
"tableSchema should not be null");
                this.resultKind = Preconditions.checkNotNull(resultKind, 
"resultKind should not be null");
                this.data = Preconditions.checkNotNull(data, "data should not 
be null");
+               this.printStyle = Preconditions.checkNotNull(printStyle, 
"printStyle should not be null");
        }
 
        @Override
@@ -86,7 +90,18 @@ class TableResultImpl implements TableResult {
        @Override
        public void print() {
                Iterator<Row> it = collect();
-               PrintUtils.printAsTableauForm(getTableSchema(), it, new 
PrintWriter(System.out));
+               switch (printStyle) {
+                       case TABLEAU:
+                               PrintUtils.printAsTableauForm(getTableSchema(), 
it, new PrintWriter(System.out));
+                               break;
+                       case RAW_CONTENT:
+                               while (it.hasNext()) {
+                                       System.out.println(String.join(",", 
PrintUtils.rowToString(it.next())));
+                               }
+                               break;
+                       default:
+                               throw new TableException("Unsupported print 
style: " + printStyle);
+               }
        }
 
        public static Builder builder() {
@@ -101,6 +116,7 @@ class TableResultImpl implements TableResult {
                private TableSchema tableSchema = null;
                private ResultKind resultKind = null;
                private Iterator<Row> data = null;
+               private PrintStyle printStyle = PrintStyle.TABLEAU;
 
                private Builder() {
                }
@@ -138,7 +154,7 @@ class TableResultImpl implements TableResult {
                }
 
                /**
-                * Specifies an row iterator as the execution result .
+                * Specifies a row iterator as the execution result.
                 *
                 * @param rowIterator a row iterator as the execution result.
                 */
@@ -149,7 +165,7 @@ class TableResultImpl implements TableResult {
                }
 
                /**
-                * Specifies an row list as the execution result .
+                * Specifies a row list as the execution result.
                 *
                 * @param rowList a row list as the execution result.
                 */
@@ -160,11 +176,36 @@ class TableResultImpl implements TableResult {
                }
 
                /**
+                * Specifies print style. Default is {@link PrintStyle#TABLEAU}.
+                */
+               public Builder setPrintStyle(PrintStyle printStyle) {
+                       Preconditions.checkNotNull(printStyle, "printStyle 
should not be null");
+                       this.printStyle = printStyle;
+                       return this;
+               }
+
+               /**
                 * Returns a {@link TableResult} instance.
                 */
                public TableResult build() {
-                       return new TableResultImpl(jobClient, tableSchema, 
resultKind, data);
+                       return new TableResultImpl(jobClient, tableSchema, 
resultKind, data, printStyle);
                }
        }
 
+       /**
+        * PrintStyle defines the styles of printing.
+        */
+       public enum PrintStyle {
+               /**
+                * print the result schema and content as tableau form.
+                */
+               TABLEAU,
+
+               /**
+                * only print the result content as raw form.
+                * column delimiter is ",", row delimiter is "\n".
+                */
+               RAW_CONTENT
+       }
+
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 959de06..5dc6a6f 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -114,7 +114,7 @@ class StreamPlanner(
     } else {
       SqlExplainLevel.DIGEST_ATTRIBUTES
     }
-    val withChangelogTraits = 
extraDetails.contains(ExplainDetail.CHANGELOG_TRAITS)
+    val withChangelogTraits = 
extraDetails.contains(ExplainDetail.CHANGELOG_MODE)
     sb.append(ExecNodePlanDumper.dagToString(
       execNodes,
       explainLevel,
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 0e197ba..aa6bd2e 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -911,7 +911,7 @@ class TableEnvironmentTest {
     assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
 
     val actual = tableEnv.explainSql(
-      "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_TRAITS)
+      "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_MODE)
     val expected = 
TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
     assertEquals(replaceStageId(expected), replaceStageId(actual))
   }
@@ -951,6 +951,28 @@ class TableEnvironmentTest {
     assertEquals(replaceStageId(expected), replaceStageId(actual))
   }
 
+  @Test
+  def testTableExplain(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = tableEnv.sqlQuery("select * from MyTable where a > 10")
+      .explain(ExplainDetail.CHANGELOG_MODE)
+    val expected = 
TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def testUnsupportedExplain(explain: String): Unit = {
     try {
       tableEnv.executeSql(explain)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index b3caf20..7ba116d 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -217,7 +217,7 @@ abstract class BatchTableEnvImpl(
     * @param extended Flag to include detailed optimizer estimates.
     */
   private[flink] def explain(table: Table, extended: Boolean): String = {
-    explain(
+    explainInternal(
       
JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]),
       getExplainDetails(extended): _*)
   }
@@ -225,12 +225,14 @@ abstract class BatchTableEnvImpl(
   override def explain(table: Table): String = explain(table: Table, extended 
= false)
 
   override def explain(extended: Boolean): String = {
-    explain(
+    explainInternal(
       bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava,
       getExplainDetails(extended): _*)
   }
 
-  protected def explain(operations: JList[Operation], extraDetails: 
ExplainDetail*): String = {
+  protected override def explainInternal(
+      operations: JList[Operation],
+      extraDetails: ExplainDetail*): String = {
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astList = operations.asScala.map {
       case queryOperation: QueryOperation =>
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 1f01186..4c6cbd4 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.operators.DataSink
 import org.apache.flink.core.execution.JobClient
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.internal.TableResultImpl.PrintStyle
 import org.apache.flink.table.calcite.{CalciteParser, FlinkPlannerImpl, 
FlinkRelBuilder}
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.catalog.exceptions.{TableNotExistException => _, 
_}
@@ -762,11 +763,12 @@ abstract class TableEnvImpl(
       case _: ShowViewsOperation =>
         buildShowResult(listViews())
       case explainOperation: ExplainOperation =>
-        val explanation = 
explain(JCollections.singletonList(explainOperation.getChild))
+        val explanation = 
explainInternal(JCollections.singletonList(explainOperation.getChild))
         TableResultImpl.builder.
           resultKind(ResultKind.SUCCESS_WITH_CONTENT)
           .tableSchema(TableSchema.builder.field("result", 
DataTypes.STRING).build)
           .data(JCollections.singletonList(Row.of(explanation)))
+          .setPrintStyle(PrintStyle.RAW_CONTENT)
           .build
 
       case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
@@ -1142,10 +1144,10 @@ abstract class TableEnvImpl(
         "Unsupported SQL query! explainSql() only accepts a single SQL query.")
     }
 
-    explain(operations, extraDetails: _*)
+    explainInternal(operations, extraDetails: _*)
   }
 
-  protected def explain(operations: JList[Operation], extraDetails: 
ExplainDetail*): String
+  protected def explainInternal(operations: JList[Operation], extraDetails: 
ExplainDetail*): String
 
   override def fromValues(values: Expression*): Table = {
     createTable(operationTreeBuilder.values(values: _*))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index c928314..74d820e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -504,6 +504,28 @@ class BatchTableEnvironmentTest extends TableTestBase {
     assertEquals(replaceStageId(expected), replaceStageId(actual))
   }
 
+  @Test
+  def testTableExplain(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = util.tableEnv.sqlQuery("select * from MyTable where a > 
10").explain()
+    val expected = readFromResource("testExplainSqlWithSelect1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: 
String): Unit = {
     try {
       tableEnv.executeSql(explain)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 25bb536..bb710c6 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -384,6 +384,28 @@ class StreamTableEnvironmentTest extends TableTestBase {
     assertEquals(replaceStageId(expected), replaceStageId(actual))
   }
 
+  @Test
+  def testTableExplain(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = util.tableEnv.sqlQuery("select * from MyTable where a > 
10").explain()
+    val expected = readFromResource("testExplainSqlWithSelect0.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def prepareSchemaExpressionParser:
     (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = 
{
 

Reply via email to