This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 1f0c670  [FLINK-13568][sql-parser] Fix DDL CREATE TABLE statement doesn't allow STRING data type
1f0c670 is described below

commit 1f0c67087f19b10d1218af30282414f8b032f2a8
Author: yuzhao.cyz <yuzhao....@alibaba-inc.com>
AuthorDate: Tue Aug 6 20:35:15 2019 +0800

    [FLINK-13568][sql-parser] Fix DDL CREATE TABLE statement doesn't allow STRING data type

    This closes #9354
---
 .../apache/flink/sql/parser/ddl/SqlColumnType.java |  62 -----
 .../flink/sql/parser/ddl/SqlCreateTable.java       |  12 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  13 ++
 .../table/sqlexec/SqlToOperationConverterTest.java | 187 +++++++++++++++
 .../table/sqlexec/SqlToOperationConverterTest.java | 255 +++++++++++++++++++++
 5 files changed, 458 insertions(+), 71 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
deleted file mode 100644
index 3e494a72..0000000
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.sql.parser.ddl;
-
-/**
- * All supported data types in DDL. Used for Create Table DDL validation.
- */
-public enum SqlColumnType {
-	BOOLEAN,
-	TINYINT,
-	SMALLINT,
-	INT,
-	INTEGER,
-	BIGINT,
-	REAL,
-	FLOAT,
-	DOUBLE,
-	DECIMAL,
-	DATE,
-	TIME,
-	TIMESTAMP,
-	VARCHAR,
-	VARBINARY,
-	ANY,
-	ARRAY,
-	MAP,
-	ROW,
-	UNSUPPORTED;
-
-	/** Returns the column type with the string representation. **/
-	public static SqlColumnType getType(String type) {
-		if (type == null) {
-			return UNSUPPORTED;
-		}
-		try {
-			return SqlColumnType.valueOf(type.toUpperCase());
-		} catch (IllegalArgumentException var1) {
-			return UNSUPPORTED;
-		}
-	}
-
-	/** Returns true if this type is unsupported. **/
-	public boolean isUnsupported() {
-		return this.equals(UNSUPPORTED);
-	}
-}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 980aec4..183e2d8 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -135,12 +135,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 			if (column instanceof SqlTableColumn) {
 				SqlTableColumn tableColumn = (SqlTableColumn) column;
 				columnName = tableColumn.getName().getSimple();
-				String typeName = tableColumn.getType().getTypeName().getSimple();
-				if (SqlColumnType.getType(typeName).isUnsupported()) {
-					throw new SqlParseException(
-						column.getParserPosition(),
-						"Not support type [" + typeName + "], at " + column.getParserPosition());
-				}
 			} else if (column instanceof SqlBasicCall) {
 				SqlBasicCall tableColumn = (SqlBasicCall) column;
 				columnName = tableColumn.getOperands()[1].toString();
@@ -241,9 +235,9 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 
 	@Override
 	public void unparse(
-		SqlWriter writer,
-		int leftPrec,
-		int rightPrec) {
+			SqlWriter writer,
+			int leftPrec,
+			int rightPrec) {
 		writer.keyword("CREATE TABLE");
 		tableName.unparse(writer, leftPrec, rightPrec);
 		SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 97bb093..7a494b3 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -408,6 +408,19 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 	}
 
 	@Test
+	public void testCreateTableWithUserDefinedType() {
+		final String sql = "create table t(\n" +
+			"  a catalog1.db1.MyType1,\n" +
+			"  b db2.MyType2\n" +
+			") with (\n" +
+			"  'k1' = 'v1',\n" +
+			"  'k2' = 'v2'\n" +
+			")";
+		final String errMsg = "UDT in DDL is not supported yet.";
+		checkFails(sql, errMsg);
+	}
+
+	@Test
 	public void testInvalidComputedColumn() {
 		checkFails("CREATE TABLE sls_stream (\n" +
 			"	a bigint, \n" +
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 6fa3c31..1acdd88 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -48,9 +48,12 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -190,6 +193,156 @@ public class SqlToOperationConverterTest {
 		assertEquals(expectedStaticPartitions, sinkModifyOperation.getStaticPartitions());
 	}
 
+	@Test // TODO: tweak the tests when FLINK-13604 is fixed.
+	public void testCreateTableWithFullDataTypes() {
+		final List<TestItem> testItems = Arrays.asList(
+			createTestItem("CHAR", DataTypes.CHAR(1)),
+			createTestItem("CHAR NOT NULL", DataTypes.CHAR(1).notNull()),
+			createTestItem("CHAR NULL", DataTypes.CHAR(1)),
+			createTestItem("CHAR(33)", DataTypes.CHAR(33)),
+			createTestItem("VARCHAR", DataTypes.STRING()),
+			createTestItem("VARCHAR(33)", DataTypes.VARCHAR(33)),
+			createTestItem("STRING", DataTypes.STRING()),
+			createTestItem("BOOLEAN", DataTypes.BOOLEAN()),
+			createTestItem("BINARY", DataTypes.BINARY(1)),
+			createTestItem("BINARY(33)", DataTypes.BINARY(33)),
+			createTestItem("VARBINARY", DataTypes.BYTES()),
+			createTestItem("VARBINARY(33)", DataTypes.VARBINARY(33)),
+			createTestItem("BYTES", DataTypes.BYTES()),
+			createTestItem("DECIMAL", DataTypes.DECIMAL(10, 0)),
+			createTestItem("DEC", DataTypes.DECIMAL(10, 0)),
+			createTestItem("NUMERIC", DataTypes.DECIMAL(10, 0)),
+			createTestItem("DECIMAL(10)", DataTypes.DECIMAL(10, 0)),
+			createTestItem("DEC(10)", DataTypes.DECIMAL(10, 0)),
+			createTestItem("NUMERIC(10)", DataTypes.DECIMAL(10, 0)),
+			createTestItem("DECIMAL(10, 3)", DataTypes.DECIMAL(10, 3)),
+			createTestItem("DEC(10, 3)", DataTypes.DECIMAL(10, 3)),
+			createTestItem("NUMERIC(10, 3)", DataTypes.DECIMAL(10, 3)),
+			createTestItem("TINYINT", DataTypes.TINYINT()),
+			createTestItem("SMALLINT", DataTypes.SMALLINT()),
+			createTestItem("INTEGER", DataTypes.INT()),
+			createTestItem("INT", DataTypes.INT()),
+			createTestItem("BIGINT", DataTypes.BIGINT()),
+			createTestItem("FLOAT", DataTypes.FLOAT()),
+			createTestItem("DOUBLE", DataTypes.DOUBLE()),
+			createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()),
+			createTestItem("DATE", DataTypes.DATE()),
+			createTestItem("TIME", DataTypes.TIME()),
+			createTestItem("TIME WITHOUT TIME ZONE", DataTypes.TIME()),
+			// Expect to be TIME(3).
+			createTestItem("TIME(3)", DataTypes.TIME()),
+			// Expect to be TIME(3).
+			createTestItem("TIME(3) WITHOUT TIME ZONE", DataTypes.TIME()),
+			createTestItem("TIMESTAMP", DataTypes.TIMESTAMP(3)),
+			createTestItem("TIMESTAMP WITHOUT TIME ZONE", DataTypes.TIMESTAMP(3)),
+			createTestItem("TIMESTAMP(3)", DataTypes.TIMESTAMP(3)),
+			createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE", DataTypes.TIMESTAMP(3)),
+			createTestItem("TIMESTAMP WITH LOCAL TIME ZONE",
+				DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+			createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE",
+				DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+			createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>",
+				DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))),
+			createTestItem("ARRAY<INT NOT NULL>",
+				DataTypes.ARRAY(DataTypes.INT().notNull())),
+			createTestItem("INT ARRAY", DataTypes.ARRAY(DataTypes.INT())),
+			createTestItem("INT NOT NULL ARRAY",
+				DataTypes.ARRAY(DataTypes.INT().notNull())),
+			createTestItem("INT ARRAY NOT NULL",
+				DataTypes.ARRAY(DataTypes.INT()).notNull()),
+			createTestItem("MULTISET<INT NOT NULL>",
+				DataTypes.MULTISET(DataTypes.INT().notNull())),
+			createTestItem("INT MULTISET",
+				DataTypes.MULTISET(DataTypes.INT())),
+			createTestItem("INT NOT NULL MULTISET",
+				DataTypes.MULTISET(DataTypes.INT().notNull())),
+			createTestItem("INT MULTISET NOT NULL",
+				DataTypes.MULTISET(DataTypes.INT()).notNull()),
+			createTestItem("MAP<BIGINT, BOOLEAN>",
+				DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BOOLEAN())),
+			// Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+			createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>",
+				DataTypes.ROW(
+					DataTypes.FIELD("f0", DataTypes.INT()),
+					DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+			// Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+			createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)",
+				DataTypes.ROW(
+					DataTypes.FIELD("f0", DataTypes.INT()),
+					DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+			createTestItem("ROW<`f0` INT>",
+				DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
+			createTestItem("ROW(`f0` INT)",
+				DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
+			createTestItem("ROW<>", DataTypes.ROW()),
+			createTestItem("ROW()", DataTypes.ROW()),
+			// Expect to be ROW<`f0` INT NOT NULL '...', `f1` BOOLEAN '...'>.
+			createTestItem("ROW<f0 INT NOT NULL 'This is a comment.'," +
+				" f1 BOOLEAN 'This as well.'>",
+				DataTypes.ROW(
+					DataTypes.FIELD("f0", DataTypes.INT()),
+					DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+			createTestItem("ARRAY<ROW<f0 INT, f1 BOOLEAN>>",
+				DataTypes.ARRAY(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("ROW<f0 INT, f1 BOOLEAN> MULTISET",
+				DataTypes.MULTISET(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("MULTISET<ROW<f0 INT, f1 BOOLEAN>>",
+				DataTypes.MULTISET(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("ROW<f0 Row<f00 INT, f01 BOOLEAN>, " +
+				"f1 INT ARRAY, " +
+				"f2 BOOLEAN MULTISET>",
+				DataTypes.ROW(DataTypes.FIELD("f0",
+					DataTypes.ROW(
+						DataTypes.FIELD("f00", DataTypes.INT()),
+						DataTypes.FIELD("f01", DataTypes.BOOLEAN()))),
+					DataTypes.FIELD("f1", DataTypes.ARRAY(DataTypes.INT())),
+					DataTypes.FIELD("f2", DataTypes.MULTISET(DataTypes.BOOLEAN()))))
+		);
+		StringBuilder buffer = new StringBuilder("create table t1(\n");
+		for (int i = 0; i < testItems.size(); i++) {
+			buffer.append("f")
+				.append(i)
+				.append(" ")
+				.append(testItems.get(i).testExpr);
+			if (i == testItems.size() - 1) {
+				buffer.append(")");
+			} else {
+				buffer.append(",\n");
+			}
+		}
+		final String sql = buffer.toString();
+		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		SqlNode node = planner.parse(sql);
+		assert node instanceof SqlCreateTable;
+		Operation operation = SqlToOperationConverter.convert(planner, node);
+		TableSchema schema = ((CreateTableOperation) operation).getCatalogTable().getSchema();
+		Object[] expectedDataTypes = testItems.stream().map(item -> item.expectedType).toArray();
+		assertArrayEquals(expectedDataTypes, schema.getFieldDataTypes());
+	}
+
+	//~ Tool Methods ----------------------------------------------------------
+
+	private static TestItem createTestItem(Object... args) {
+		assert args.length == 2;
+		final String testExpr = (String) args[0];
+		TestItem testItem = TestItem.fromTestExpr(testExpr);
+		if (args[1] instanceof String) {
+			testItem.withExpectedError((String) args[1]);
+		} else {
+			testItem.withExpectedType(args[1]);
+		}
+		return testItem;
+	}
+
 	private Operation parse(String sql, FlinkPlannerImpl planner) {
 		SqlNode node = planner.parse(sql);
 		return SqlToOperationConverter.convert(planner, node);
@@ -200,4 +353,38 @@ public class SqlToOperationConverterTest {
 		return plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(),
 			catalogManager.getCurrentDatabase());
 	}
+
+	//~ Inner Classes ----------------------------------------------------------
+
+	private static class TestItem {
+		private final String testExpr;
+		@Nullable
+		private Object expectedType;
+		@Nullable
+		private String expectedError;
+
+		private TestItem(String testExpr) {
+			this.testExpr = testExpr;
+		}
+
+		static TestItem fromTestExpr(String testExpr) {
+			return new TestItem(testExpr);
+		}
+
+		TestItem withExpectedType(Object expectedType) {
+			this.expectedType = expectedType;
+			return this;
+		}
+
+		TestItem withExpectedError(String expectedError) {
+			this.expectedError = expectedError;
+			return this;
+		}
+
+		@Override
+		public String toString() {
+			return this.testExpr;
+		}
+	}
+
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 7225cbc..0424109 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -23,7 +23,9 @@ import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -43,14 +45,20 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.planner.PlanningConfigurationBuilder;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
 
 import org.apache.calcite.sql.SqlNode;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -74,6 +82,9 @@ public class SqlToOperationConverterTest {
 			new ExpressionBridge<>(functionCatalog,
 				PlannerExpressionConverter.INSTANCE()));
 
+	@Rule
+	public ExpectedException expectedEx = ExpectedException.none();
+
 	@Before
 	public void before() throws TableAlreadyExistException, DatabaseNotExistException {
 		final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
@@ -195,9 +206,253 @@ public class SqlToOperationConverterTest {
 		assertEquals(expectedStaticPartitions,
 			sinkModifyOperation.getStaticPartitions());
 	}
 
+	@Test // TODO: tweak the tests when FLINK-13604 is fixed.
+	public void testCreateTableWithFullDataTypes() {
+		final List<TestItem> testItems = Arrays.asList(
+			// Expect to be DataTypes.CHAR(1).
+			createTestItem("CHAR", DataTypes.STRING()),
+			// Expect to be DataTypes.CHAR(1).notNull().
+			createTestItem("CHAR NOT NULL", DataTypes.STRING()),
+			// Expect to be DataTypes.CHAR(1).
+			createTestItem("CHAR NULL", DataTypes.STRING()),
+			// Expect to be DataTypes.CHAR(33).
+			createTestItem("CHAR(33)", DataTypes.STRING()),
+			createTestItem("VARCHAR", DataTypes.STRING()),
+			// Expect to be DataTypes.VARCHAR(33).
+			createTestItem("VARCHAR(33)", DataTypes.STRING()),
+			createTestItem("STRING", DataTypes.STRING()),
+			createTestItem("BOOLEAN", DataTypes.BOOLEAN()),
+			// Expect to be DECIMAL(10, 0).
+			createTestItem("DECIMAL",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 0).
+			createTestItem("DEC",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 0).
+			createTestItem("NUMERIC",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 0).
+			createTestItem("DECIMAL(10)",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 0).
+			createTestItem("DEC(10)",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 0).
+			createTestItem("NUMERIC(10)",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 3).
+			createTestItem("DECIMAL(10, 3)",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 3).
+			createTestItem("DEC(10, 3)",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 3).
+			createTestItem("NUMERIC(10, 3)",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			createTestItem("TINYINT", DataTypes.TINYINT()),
+			createTestItem("SMALLINT", DataTypes.SMALLINT()),
+			createTestItem("INTEGER", DataTypes.INT()),
+			createTestItem("INT", DataTypes.INT()),
+			createTestItem("BIGINT", DataTypes.BIGINT()),
+			createTestItem("FLOAT", DataTypes.FLOAT()),
+			createTestItem("DOUBLE", DataTypes.DOUBLE()),
+			createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()),
+			createTestItem("DATE",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_DATE())),
+			createTestItem("TIME",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+			createTestItem("TIME WITHOUT TIME ZONE",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+			// Expect to be Time(3).
+			createTestItem("TIME(3)",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+			// Expect to be Time(3).
+			createTestItem("TIME(3) WITHOUT TIME ZONE",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+			createTestItem("TIMESTAMP",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+			createTestItem("TIMESTAMP WITHOUT TIME ZONE",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+			// Expect to be timestamp(3).
+			createTestItem("TIMESTAMP(3)",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+			// Expect to be timestamp(3).
+			createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+			// Expect to be ARRAY<INT NOT NULL>.
+			createTestItem("ARRAY<INT NOT NULL>",
+				DataTypes.ARRAY(DataTypes.INT())),
+			createTestItem("INT ARRAY", DataTypes.ARRAY(DataTypes.INT())),
+			// Expect to be ARRAY<INT NOT NULL>.
+			createTestItem("INT NOT NULL ARRAY",
+				DataTypes.ARRAY(DataTypes.INT())),
+			// Expect to be ARRAY<INT> NOT NULL.
+			createTestItem("INT ARRAY NOT NULL",
+				DataTypes.ARRAY(DataTypes.INT())),
+			// Expect to be MULTISET<INT NOT NULL>.
+			createTestItem("MULTISET<INT NOT NULL>",
+				DataTypes.MULTISET(DataTypes.INT())),
+			createTestItem("INT MULTISET", DataTypes.MULTISET(DataTypes.INT())),
+			// Expect to be MULTISET<INT NOT NULL>.
+			createTestItem("INT NOT NULL MULTISET",
+				DataTypes.MULTISET(DataTypes.INT())),
+			// Expect to be MULTISET<INT> NOT NULL.
+			createTestItem("INT MULTISET NOT NULL",
+				DataTypes.MULTISET(DataTypes.INT())),
+			createTestItem("MAP<BIGINT, BOOLEAN>",
+				DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BOOLEAN())),
+			// Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+			createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>",
+				DataTypes.ROW(
+					DataTypes.FIELD("f0", DataTypes.INT()),
+					DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+			// Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+			createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)",
+				DataTypes.ROW(
+					DataTypes.FIELD("f0", DataTypes.INT()),
+					DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+			createTestItem("ROW<`f0` INT>",
+				DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
+			createTestItem("ROW(`f0` INT)",
+				DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
+			createTestItem("ROW<>", DataTypes.ROW()),
+			createTestItem("ROW()", DataTypes.ROW()),
+			// Expect to be ROW<`f0` INT NOT NULL '...', `f1` BOOLEAN '...'>.
+			createTestItem("ROW<f0 INT NOT NULL 'This is a comment.', " +
+				"f1 BOOLEAN 'This as well.'>",
+				DataTypes.ROW(
+					DataTypes.FIELD("f0", DataTypes.INT()),
+					DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+			createTestItem("ROW<f0 INT, f1 BOOLEAN> ARRAY",
+				DataTypes.ARRAY(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("ARRAY<ROW<f0 INT, f1 BOOLEAN>>",
+				DataTypes.ARRAY(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("ROW<f0 INT, f1 BOOLEAN> MULTISET",
+				DataTypes.MULTISET(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("MULTISET<ROW<f0 INT, f1 BOOLEAN>>",
+				DataTypes.MULTISET(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("ROW<f0 Row<f00 INT, f01 BOOLEAN>, " +
+				"f1 INT ARRAY, " +
+				"f2 BOOLEAN MULTISET>",
+				DataTypes.ROW(DataTypes.FIELD("f0",
+					DataTypes.ROW(
+						DataTypes.FIELD("f00", DataTypes.INT()),
+						DataTypes.FIELD("f01", DataTypes.BOOLEAN()))),
+					DataTypes.FIELD("f1", DataTypes.ARRAY(DataTypes.INT())),
+					DataTypes.FIELD("f2", DataTypes.MULTISET(DataTypes.BOOLEAN()))))
+		);
+		StringBuilder buffer = new StringBuilder("create table t1(\n");
+		for (int i = 0; i < testItems.size(); i++) {
+			buffer.append("f")
+				.append(i)
+				.append(" ")
+				.append(testItems.get(i).testExpr);
+			if (i == testItems.size() - 1) {
+				buffer.append(")");
+			} else {
+				buffer.append(",\n");
+			}
+		}
+		final String sql = buffer.toString();
+		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		SqlNode node = planner.parse(sql);
+		assert node instanceof SqlCreateTable;
+		Operation operation = SqlToOperationConverter.convert(planner, node);
+		TableSchema schema = ((CreateTableOperation) operation).getCatalogTable().getSchema();
+		Object[] expectedDataTypes = testItems.stream().map(item -> item.expectedType).toArray();
+		assertArrayEquals(expectedDataTypes, schema.getFieldDataTypes());
+	}
+
+	@Test
+	public void testCreateTableWithUnSupportedDataTypes() {
+		final List<TestItem> testItems = Arrays.asList(
+			createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>",
+				"Type is not supported: TIMESTAMP_WITH_LOCAL_TIME_ZONE"),
+			createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE",
+				"Type is not supported: TIMESTAMP_WITH_LOCAL_TIME_ZONE"),
+			createTestItem("TIMESTAMP WITH LOCAL TIME ZONE",
+				"Type is not supported: TIMESTAMP_WITH_LOCAL_TIME_ZONE"),
+			createTestItem("BYTES", "Type is not supported: VARBINARY"),
+			createTestItem("VARBINARY(33)", "Type is not supported: VARBINARY"),
+			createTestItem("VARBINARY", "Type is not supported: VARBINARY"),
+			createTestItem("BINARY(33)", "Type is not supported: BINARY"),
+			createTestItem("BINARY", "Type is not supported: BINARY")
+		);
+		final String sqlTemplate = "create table t1(\n" +
+			"  f0 %s)";
+		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		for (TestItem item : testItems) {
+			String sql = String.format(sqlTemplate, item.testExpr);
+			SqlNode node = planner.parse(sql);
+			assert node instanceof SqlCreateTable;
+			expectedEx.expect(TableException.class);
+			expectedEx.expectMessage(item.expectedError);
+			SqlToOperationConverter.convert(planner, node);
+		}
+	}
+
+	//~ Tool Methods ----------------------------------------------------------
+
+	private static TestItem createTestItem(Object... args) {
+		assert args.length == 2;
+		final String testExpr = (String) args[0];
+		TestItem testItem = TestItem.fromTestExpr(testExpr);
+		if (args[1] instanceof String) {
+			testItem.withExpectedError((String) args[1]);
+		} else {
+			testItem.withExpectedType(args[1]);
+		}
+		return testItem;
+	}
+
 	private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
 		tableConfig.setSqlDialect(sqlDialect);
 		return planningConfigurationBuilder.createFlinkPlanner(catalogManager.getCurrentCatalog(),
 			catalogManager.getCurrentDatabase());
 	}
+
+	//~ Inner Classes ----------------------------------------------------------
+
+	private static class TestItem {
+		private final String testExpr;
+		@Nullable
+		private Object expectedType;
+		@Nullable
+		private String expectedError;
+
+		private TestItem(String testExpr) {
+			this.testExpr = testExpr;
+		}
+
+		static TestItem fromTestExpr(String testExpr) {
+			return new TestItem(testExpr);
+		}
+
+		TestItem withExpectedType(Object expectedType) {
+			this.expectedType = expectedType;
+			return this;
+		}
+
+		TestItem withExpectedError(String expectedError) {
+			this.expectedError = expectedError;
+			return this;
+		}
+
+		@Override
+		public String toString() {
+			return this.testExpr;
+		}
+	}
+
 }
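
For illustration of what this change enables: before this commit, SqlCreateTable checked each column type against the SqlColumnType enum deleted above, which listed VARCHAR but not STRING (nor BYTES, MULTISET, or other type syntax), so such columns were rejected at DDL validation time with "Not support type [...]". With the whitelist removed, type validation is deferred to the type system, and DDL like the following parses. This is a minimal sketch; the table name and connector properties are invented for the example:

    CREATE TABLE log_events (
      id BIGINT,
      message STRING,
      ts TIMESTAMP(3)
    ) WITH (
      'connector.type' = 'filesystem',
      'format.type' = 'csv'
    )

User-defined types remain rejected on purpose: as the new testCreateTableWithUserDefinedType shows, a column typed catalog1.db1.MyType1 still fails with "UDT in DDL is not supported yet."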