This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c5c6b76 [FLINK-17112][table] Support DESCRIBE statement in Flink SQL c5c6b76 is described below commit c5c6b7612d0b5492ec4db8233709bf0a1093a1fe Author: Zhenghua Gao <doc...@gmail.com> AuthorDate: Tue May 12 20:41:16 2020 +0800 [FLINK-17112][table] Support DESCRIBE statement in Flink SQL This closes #11892 --- .../table/api/internal/TableEnvironmentImpl.java | 69 +++++++++++- .../flink/table/api/internal/TableResultImpl.java | 22 ++-- .../table/operations/DescribeTableOperation.java | 59 +++++++++++ .../org/apache/flink/table/utils/PrintUtils.java | 15 ++- .../operations/SqlToOperationConverter.java | 12 +++ .../table/planner/calcite/FlinkPlannerImpl.scala | 7 +- .../flink/table/api/TableEnvironmentTest.scala | 116 ++++++++++++++++++++- .../table/sqlexec/SqlToOperationConverter.java | 12 +++ .../flink/table/api/internal/TableEnvImpl.scala | 60 ++++++++++- .../flink/table/calcite/FlinkPlannerImpl.scala | 7 +- .../api/batch/BatchTableEnvironmentTest.scala | 28 +++++ 11 files changed, 382 insertions(+), 25 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 23752c3..de7bd96 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -38,6 +38,7 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.WatermarkSpec; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogFunction; @@ -76,6 +77,7 @@ import org.apache.flink.table.module.Module; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.CatalogQueryOperation; import org.apache.flink.table.operations.CatalogSinkModifyOperation; +import org.apache.flink.table.operations.DescribeTableOperation; import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; @@ -112,13 +114,17 @@ import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.sources.TableSourceValidation; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.utils.PrintUtils; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.types.Row; +import org.apache.commons.lang3.StringUtils; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -156,7 +162,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, " + "SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW, SHOW VIEWS, " + - "INSERT."; + "INSERT, DESCRIBE."; /** * Provides necessary methods for {@link ConnectTableDescriptor}. @@ -712,7 +718,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { .resultKind(ResultKind.SUCCESS_WITH_CONTENT) .tableSchema(tableSchema) .data(tableSink.getResultIterator()) - .setPrintStyle(TableResultImpl.PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH)) + .setPrintStyle(TableResultImpl.PrintStyle.tableau( + PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN)) .build(); } catch (Exception e) { throw new TableException("Failed to execute sql", e); @@ -966,6 +973,17 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { .data(Collections.singletonList(Row.of(explanation))) .setPrintStyle(TableResultImpl.PrintStyle.rawContent()) .build(); + } else if (operation instanceof DescribeTableOperation) { + DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation; + Optional<CatalogManager.TableLookupResult> result = + catalogManager.getTable(describeTableOperation.getSqlIdentifier()); + if (result.isPresent()) { + return buildDescribeResult(result.get().getTable().getSchema()); + } else { + throw new ValidationException(String.format( + "Tables or views with the identifier '%s' doesn't exist", + describeTableOperation.getSqlIdentifier().asSummaryString())); + } } else if (operation instanceof QueryOperation) { return executeInternal((QueryOperation) operation); } else { @@ -974,10 +992,53 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } private TableResult buildShowResult(String[] objects) { + return buildResult( + new String[]{"result"}, + new DataType[]{DataTypes.STRING()}, + Arrays.stream(objects).map((c) -> new String[]{c}).toArray(String[][]::new)); + } + + private TableResult buildDescribeResult(TableSchema schema) { + Map<String, String> fieldToWatermark = + schema.getWatermarkSpecs() + .stream() + .collect(Collectors.toMap(WatermarkSpec::getRowtimeAttribute, WatermarkSpec::getWatermarkExpr)); + + Map<String, String> fieldToPrimaryKey = new HashMap<>(); + schema.getPrimaryKey().ifPresent((p) -> { + List<String> columns = p.getColumns(); + columns.forEach((c) -> fieldToPrimaryKey.put(c, String.format("PRI(%s)", String.join(", ", columns)))); + }); + + Object[][] rows = + schema.getTableColumns() + .stream() + .map((c) -> { + LogicalType logicalType = c.getType().getLogicalType(); + return new Object[]{ + c.getName(), + StringUtils.removeEnd(logicalType.toString(), " NOT NULL"), + logicalType.isNullable(), + fieldToPrimaryKey.getOrDefault(c.getName(), null), + c.getExpr().orElse(null), + fieldToWatermark.getOrDefault(c.getName(), null)}; + }).toArray(Object[][]::new); + + return buildResult( + new String[]{"name", "type", "null", "key", "computed column", "watermark"}, + new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.BOOLEAN(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, + rows); + } + + private TableResult buildResult(String[] headers, DataType[] types, Object[][] rows) { return TableResultImpl.builder() .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build()) - .data(Arrays.stream(objects).map(Row::of).collect(Collectors.toList())) + .tableSchema( + TableSchema.builder().fields( + headers, + types).build()) + .data(Arrays.stream(rows).map(Row::of).collect(Collectors.toList())) + .setPrintStyle(TableResultImpl.PrintStyle.tableau(Integer.MAX_VALUE, "")) .build(); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java index d04bb2e..5c82f5e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java @@ -92,7 +92,9 @@ class TableResultImpl implements TableResult { Iterator<Row> it = collect(); if (printStyle instanceof TableauStyle) { int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth(); - PrintUtils.printAsTableauForm(getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth); + String nullColumn = ((TableauStyle) printStyle).getNullColumn(); + PrintUtils.printAsTableauForm( + getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn); } else if (printStyle instanceof RawContentStyle) { while (it.hasNext()) { System.out.println(String.join(",", PrintUtils.rowToString(it.next()))); @@ -114,7 +116,7 @@ class TableResultImpl implements TableResult { private TableSchema tableSchema = null; private ResultKind resultKind = null; private Iterator<Row> data = null; - private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE); + private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN); private Builder() { } @@ -195,12 +197,13 @@ class TableResultImpl implements TableResult { */ public interface PrintStyle { /** - * Create a tableau print style with given max column width, + * Create a tableau print style with given max column width and null column, * which prints the result schema and content as tableau form. */ - static PrintStyle tableau(int maxColumnWidth) { + static PrintStyle tableau(int maxColumnWidth, String nullColumn) { Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0"); - return new TableauStyle(maxColumnWidth); + Preconditions.checkNotNull(nullColumn, "nullColumn should not be null"); + return new TableauStyle(maxColumnWidth, nullColumn); } /** @@ -217,15 +220,22 @@ class TableResultImpl implements TableResult { * print the result schema and content as tableau form. */ private static final class TableauStyle implements PrintStyle { + private final int maxColumnWidth; + private final String nullColumn; - private TableauStyle(int maxColumnWidth) { + private TableauStyle(int maxColumnWidth, String nullColumn) { this.maxColumnWidth = maxColumnWidth; + this.nullColumn = nullColumn; } int getMaxColumnWidth() { return maxColumnWidth; } + + String getNullColumn() { + return nullColumn; + } } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java new file mode 100644 index 0000000..ecd8bc7 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.table.catalog.ObjectIdentifier; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier statement. + */ +public class DescribeTableOperation implements Operation { + + private final ObjectIdentifier sqlIdentifier; + private final boolean isExtended; + + public DescribeTableOperation(ObjectIdentifier sqlIdentifier, boolean isExtended) { + this.sqlIdentifier = sqlIdentifier; + this.isExtended = isExtended; + } + + public ObjectIdentifier getSqlIdentifier() { + return sqlIdentifier; + } + + public boolean isExtended() { + return isExtended; + } + + @Override + public String asSummaryString() { + Map<String, Object> params = new LinkedHashMap<>(); + params.put("identifier", sqlIdentifier); + params.put("isExtended", isExtended); + return OperationUtils.formatWithChildren( + "DESCRIBE", + params, + Collections.emptyList(), + Operation::asSummaryString); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java index 4de488e..7916fa1 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java @@ -40,7 +40,7 @@ public class PrintUtils { // constants for printing public static final int MAX_COLUMN_WIDTH = 30; - private static final String NULL_COLUMN = "(NULL)"; + public static final String NULL_COLUMN = "(NULL)"; private static final String COLUMN_TRUNCATED_FLAG = "..."; private PrintUtils() { @@ -65,7 +65,7 @@ public class PrintUtils { TableSchema tableSchema, Iterator<Row> it, PrintWriter printWriter) { - printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH); + printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN); } /** @@ -87,14 +87,15 @@ public class PrintUtils { TableSchema tableSchema, Iterator<Row> it, PrintWriter printWriter, - int maxColumnWidth) { + int maxColumnWidth, + String nullColumn) { List<String[]> rows = new ArrayList<>(); // fill field names first List<TableColumn> columns = tableSchema.getTableColumns(); rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new)); while (it.hasNext()) { - rows.add(rowToString(it.next())); + rows.add(rowToString(it.next(), nullColumn)); } int[] colWidths = columnWidthsByContent(columns, rows, maxColumnWidth); @@ -123,11 +124,15 @@ public class PrintUtils { } public static String[] rowToString(Row row) { + return rowToString(row, NULL_COLUMN); + } + + public static String[] rowToString(Row row, String nullColumn) { final String[] fields = new String[row.getArity()]; for (int i = 0; i < row.getArity(); i++) { final Object field = row.getField(i); if (field == null) { - fields[i] = NULL_COLUMN; + fields[i] = nullColumn; } else { fields[i] = StringUtils.arrayAwareToString(field); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 49138d1..11dece9 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -39,6 +39,7 @@ import org.apache.flink.sql.parser.ddl.SqlUseCatalog; import org.apache.flink.sql.parser.ddl.SqlUseDatabase; import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; import org.apache.flink.sql.parser.dml.RichSqlInsert; +import org.apache.flink.sql.parser.dql.SqlRichDescribeTable; import org.apache.flink.sql.parser.dql.SqlShowCatalogs; import org.apache.flink.sql.parser.dql.SqlShowDatabases; import org.apache.flink.sql.parser.dql.SqlShowFunctions; @@ -64,6 +65,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.TableFactoryService; import org.apache.flink.table.operations.CatalogSinkModifyOperation; +import org.apache.flink.table.operations.DescribeTableOperation; import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ShowCatalogsOperation; @@ -205,6 +207,8 @@ public class SqlToOperationConverter { return Optional.of(converter.convertShowViews((SqlShowViews) validated)); } else if (validated instanceof SqlExplain) { return Optional.of(converter.convertExplain((SqlExplain) validated)); + } else if (validated instanceof SqlRichDescribeTable) { + return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated)); } else if (validated.getKind().belongsTo(SqlKind.QUERY)) { return Optional.of(converter.convertSqlQuery(validated)); } else { @@ -610,6 +614,14 @@ public class SqlToOperationConverter { return new ExplainOperation(operation); } + /** Convert DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier. */ + private Operation convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable) { + UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlRichDescribeTable.fullTableName()); + ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); + + return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended()); + } + /** Fallback method for sql query. */ private Operation convertSqlQuery(SqlNode node) { return toQueryOperation(flinkPlanner, node); diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala index 846f536..b38fa92 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala @@ -19,9 +19,10 @@ package org.apache.flink.table.planner.calcite import org.apache.flink.sql.parser.ExtendedSqlNode -import org.apache.flink.sql.parser.dql.{SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews} +import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews} import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader + import com.google.common.collect.ImmutableList import org.apache.calcite.config.NullCollation import org.apache.calcite.plan._ @@ -33,6 +34,7 @@ import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator} import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable} import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter} import org.apache.calcite.tools.{FrameworkConfig, RelConversionException} + import java.lang.{Boolean => JBoolean} import java.util import java.util.function.{Function => JFunction} @@ -126,7 +128,8 @@ class FlinkPlannerImpl( || sqlNode.isInstanceOf[SqlShowDatabases] || sqlNode.isInstanceOf[SqlShowTables] || sqlNode.isInstanceOf[SqlShowFunctions] - || sqlNode.isInstanceOf[SqlShowViews]) { + || sqlNode.isInstanceOf[SqlShowViews] + || sqlNode.isInstanceOf[SqlRichDescribeTable]) { return sqlNode } sqlNode match { diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 93177a9..a890d7c 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -728,9 +728,11 @@ class TableEnvironmentTest { val sourceDDL = """ |CREATE TABLE T1( - | a int, + | a int not null, | b varchar, - | c int + | c int, + | ts AS to_timestamp(b), + | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND |) with ( | 'connector' = 'COLLECTION' |) @@ -975,6 +977,116 @@ class TableEnvironmentTest { } } + @Test + def testDescribeTableOrView(): Unit = { + val sourceDDL = + """ + |CREATE TABLE T1( + | f0 char(10), + | f1 varchar(10), + | f2 string, + | f3 BOOLEAN, + | f4 BINARY(10), + | f5 VARBINARY(10), + | f6 BYTES, + | f7 DECIMAL(10, 3), + | f8 TINYINT, + | f9 SMALLINT, + | f10 INTEGER, + | f11 BIGINT, + | f12 FLOAT, + | f13 DOUBLE, + | f14 DATE, + | f15 TIME, + | f16 TIMESTAMP, + | f17 TIMESTAMP(3), + | f18 TIMESTAMP WITHOUT TIME ZONE, + | f19 TIMESTAMP(3) WITH LOCAL TIME ZONE, + | f20 TIMESTAMP WITH LOCAL TIME ZONE, + | f21 ARRAY<INT>, + | f22 MAP<INT, STRING>, + | f23 ROW<f0 INT, f1 STRING>, + | f24 int not null, + | f25 varchar not null, + | f26 row<f0 int not null, f1 int> not null, + | ts AS to_timestamp(f25), + | PRIMARY KEY(f24, f26) NOT ENFORCED, + | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND + |) with ( + | 'connector' = 'COLLECTION' + |) + """.stripMargin + + val viewDDL = + """ + |CREATE VIEW IF NOT EXISTS T2(d, e, f) AS SELECT f24, f25, f26 FROM T1 + """.stripMargin + tableEnv.executeSql(sourceDDL) + tableEnv.executeSql(viewDDL) + + val tableResult1 = tableEnv.executeSql("describe T1") + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult1.getResultKind) + checkData( + util.Arrays.asList( + Row.of("f0", "CHAR(10)", Boolean.box(true), null, null, null), + Row.of("f1", "VARCHAR(10)", Boolean.box(true), null, null, null), + Row.of("f2", "STRING", Boolean.box(true), null, null, null), + Row.of("f3", "BOOLEAN", Boolean.box(true), null, null, null), + Row.of("f4", "BINARY(10)", Boolean.box(true), null, null, null), + Row.of("f5", "VARBINARY(10)", Boolean.box(true), null, null, null), + Row.of("f6", "BYTES", Boolean.box(true), null, null, null), + Row.of("f7", "DECIMAL(10, 3)", Boolean.box(true), null, null, null), + Row.of("f8", "TINYINT", Boolean.box(true), null, null, null), + Row.of("f9", "SMALLINT", Boolean.box(true), null, null, null), + Row.of("f10", "INT", Boolean.box(true), null, null, null), + Row.of("f11", "BIGINT", Boolean.box(true), null, null, null), + Row.of("f12", "FLOAT", Boolean.box(true), null, null, null), + Row.of("f13", "DOUBLE", Boolean.box(true), null, null, null), + Row.of("f14", "DATE", Boolean.box(true), null, null, null), + Row.of("f15", "TIME(0)", Boolean.box(true), null, null, null), + Row.of("f16", "TIMESTAMP(6)", Boolean.box(true), null, null, null), + Row.of("f17", "TIMESTAMP(3)", Boolean.box(true), null, null, null), + Row.of("f18", "TIMESTAMP(6)", Boolean.box(true), null, null, null), + Row.of("f19", "TIMESTAMP(3) WITH LOCAL TIME ZONE", Boolean.box(true), null, null, null), + Row.of("f20", "TIMESTAMP(6) WITH LOCAL TIME ZONE", Boolean.box(true), null, null, null), + Row.of("f21", "ARRAY<INT>", Boolean.box(true), null, null, null), + Row.of("f22", "MAP<INT, STRING>", Boolean.box(true), null, null, null), + Row.of("f23", "ROW<`f0` INT, `f1` STRING>", Boolean.box(true), null, null, null), + Row.of("f24", "INT", Boolean.box(false), "PRI(f24, f26)", null, null), + Row.of("f25", "STRING", Boolean.box(false), null, null, null), + Row.of("f26", "ROW<`f0` INT NOT NULL, `f1` INT>", Boolean.box(false), + "PRI(f24, f26)", null, null), + Row.of("ts", "TIMESTAMP(3)", Boolean.box(true), null, "TO_TIMESTAMP(`f25`)", + "`ts` - INTERVAL '1' SECOND") + ).iterator(), + tableResult1.collect()) + + val tableResult2 = tableEnv.executeSql("describe T2") + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind) + checkData( + util.Arrays.asList( + Row.of("d", "INT", Boolean.box(false), null, null, null), + Row.of("e", "STRING", Boolean.box(false), null, null, null), + Row.of("f", "ROW<`f0` INT NOT NULL, `f1` INT>", Boolean.box(false), null, null, null) + ).iterator(), + tableResult2.collect()) + + // temporary view T2(x, y) masks permanent view T2(d, e, f) + val temporaryViewDDL = + """ + |CREATE TEMPORARY VIEW IF NOT EXISTS T2(x, y) AS SELECT f24, f25 FROM T1 + """.stripMargin + tableEnv.executeSql(temporaryViewDDL) + + val tableResult3 = tableEnv.executeSql("describe T2") + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind) + checkData( + util.Arrays.asList( + Row.of("x", "INT", Boolean.box(false), null, null, null), + Row.of("y", "STRING", Boolean.box(false), null, null, null)).iterator(), + tableResult3.collect()) + } + private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = { while (expected.hasNext && actual.hasNext) { assertEquals(expected.next(), actual.next()) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java index a465f56..a0239c4 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java @@ -36,6 +36,7 @@ import org.apache.flink.sql.parser.ddl.SqlTableOption; import org.apache.flink.sql.parser.ddl.SqlUseCatalog; import org.apache.flink.sql.parser.ddl.SqlUseDatabase; import org.apache.flink.sql.parser.dml.RichSqlInsert; +import org.apache.flink.sql.parser.dql.SqlRichDescribeTable; import org.apache.flink.sql.parser.dql.SqlShowCatalogs; import org.apache.flink.sql.parser.dql.SqlShowDatabases; import org.apache.flink.sql.parser.dql.SqlShowFunctions; @@ -61,6 +62,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.operations.CatalogSinkModifyOperation; +import org.apache.flink.table.operations.DescribeTableOperation; import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.PlannerQueryOperation; @@ -193,6 +195,8 @@ public class SqlToOperationConverter { return Optional.of(converter.convertShowViews((SqlShowViews) validated)); } else if (validated instanceof SqlExplain) { return Optional.of(converter.convertExplain((SqlExplain) validated)); + } else if (validated instanceof SqlRichDescribeTable) { + return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated)); } else if (validated.getKind().belongsTo(SqlKind.QUERY)) { return Optional.of(converter.convertSqlQuery(validated)); } else { @@ -575,6 +579,14 @@ public class SqlToOperationConverter { return new ExplainOperation(operation); } + /** Convert DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier. */ + private Operation convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable) { + UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlRichDescribeTable.fullTableName()); + ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); + + return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended()); + } + /** * Create a table schema from {@link SqlCreateTable}. This schema contains computed column * fields, say, we have a create table DDL statement: diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index ab89dda..46fe90e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -49,6 +49,8 @@ import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema import org.apache.calcite.sql.parser.SqlParser import org.apache.calcite.tools.FrameworkConfig +import org.apache.commons.lang3.StringUtils + import _root_.java.lang.{Iterable => JIterable, Long => JLong} import _root_.java.util.function.{Function => JFunction, Supplier => JSupplier} import _root_.java.util.{Optional, Collections => JCollections, HashMap => JHashMap, List => JList, Map => JMap} @@ -140,7 +142,7 @@ abstract class TableEnvImpl( "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, USE CATALOG, USE [CATALOG.]DATABASE, " + "SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW, " + - "SHOW VIEWS, INSERT." + "SHOW VIEWS, INSERT, DESCRIBE." private def isStreamingMode: Boolean = this match { case _: BatchTableEnvImpl => false @@ -626,7 +628,7 @@ abstract class TableEnvImpl( .resultKind(ResultKind.SUCCESS_WITH_CONTENT) .tableSchema(tableSchema) .data(tableSink.getResultIterator) - .setPrintStyle(PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH)) + .setPrintStyle(PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN)) .build } catch { case e: Exception => @@ -809,6 +811,15 @@ abstract class TableEnvImpl( .data(JCollections.singletonList(Row.of(explanation))) .setPrintStyle(PrintStyle.rawContent()) .build + case descOperation: DescribeTableOperation => + val result = catalogManager.getTable(descOperation.getSqlIdentifier) + if (result.isPresent) { + buildDescribeResult(result.get.getTable.getSchema) + } else { + throw new ValidationException(String.format( + "Table or view with identifier '%s' doesn't exist", + descOperation.getSqlIdentifier.asSummaryString())) + } case queryOperation: QueryOperation => executeInternal(queryOperation) @@ -818,10 +829,51 @@ abstract class TableEnvImpl( } private def buildShowResult(objects: Array[String]): TableResult = { + val rows = Array.ofDim[Object](objects.length, 1) + objects.zipWithIndex.foreach { + case (obj, i) => rows(i)(0) = obj + } + buildResult(Array("result"), Array(DataTypes.STRING), rows) + } + + private def buildDescribeResult(schema: TableSchema): TableResult = { + val fieldToWatermark = + schema + .getWatermarkSpecs + .map(w => (w.getRowtimeAttribute, w.getWatermarkExpr)).toMap + val fieldToPrimaryKey = new JHashMap[String, String]() + if (schema.getPrimaryKey.isPresent) { + val columns = schema.getPrimaryKey.get.getColumns.asScala + columns.foreach(c => fieldToPrimaryKey.put(c, s"PRI(${columns.mkString(", ")})")) + } + val data = Array.ofDim[Object](schema.getFieldCount, 6) + schema.getTableColumns.asScala.zipWithIndex.foreach { + case (c, i) => { + val logicalType = c.getType.getLogicalType + data(i)(0) = c.getName + data(i)(1) = StringUtils.removeEnd(logicalType.toString, " NOT NULL") + data(i)(2) = Boolean.box(logicalType.isNullable) + data(i)(3) = fieldToPrimaryKey.getOrDefault(c.getName, null) + data(i)(4) = c.getExpr.orElse(null) + data(i)(5) = fieldToWatermark.getOrDefault(c.getName, null) + } + } + buildResult( + Array("name", "type", "null", "key", "compute column", "watermark"), + Array(DataTypes.STRING, DataTypes.STRING, DataTypes.BOOLEAN, DataTypes.STRING, + DataTypes.STRING, DataTypes.STRING), + data) + } + + private def buildResult( + headers: Array[String], + types: Array[DataType], + rows: Array[Array[Object]]): TableResult = { TableResultImpl.builder() .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build()) - .data(objects.map(Row.of(_)).toList) + .tableSchema( + TableSchema.builder().fields(headers, types).build()) + .data(rows.map(Row.of(_:_*)).toList) .build() } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala index 3d7dae4..4ac4d09 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala @@ -19,9 +19,10 @@ package org.apache.flink.table.calcite import org.apache.flink.sql.parser.ExtendedSqlNode -import org.apache.flink.sql.parser.dql.{SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews} +import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews} import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.catalog.CatalogReader + import org.apache.calcite.plan.RelOptTable.ViewExpander import org.apache.calcite.plan._ import org.apache.calcite.rel.RelRoot @@ -31,6 +32,7 @@ import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator} import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable} import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter} import org.apache.calcite.tools.{FrameworkConfig, RelConversionException} + import _root_.java.lang.{Boolean => JBoolean} import _root_.java.util import _root_.java.util.function.{Function => JFunction} @@ -124,7 +126,8 @@ class FlinkPlannerImpl( || sqlNode.isInstanceOf[SqlShowDatabases] || sqlNode.isInstanceOf[SqlShowTables] || sqlNode.isInstanceOf[SqlShowFunctions] - || sqlNode.isInstanceOf[SqlShowViews]) { + || sqlNode.isInstanceOf[SqlShowViews] + || sqlNode.isInstanceOf[SqlRichDescribeTable]) { return sqlNode } sqlNode match { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala index dc6ad58..87e0a9f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala @@ -527,6 +527,34 @@ class BatchTableEnvironmentTest extends TableTestBase { } } + @Test + def testExecuteSqlWithDescribe(): Unit = { + val testUtil = batchTestUtil() + val createTableStmt = + """ + |CREATE TABLE tbl1 ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = testUtil.tableEnv.executeSql(createTableStmt) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val tableResult2 = testUtil.tableEnv.executeSql("DESCRIBE tbl1") + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind) + checkData( + java.util.Arrays.asList( + Row.of("a", "BIGINT", Boolean.box(true), null, null, null), + Row.of("b", "INT", Boolean.box(true), null, null, null), + Row.of("c", "STRING", Boolean.box(true), null, null, null) + ).iterator(), + tableResult2.collect()) + } + private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = { while (expected.hasNext && actual.hasNext) { assertEquals(expected.next(), actual.next())