This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c5c6b76  [FLINK-17112][table] Support DESCRIBE statement in Flink SQL
c5c6b76 is described below

commit c5c6b7612d0b5492ec4db8233709bf0a1093a1fe
Author: Zhenghua Gao <doc...@gmail.com>
AuthorDate: Tue May 12 20:41:16 2020 +0800

    [FLINK-17112][table] Support DESCRIBE statement in Flink SQL
    
    This closes #11892
---
 .../table/api/internal/TableEnvironmentImpl.java   |  69 +++++++++++-
 .../flink/table/api/internal/TableResultImpl.java  |  22 ++--
 .../table/operations/DescribeTableOperation.java   |  59 +++++++++++
 .../org/apache/flink/table/utils/PrintUtils.java   |  15 ++-
 .../operations/SqlToOperationConverter.java        |  12 +++
 .../table/planner/calcite/FlinkPlannerImpl.scala   |   7 +-
 .../flink/table/api/TableEnvironmentTest.scala     | 116 ++++++++++++++++++++-
 .../table/sqlexec/SqlToOperationConverter.java     |  12 +++
 .../flink/table/api/internal/TableEnvImpl.scala    |  60 ++++++++++-
 .../flink/table/calcite/FlinkPlannerImpl.scala     |   7 +-
 .../api/batch/BatchTableEnvironmentTest.scala      |  28 +++++
 11 files changed, 382 insertions(+), 25 deletions(-)

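For context, a minimal sketch of how the new statement is driven through the
Table API. The table name `Orders` and its schema are hypothetical, and the
COLLECTION connector simply mirrors the test fixtures in this patch; the entry
points shown (executeSql, TableResult.print) are the ones this commit touches:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;

    public class DescribeExample {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().build());

            // Hypothetical table; COLLECTION is the test-only connector
            // used by the tests added in this commit.
            tableEnv.executeSql(
                    "CREATE TABLE Orders (" +
                    "  user_id BIGINT NOT NULL," +
                    "  amount DECIMAL(10, 2)," +
                    "  PRIMARY KEY (user_id) NOT ENFORCED" +
                    ") WITH ('connector' = 'COLLECTION')");

            // New in this commit: executeSql understands DESCRIBE and returns
            // a TableResult with columns name, type, null, key,
            // computed column, and watermark; print() renders it in tableau form.
            TableResult result = tableEnv.executeSql("DESCRIBE Orders");
            result.print();
        }
    }
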
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 23752c3..de7bd96 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.WatermarkSpec;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogFunction;
@@ -76,6 +77,7 @@ import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
@@ -112,13 +114,17 @@ import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.utils.PrintUtils;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.types.Row;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -156,7 +162,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                        "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " +
                        "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, " +
                        "SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW, SHOW VIEWS, " +
-                       "INSERT.";
+                       "INSERT, DESCRIBE.";
 
        /**
         * Provides necessary methods for {@link ConnectTableDescriptor}.
@@ -712,7 +718,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                                        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                                        .tableSchema(tableSchema)
                                        .data(tableSink.getResultIterator())
-                                       .setPrintStyle(TableResultImpl.PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH))
+                                       .setPrintStyle(TableResultImpl.PrintStyle.tableau(
+                                                       PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN))
                                        .build();
                } catch (Exception e) {
                        throw new TableException("Failed to execute sql", e);
@@ -966,6 +973,17 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                                        .data(Collections.singletonList(Row.of(explanation)))
                                        .setPrintStyle(TableResultImpl.PrintStyle.rawContent())
                                        .build();
+               } else if (operation instanceof DescribeTableOperation) {
+                       DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
+                       Optional<CatalogManager.TableLookupResult> result =
+                                       catalogManager.getTable(describeTableOperation.getSqlIdentifier());
+                       if (result.isPresent()) {
+                               return buildDescribeResult(result.get().getTable().getSchema());
+                       } else {
+                               throw new ValidationException(String.format(
+                                               "Table or view with identifier '%s' doesn't exist",
+                                               describeTableOperation.getSqlIdentifier().asSummaryString()));
+                       }
                } else if (operation instanceof QueryOperation) {
                        return executeInternal((QueryOperation) operation);
                } else {
@@ -974,10 +992,53 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
        }
 
        private TableResult buildShowResult(String[] objects) {
+               return buildResult(
+                       new String[]{"result"},
+                       new DataType[]{DataTypes.STRING()},
+                       Arrays.stream(objects).map((c) -> new String[]{c}).toArray(String[][]::new));
+       }
+
+       private TableResult buildDescribeResult(TableSchema schema) {
+               Map<String, String> fieldToWatermark =
+                               schema.getWatermarkSpecs()
+                                               .stream()
+                                               .collect(Collectors.toMap(WatermarkSpec::getRowtimeAttribute, WatermarkSpec::getWatermarkExpr));
+
+               Map<String, String> fieldToPrimaryKey = new HashMap<>();
+               schema.getPrimaryKey().ifPresent((p) -> {
+                       List<String> columns = p.getColumns();
+                       columns.forEach((c) -> fieldToPrimaryKey.put(c, String.format("PRI(%s)", String.join(", ", columns))));
+               });
+
+               Object[][] rows =
+                       schema.getTableColumns()
+                               .stream()
+                               .map((c) -> {
+                                       LogicalType logicalType = c.getType().getLogicalType();
+                                       return new Object[]{
+                                               c.getName(),
+                                               StringUtils.removeEnd(logicalType.toString(), " NOT NULL"),
+                                               logicalType.isNullable(),
+                                               fieldToPrimaryKey.getOrDefault(c.getName(), null),
+                                               c.getExpr().orElse(null),
+                                               fieldToWatermark.getOrDefault(c.getName(), null)};
+                               }).toArray(Object[][]::new);
+
+               return buildResult(
+                       new String[]{"name", "type", "null", "key", "computed column", "watermark"},
+                       new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.BOOLEAN(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()},
+                       rows);
+       }
+
+       private TableResult buildResult(String[] headers, DataType[] types, Object[][] rows) {
                return TableResultImpl.builder()
                                .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-                               .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
-                               .data(Arrays.stream(objects).map(Row::of).collect(Collectors.toList()))
+                               .tableSchema(
+                                       TableSchema.builder().fields(
+                                               headers,
+                                               types).build())
+                               .data(Arrays.stream(rows).map(Row::of).collect(Collectors.toList()))
+                               .setPrintStyle(TableResultImpl.PrintStyle.tableau(Integer.MAX_VALUE, ""))
                                .build();
        }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
index d04bb2e..5c82f5e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
@@ -92,7 +92,9 @@ class TableResultImpl implements TableResult {
                Iterator<Row> it = collect();
                if (printStyle instanceof TableauStyle) {
                        int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
-                       PrintUtils.printAsTableauForm(getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth);
+                       String nullColumn = ((TableauStyle) printStyle).getNullColumn();
+                       PrintUtils.printAsTableauForm(
+                                       getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn);
                } else if (printStyle instanceof RawContentStyle) {
                        while (it.hasNext()) {
                                System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
@@ -114,7 +116,7 @@ class TableResultImpl implements TableResult {
                private TableSchema tableSchema = null;
                private ResultKind resultKind = null;
                private Iterator<Row> data = null;
-               private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE);
+               private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN);
 
                private Builder() {
                }
@@ -195,12 +197,13 @@ class TableResultImpl implements TableResult {
         */
        public interface PrintStyle {
                /**
-                * Create a tableau print style with given max column width,
+                * Create a tableau print style with given max column width and null column,
                 * which prints the result schema and content as tableau form.
                 */
-               static PrintStyle tableau(int maxColumnWidth) {
+               static PrintStyle tableau(int maxColumnWidth, String nullColumn) {
                        Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
-                       return new TableauStyle(maxColumnWidth);
+                       Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
+                       return new TableauStyle(maxColumnWidth, nullColumn);
                }
 
                /**
@@ -217,15 +220,22 @@ class TableResultImpl implements TableResult {
         * print the result schema and content as tableau form.
         */
        private static final class TableauStyle implements PrintStyle {
+
                private final int maxColumnWidth;
+               private final String nullColumn;
 
-               private TableauStyle(int maxColumnWidth) {
+               private TableauStyle(int maxColumnWidth, String nullColumn) {
                        this.maxColumnWidth = maxColumnWidth;
+                       this.nullColumn = nullColumn;
                }
 
                int getMaxColumnWidth() {
                        return maxColumnWidth;
                }
+
+               String getNullColumn() {
+                       return nullColumn;
+               }
        }
 
        /**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java
new file mode 100644
index 0000000..ecd8bc7
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.]databaseName.]sqlIdentifier statement.
+ */
+public class DescribeTableOperation implements Operation {
+
+       private final ObjectIdentifier sqlIdentifier;
+       private final boolean isExtended;
+
+       public DescribeTableOperation(ObjectIdentifier sqlIdentifier, boolean isExtended) {
+               this.sqlIdentifier = sqlIdentifier;
+               this.isExtended = isExtended;
+       }
+
+       public ObjectIdentifier getSqlIdentifier() {
+               return sqlIdentifier;
+       }
+
+       public boolean isExtended() {
+               return isExtended;
+       }
+
+       @Override
+       public String asSummaryString() {
+               Map<String, Object> params = new LinkedHashMap<>();
+               params.put("identifier", sqlIdentifier);
+               params.put("isExtended", isExtended);
+               return OperationUtils.formatWithChildren(
+                       "DESCRIBE",
+                       params,
+                       Collections.emptyList(),
+                       Operation::asSummaryString);
+       }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
index 4de488e..7916fa1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
@@ -40,7 +40,7 @@ public class PrintUtils {
 
        // constants for printing
        public static final int MAX_COLUMN_WIDTH = 30;
-       private static final String NULL_COLUMN = "(NULL)";
+       public static final String NULL_COLUMN = "(NULL)";
        private static final String COLUMN_TRUNCATED_FLAG = "...";
 
        private PrintUtils() {
@@ -65,7 +65,7 @@ public class PrintUtils {
                        TableSchema tableSchema,
                        Iterator<Row> it,
                        PrintWriter printWriter) {
-               printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH);
+               printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN);
        }
 
        /**
@@ -87,14 +87,15 @@ public class PrintUtils {
                        TableSchema tableSchema,
                        Iterator<Row> it,
                        PrintWriter printWriter,
-                       int maxColumnWidth) {
+                       int maxColumnWidth,
+                       String nullColumn) {
                List<String[]> rows = new ArrayList<>();
 
                // fill field names first
                List<TableColumn> columns = tableSchema.getTableColumns();
                rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new));
                while (it.hasNext()) {
-                       rows.add(rowToString(it.next()));
+                       rows.add(rowToString(it.next(), nullColumn));
                }
 
                int[] colWidths = columnWidthsByContent(columns, rows, maxColumnWidth);
@@ -123,11 +124,15 @@ public class PrintUtils {
        }
 
        public static String[] rowToString(Row row) {
+               return rowToString(row, NULL_COLUMN);
+       }
+
+       public static String[] rowToString(Row row, String nullColumn) {
                final String[] fields = new String[row.getArity()];
                for (int i = 0; i < row.getArity(); i++) {
                        final Object field = row.getField(i);
                        if (field == null) {
-                               fields[i] = NULL_COLUMN;
+                               fields[i] = nullColumn;
                        } else {
                                fields[i] = StringUtils.arrayAwareToString(field);
                        }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 49138d1..11dece9 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -39,6 +39,7 @@ import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
 import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
 import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
 import org.apache.flink.sql.parser.dql.SqlShowDatabases;
 import org.apache.flink.sql.parser.dql.SqlShowFunctions;
@@ -64,6 +65,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -205,6 +207,8 @@ public class SqlToOperationConverter {
                        return Optional.of(converter.convertShowViews((SqlShowViews) validated));
                } else if (validated instanceof SqlExplain) {
                        return Optional.of(converter.convertExplain((SqlExplain) validated));
+               } else if (validated instanceof SqlRichDescribeTable) {
+                       return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
                } else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
                        return Optional.of(converter.convertSqlQuery(validated));
                } else {
@@ -610,6 +614,14 @@ public class SqlToOperationConverter {
                return new ExplainOperation(operation);
        }
 
+       /** Convert DESCRIBE [EXTENDED] [[catalogName.]databaseName.]sqlIdentifier. */
+       private Operation convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable) {
+               UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlRichDescribeTable.fullTableName());
+               ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+               return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended());
+       }
+
        /** Fallback method for sql query. */
        private Operation convertSqlQuery(SqlNode node) {
                return toQueryOperation(flinkPlanner, node);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 846f536..b38fa92 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -19,9 +19,10 @@
 package org.apache.flink.table.planner.calcite
 
 import org.apache.flink.sql.parser.ExtendedSqlNode
-import org.apache.flink.sql.parser.dql.{SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
+import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader
+
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.config.NullCollation
 import org.apache.calcite.plan._
@@ -33,6 +34,7 @@ import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
 import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
+
 import java.lang.{Boolean => JBoolean}
 import java.util
 import java.util.function.{Function => JFunction}
@@ -126,7 +128,8 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowDatabases]
         || sqlNode.isInstanceOf[SqlShowTables]
         || sqlNode.isInstanceOf[SqlShowFunctions]
-        || sqlNode.isInstanceOf[SqlShowViews]) {
+        || sqlNode.isInstanceOf[SqlShowViews]
+        || sqlNode.isInstanceOf[SqlRichDescribeTable]) {
         return sqlNode
       }
       sqlNode match {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 93177a9..a890d7c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -728,9 +728,11 @@ class TableEnvironmentTest {
     val sourceDDL =
       """
         |CREATE TABLE T1(
-        |  a int,
+        |  a int not null,
         |  b varchar,
-        |  c int
+        |  c int,
+        |  ts AS to_timestamp(b),
+        |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
         |) with (
         |  'connector' = 'COLLECTION'
         |)
@@ -975,6 +977,116 @@ class TableEnvironmentTest {
     }
   }
 
+  @Test
+  def testDescribeTableOrView(): Unit = {
+    val sourceDDL =
+      """
+        |CREATE TABLE T1(
+        |  f0 char(10),
+        |  f1 varchar(10),
+        |  f2 string,
+        |  f3 BOOLEAN,
+        |  f4 BINARY(10),
+        |  f5 VARBINARY(10),
+        |  f6 BYTES,
+        |  f7 DECIMAL(10, 3),
+        |  f8 TINYINT,
+        |  f9 SMALLINT,
+        |  f10 INTEGER,
+        |  f11 BIGINT,
+        |  f12 FLOAT,
+        |  f13 DOUBLE,
+        |  f14 DATE,
+        |  f15 TIME,
+        |  f16 TIMESTAMP,
+        |  f17 TIMESTAMP(3),
+        |  f18 TIMESTAMP WITHOUT TIME ZONE,
+        |  f19 TIMESTAMP(3) WITH LOCAL TIME ZONE,
+        |  f20 TIMESTAMP WITH LOCAL TIME ZONE,
+        |  f21 ARRAY<INT>,
+        |  f22 MAP<INT, STRING>,
+        |  f23 ROW<f0 INT, f1 STRING>,
+        |  f24 int not null,
+        |  f25 varchar not null,
+        |  f26 row<f0 int not null, f1 int> not null,
+        |  ts AS to_timestamp(f25),
+        |  PRIMARY KEY(f24, f26) NOT ENFORCED,
+        |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+
+    val viewDDL =
+      """
+        |CREATE VIEW IF NOT EXISTS T2(d, e, f) AS SELECT f24, f25, f26 FROM T1
+      """.stripMargin
+    tableEnv.executeSql(sourceDDL)
+    tableEnv.executeSql(viewDDL)
+
+    val tableResult1 = tableEnv.executeSql("describe T1")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult1.getResultKind)
+    checkData(
+      util.Arrays.asList(
+        Row.of("f0", "CHAR(10)", Boolean.box(true), null, null, null),
+        Row.of("f1", "VARCHAR(10)", Boolean.box(true), null, null, null),
+        Row.of("f2", "STRING", Boolean.box(true), null, null, null),
+        Row.of("f3", "BOOLEAN", Boolean.box(true), null, null, null),
+        Row.of("f4", "BINARY(10)", Boolean.box(true), null, null, null),
+        Row.of("f5", "VARBINARY(10)", Boolean.box(true), null, null, null),
+        Row.of("f6", "BYTES", Boolean.box(true), null, null, null),
+        Row.of("f7", "DECIMAL(10, 3)", Boolean.box(true), null, null, null),
+        Row.of("f8", "TINYINT", Boolean.box(true), null, null, null),
+        Row.of("f9", "SMALLINT", Boolean.box(true), null, null, null),
+        Row.of("f10", "INT", Boolean.box(true), null, null, null),
+        Row.of("f11", "BIGINT", Boolean.box(true), null, null, null),
+        Row.of("f12", "FLOAT", Boolean.box(true), null, null, null),
+        Row.of("f13", "DOUBLE", Boolean.box(true), null, null, null),
+        Row.of("f14", "DATE", Boolean.box(true), null, null, null),
+        Row.of("f15", "TIME(0)", Boolean.box(true), null, null, null),
+        Row.of("f16", "TIMESTAMP(6)", Boolean.box(true), null, null, null),
+        Row.of("f17", "TIMESTAMP(3)", Boolean.box(true), null, null, null),
+        Row.of("f18", "TIMESTAMP(6)", Boolean.box(true), null, null, null),
+        Row.of("f19", "TIMESTAMP(3) WITH LOCAL TIME ZONE", Boolean.box(true), null, null, null),
+        Row.of("f20", "TIMESTAMP(6) WITH LOCAL TIME ZONE", Boolean.box(true), null, null, null),
+        Row.of("f21", "ARRAY<INT>", Boolean.box(true), null, null, null),
+        Row.of("f22", "MAP<INT, STRING>", Boolean.box(true), null, null, null),
+        Row.of("f23", "ROW<`f0` INT, `f1` STRING>", Boolean.box(true), null, null, null),
+        Row.of("f24", "INT", Boolean.box(false), "PRI(f24, f26)", null, null),
+        Row.of("f25", "STRING", Boolean.box(false), null, null, null),
+        Row.of("f26", "ROW<`f0` INT NOT NULL, `f1` INT>", Boolean.box(false),
+          "PRI(f24, f26)", null, null),
+        Row.of("ts", "TIMESTAMP(3)", Boolean.box(true), null, "TO_TIMESTAMP(`f25`)",
+          "`ts` - INTERVAL '1' SECOND")
+      ).iterator(),
+      tableResult1.collect())
+
+    val tableResult2 = tableEnv.executeSql("describe T2")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    checkData(
+      util.Arrays.asList(
+        Row.of("d", "INT", Boolean.box(false), null, null, null),
+        Row.of("e", "STRING", Boolean.box(false), null, null, null),
+        Row.of("f", "ROW<`f0` INT NOT NULL, `f1` INT>", Boolean.box(false), null, null, null)
+      ).iterator(),
+      tableResult2.collect())
+
+    // temporary view T2(x, y) masks permanent view T2(d, e, f)
+    val temporaryViewDDL =
+      """
+        |CREATE TEMPORARY VIEW IF NOT EXISTS T2(x, y) AS SELECT f24, f25 FROM T1
+      """.stripMargin
+    tableEnv.executeSql(temporaryViewDDL)
+
+    val tableResult3 = tableEnv.executeSql("describe T2")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+    checkData(
+      util.Arrays.asList(
+        Row.of("x", "INT", Boolean.box(false), null, null, null),
+        Row.of("y", "STRING", Boolean.box(false), null, null, null)).iterator(),
+      tableResult3.collect())
+  }
+
  private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
     while (expected.hasNext && actual.hasNext) {
       assertEquals(expected.next(), actual.next())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index a465f56..a0239c4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -36,6 +36,7 @@ import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
 import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
 import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
 import org.apache.flink.sql.parser.dql.SqlShowDatabases;
 import org.apache.flink.sql.parser.dql.SqlShowFunctions;
@@ -61,6 +62,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.PlannerQueryOperation;
@@ -193,6 +195,8 @@ public class SqlToOperationConverter {
                        return Optional.of(converter.convertShowViews((SqlShowViews) validated));
                } else if (validated instanceof SqlExplain) {
                        return Optional.of(converter.convertExplain((SqlExplain) validated));
+               } else if (validated instanceof SqlRichDescribeTable) {
+                       return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
                } else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
                        return Optional.of(converter.convertSqlQuery(validated));
                } else {
@@ -575,6 +579,14 @@ public class SqlToOperationConverter {
                return new ExplainOperation(operation);
        }
 
+       /** Convert DESCRIBE [EXTENDED] [[catalogName.]databaseName.]sqlIdentifier. */
+       private Operation convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable) {
+               UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlRichDescribeTable.fullTableName());
+               ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+               return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended());
+       }
+
        /**
         * Create a table schema from {@link SqlCreateTable}. This schema contains computed column
         * fields, say, we have a create table DDL statement:
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index ab89dda..46fe90e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -49,6 +49,8 @@ import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.tools.FrameworkConfig
 
+import org.apache.commons.lang3.StringUtils
+
 import _root_.java.lang.{Iterable => JIterable, Long => JLong}
 import _root_.java.util.function.{Function => JFunction, Supplier => JSupplier}
 import _root_.java.util.{Optional, Collections => JCollections, HashMap => JHashMap, List => JList, Map => JMap}
@@ -140,7 +142,7 @@ abstract class TableEnvImpl(
      "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " +
      "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, USE CATALOG, USE [CATALOG.]DATABASE, " +
      "SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW, " +
-      "SHOW VIEWS, INSERT."
+      "SHOW VIEWS, INSERT, DESCRIBE."
 
   private def isStreamingMode: Boolean = this match {
     case _: BatchTableEnvImpl => false
@@ -626,7 +628,7 @@ abstract class TableEnvImpl(
         .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
         .tableSchema(tableSchema)
         .data(tableSink.getResultIterator)
-        .setPrintStyle(PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH))
+        .setPrintStyle(PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN))
         .build
     } catch {
       case e: Exception =>
@@ -809,6 +811,15 @@ abstract class TableEnvImpl(
           .data(JCollections.singletonList(Row.of(explanation)))
           .setPrintStyle(PrintStyle.rawContent())
           .build
+      case descOperation: DescribeTableOperation =>
+        val result = catalogManager.getTable(descOperation.getSqlIdentifier)
+        if (result.isPresent) {
+          buildDescribeResult(result.get.getTable.getSchema)
+        } else {
+          throw new ValidationException(String.format(
+            "Table or view with identifier '%s' doesn't exist",
+            descOperation.getSqlIdentifier.asSummaryString()))
+        }
       case queryOperation: QueryOperation =>
         executeInternal(queryOperation)
 
@@ -818,10 +829,51 @@ abstract class TableEnvImpl(
   }
 
   private def buildShowResult(objects: Array[String]): TableResult = {
+    val rows = Array.ofDim[Object](objects.length, 1)
+    objects.zipWithIndex.foreach {
+      case (obj, i) => rows(i)(0) = obj
+    }
+    buildResult(Array("result"), Array(DataTypes.STRING), rows)
+  }
+
+  private def buildDescribeResult(schema: TableSchema): TableResult = {
+    val fieldToWatermark =
+      schema
+        .getWatermarkSpecs
+        .map(w => (w.getRowtimeAttribute, w.getWatermarkExpr)).toMap
+    val fieldToPrimaryKey = new JHashMap[String, String]()
+    if (schema.getPrimaryKey.isPresent) {
+      val columns = schema.getPrimaryKey.get.getColumns.asScala
+      columns.foreach(c => fieldToPrimaryKey.put(c, s"PRI(${columns.mkString(", ")})"))
+    }
+    val data = Array.ofDim[Object](schema.getFieldCount, 6)
+    schema.getTableColumns.asScala.zipWithIndex.foreach {
+      case (c, i) => {
+        val logicalType = c.getType.getLogicalType
+        data(i)(0) = c.getName
+        data(i)(1) = StringUtils.removeEnd(logicalType.toString, " NOT NULL")
+        data(i)(2) = Boolean.box(logicalType.isNullable)
+        data(i)(3) = fieldToPrimaryKey.getOrDefault(c.getName, null)
+        data(i)(4) = c.getExpr.orElse(null)
+        data(i)(5) = fieldToWatermark.getOrDefault(c.getName, null)
+      }
+    }
+    buildResult(
+      Array("name", "type", "null", "key", "computed column", "watermark"),
+      Array(DataTypes.STRING, DataTypes.STRING, DataTypes.BOOLEAN, DataTypes.STRING,
+        DataTypes.STRING, DataTypes.STRING),
+      data)
+  }
+
+  private def buildResult(
+      headers: Array[String],
+      types: Array[DataType],
+      rows: Array[Array[Object]]): TableResult = {
     TableResultImpl.builder()
       .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-      .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
-      .data(objects.map(Row.of(_)).toList)
+      .tableSchema(
+        TableSchema.builder().fields(headers, types).build())
+      .data(rows.map(Row.of(_:_*)).toList)
       .build()
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index 3d7dae4..4ac4d09 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -19,9 +19,10 @@
 package org.apache.flink.table.calcite
 
 import org.apache.flink.sql.parser.ExtendedSqlNode
-import org.apache.flink.sql.parser.dql.{SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
+import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.catalog.CatalogReader
+
 import org.apache.calcite.plan.RelOptTable.ViewExpander
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelRoot
@@ -31,6 +32,7 @@ import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
 import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
+
 import _root_.java.lang.{Boolean => JBoolean}
 import _root_.java.util
 import _root_.java.util.function.{Function => JFunction}
@@ -124,7 +126,8 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowDatabases]
         || sqlNode.isInstanceOf[SqlShowTables]
         || sqlNode.isInstanceOf[SqlShowFunctions]
-        || sqlNode.isInstanceOf[SqlShowViews]) {
+        || sqlNode.isInstanceOf[SqlShowViews]
+        || sqlNode.isInstanceOf[SqlRichDescribeTable]) {
         return sqlNode
       }
       sqlNode match {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index dc6ad58..87e0a9f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -527,6 +527,34 @@ class BatchTableEnvironmentTest extends TableTestBase {
     }
   }
 
+  @Test
+  def testExecuteSqlWithDescribe(): Unit = {
+    val testUtil = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE tbl1 (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = testUtil.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val tableResult2 = testUtil.tableEnv.executeSql("DESCRIBE tbl1")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    checkData(
+      java.util.Arrays.asList(
+        Row.of("a", "BIGINT", Boolean.box(true), null, null, null),
+        Row.of("b", "INT", Boolean.box(true), null, null, null),
+        Row.of("c", "STRING", Boolean.box(true), null, null, null)
+      ).iterator(),
+      tableResult2.collect())
+  }
+
  private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
     while (expected.hasNext && actual.hasNext) {
       assertEquals(expected.next(), actual.next())
