This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 1f0c670  [FLINK-13568][sql-parser] Fix DDL CREATE TABLE statement 
doesn't allow STRING data type
1f0c670 is described below

commit 1f0c67087f19b10d1218af30282414f8b032f2a8
Author: yuzhao.cyz <yuzhao....@alibaba-inc.com>
AuthorDate: Tue Aug 6 20:35:15 2019 +0800

    [FLINK-13568][sql-parser] Fix DDL CREATE TABLE statement doesn't allow 
STRING data type
    
    This closes #9354
---
 .../apache/flink/sql/parser/ddl/SqlColumnType.java |  62 -----
 .../flink/sql/parser/ddl/SqlCreateTable.java       |  12 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  13 ++
 .../table/sqlexec/SqlToOperationConverterTest.java | 187 +++++++++++++++
 .../table/sqlexec/SqlToOperationConverterTest.java | 255 +++++++++++++++++++++
 5 files changed, 458 insertions(+), 71 deletions(-)

diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
deleted file mode 100644
index 3e494a72..0000000
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.sql.parser.ddl;
-
-/**
- * All supported data types in DDL. Used for Create Table DDL validation.
- */
-public enum SqlColumnType {
-       BOOLEAN,
-       TINYINT,
-       SMALLINT,
-       INT,
-       INTEGER,
-       BIGINT,
-       REAL,
-       FLOAT,
-       DOUBLE,
-       DECIMAL,
-       DATE,
-       TIME,
-       TIMESTAMP,
-       VARCHAR,
-       VARBINARY,
-       ANY,
-       ARRAY,
-       MAP,
-       ROW,
-       UNSUPPORTED;
-
-       /** Returns the column type with the string representation. **/
-       public static SqlColumnType getType(String type) {
-               if (type == null) {
-                       return UNSUPPORTED;
-               }
-               try {
-                       return SqlColumnType.valueOf(type.toUpperCase());
-               } catch (IllegalArgumentException var1) {
-                       return UNSUPPORTED;
-               }
-       }
-
-       /** Returns true if this type is unsupported. **/
-       public boolean isUnsupported() {
-               return this.equals(UNSUPPORTED);
-       }
-}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 980aec4..183e2d8 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -135,12 +135,6 @@ public class SqlCreateTable extends SqlCreate implements 
ExtendedSqlNode {
                                if (column instanceof SqlTableColumn) {
                                        SqlTableColumn tableColumn = 
(SqlTableColumn) column;
                                        columnName = 
tableColumn.getName().getSimple();
-                                       String typeName = 
tableColumn.getType().getTypeName().getSimple();
-                                       if 
(SqlColumnType.getType(typeName).isUnsupported()) {
-                                               throw new SqlParseException(
-                                                       
column.getParserPosition(),
-                                                       "Not support type [" + 
typeName + "], at " + column.getParserPosition());
-                                       }
                                } else if (column instanceof SqlBasicCall) {
                                        SqlBasicCall tableColumn = 
(SqlBasicCall) column;
                                        columnName = 
tableColumn.getOperands()[1].toString();
@@ -241,9 +235,9 @@ public class SqlCreateTable extends SqlCreate implements 
ExtendedSqlNode {
 
        @Override
        public void unparse(
-               SqlWriter writer,
-               int leftPrec,
-               int rightPrec) {
+                       SqlWriter writer,
+                       int leftPrec,
+                       int rightPrec) {
                writer.keyword("CREATE TABLE");
                tableName.unparse(writer, leftPrec, rightPrec);
                SqlWriter.Frame frame = 
writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 97bb093..7a494b3 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -408,6 +408,19 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
        }
 
        @Test
+       public void testCreateTableWithUserDefinedType() {
+               final String sql = "create table t(\n" +
+                       "  a catalog1.db1.MyType1,\n" +
+                       "  b db2.MyType2\n" +
+                       ") with (\n" +
+                       "  'k1' = 'v1',\n" +
+                       "  'k2' = 'v2'\n" +
+                       ")";
+               final String errMsg = "UDT in DDL is not supported yet.";
+               checkFails(sql, errMsg);
+       }
+
+       @Test
        public void testInvalidComputedColumn() {
                checkFails("CREATE TABLE sls_stream (\n" +
                        "  a bigint, \n" +
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 6fa3c31..1acdd88 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -48,9 +48,12 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -190,6 +193,156 @@ public class SqlToOperationConverterTest {
                assertEquals(expectedStaticPartitions, 
sinkModifyOperation.getStaticPartitions());
        }
 
+       @Test // TODO: tweak the tests when FLINK-13604 is fixed.
+       public void testCreateTableWithFullDataTypes() {
+               final List<TestItem> testItems = Arrays.asList(
+                       createTestItem("CHAR", DataTypes.CHAR(1)),
+                       createTestItem("CHAR NOT NULL", 
DataTypes.CHAR(1).notNull()),
+                       createTestItem("CHAR NULL", DataTypes.CHAR(1)),
+                       createTestItem("CHAR(33)", DataTypes.CHAR(33)),
+                       createTestItem("VARCHAR", DataTypes.STRING()),
+                       createTestItem("VARCHAR(33)", DataTypes.VARCHAR(33)),
+                       createTestItem("STRING", DataTypes.STRING()),
+                       createTestItem("BOOLEAN", DataTypes.BOOLEAN()),
+                       createTestItem("BINARY", DataTypes.BINARY(1)),
+                       createTestItem("BINARY(33)", DataTypes.BINARY(33)),
+                       createTestItem("VARBINARY", DataTypes.BYTES()),
+                       createTestItem("VARBINARY(33)", 
DataTypes.VARBINARY(33)),
+                       createTestItem("BYTES", DataTypes.BYTES()),
+                       createTestItem("DECIMAL", DataTypes.DECIMAL(10, 0)),
+                       createTestItem("DEC", DataTypes.DECIMAL(10, 0)),
+                       createTestItem("NUMERIC", DataTypes.DECIMAL(10, 0)),
+                       createTestItem("DECIMAL(10)", DataTypes.DECIMAL(10, 0)),
+                       createTestItem("DEC(10)", DataTypes.DECIMAL(10, 0)),
+                       createTestItem("NUMERIC(10)", DataTypes.DECIMAL(10, 0)),
+                       createTestItem("DECIMAL(10, 3)", DataTypes.DECIMAL(10, 
3)),
+                       createTestItem("DEC(10, 3)", DataTypes.DECIMAL(10, 3)),
+                       createTestItem("NUMERIC(10, 3)", DataTypes.DECIMAL(10, 
3)),
+                       createTestItem("TINYINT", DataTypes.TINYINT()),
+                       createTestItem("SMALLINT", DataTypes.SMALLINT()),
+                       createTestItem("INTEGER", DataTypes.INT()),
+                       createTestItem("INT", DataTypes.INT()),
+                       createTestItem("BIGINT", DataTypes.BIGINT()),
+                       createTestItem("FLOAT", DataTypes.FLOAT()),
+                       createTestItem("DOUBLE", DataTypes.DOUBLE()),
+                       createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()),
+                       createTestItem("DATE", DataTypes.DATE()),
+                       createTestItem("TIME", DataTypes.TIME()),
+                       createTestItem("TIME WITHOUT TIME ZONE", 
DataTypes.TIME()),
+                       // Expect to be TIME(3).
+                       createTestItem("TIME(3)", DataTypes.TIME()),
+                       // Expect to be TIME(3).
+                       createTestItem("TIME(3) WITHOUT TIME ZONE", 
DataTypes.TIME()),
+                       createTestItem("TIMESTAMP", DataTypes.TIMESTAMP(3)),
+                       createTestItem("TIMESTAMP WITHOUT TIME ZONE", 
DataTypes.TIMESTAMP(3)),
+                       createTestItem("TIMESTAMP(3)", DataTypes.TIMESTAMP(3)),
+                       createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE", 
DataTypes.TIMESTAMP(3)),
+                       createTestItem("TIMESTAMP WITH LOCAL TIME ZONE",
+                               DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+                       createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE",
+                               DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+                       createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME 
ZONE>",
+                               
DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))),
+                       createTestItem("ARRAY<INT NOT NULL>",
+                               DataTypes.ARRAY(DataTypes.INT().notNull())),
+                       createTestItem("INT ARRAY", 
DataTypes.ARRAY(DataTypes.INT())),
+                       createTestItem("INT NOT NULL ARRAY",
+                               DataTypes.ARRAY(DataTypes.INT().notNull())),
+                       createTestItem("INT ARRAY NOT NULL",
+                               DataTypes.ARRAY(DataTypes.INT()).notNull()),
+                       createTestItem("MULTISET<INT NOT NULL>",
+                               DataTypes.MULTISET(DataTypes.INT().notNull())),
+                       createTestItem("INT MULTISET",
+                               DataTypes.MULTISET(DataTypes.INT())),
+                       createTestItem("INT NOT NULL MULTISET",
+                               DataTypes.MULTISET(DataTypes.INT().notNull())),
+                       createTestItem("INT MULTISET NOT NULL",
+                               DataTypes.MULTISET(DataTypes.INT()).notNull()),
+                       createTestItem("MAP<BIGINT, BOOLEAN>",
+                               DataTypes.MAP(DataTypes.BIGINT(), 
DataTypes.BOOLEAN())),
+                       // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+                       createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>",
+                               DataTypes.ROW(
+                                       DataTypes.FIELD("f0", DataTypes.INT()),
+                                       DataTypes.FIELD("f1", 
DataTypes.BOOLEAN()))),
+                       // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+                       createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)",
+                               DataTypes.ROW(
+                                       DataTypes.FIELD("f0", DataTypes.INT()),
+                                       DataTypes.FIELD("f1", 
DataTypes.BOOLEAN()))),
+                       createTestItem("ROW<`f0` INT>",
+                               DataTypes.ROW(DataTypes.FIELD("f0", 
DataTypes.INT()))),
+                       createTestItem("ROW(`f0` INT)",
+                               DataTypes.ROW(DataTypes.FIELD("f0", 
DataTypes.INT()))),
+                       createTestItem("ROW<>", DataTypes.ROW()),
+                       createTestItem("ROW()", DataTypes.ROW()),
+                       // Expect to be ROW<`f0` INT NOT NULL '...', `f1` 
BOOLEAN '...'>.
+                       createTestItem("ROW<f0 INT NOT NULL 'This is a 
comment.',"
+                               + " f1 BOOLEAN 'This as well.'>",
+                               DataTypes.ROW(
+                                       DataTypes.FIELD("f0", DataTypes.INT()),
+                                       DataTypes.FIELD("f1", 
DataTypes.BOOLEAN()))),
+                       createTestItem("ARRAY<ROW<f0 INT, f1 BOOLEAN>>",
+                               DataTypes.ARRAY(
+                                       DataTypes.ROW(
+                                               DataTypes.FIELD("f0", 
DataTypes.INT()),
+                                               DataTypes.FIELD("f1", 
DataTypes.BOOLEAN())))),
+                       createTestItem("ROW<f0 INT, f1 BOOLEAN> MULTISET",
+                               DataTypes.MULTISET(
+                                       DataTypes.ROW(
+                                               DataTypes.FIELD("f0", 
DataTypes.INT()),
+                                               DataTypes.FIELD("f1", 
DataTypes.BOOLEAN())))),
+                       createTestItem("MULTISET<ROW<f0 INT, f1 BOOLEAN>>",
+                               DataTypes.MULTISET(
+                                       DataTypes.ROW(
+                                               DataTypes.FIELD("f0", 
DataTypes.INT()),
+                                               DataTypes.FIELD("f1", 
DataTypes.BOOLEAN())))),
+                       createTestItem("ROW<f0 Row<f00 INT, f01 BOOLEAN>, "
+                                       + "f1 INT ARRAY, "
+                                       + "f2 BOOLEAN MULTISET>",
+                               DataTypes.ROW(DataTypes.FIELD("f0",
+                                       DataTypes.ROW(
+                                               DataTypes.FIELD("f00", 
DataTypes.INT()),
+                                               DataTypes.FIELD("f01", 
DataTypes.BOOLEAN()))),
+                                       DataTypes.FIELD("f1", 
DataTypes.ARRAY(DataTypes.INT())),
+                                       DataTypes.FIELD("f2", 
DataTypes.MULTISET(DataTypes.BOOLEAN()))))
+               );
+               StringBuilder buffer = new StringBuilder("create table t1(\n");
+               for (int i = 0; i < testItems.size(); i++) {
+                       buffer.append("f")
+                               .append(i)
+                               .append(" ")
+                               .append(testItems.get(i).testExpr);
+                       if (i == testItems.size() - 1) {
+                               buffer.append(")");
+                       } else {
+                               buffer.append(",\n");
+                       }
+               }
+               final String sql = buffer.toString();
+               final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
+               SqlNode node = planner.parse(sql);
+               assert node instanceof SqlCreateTable;
+               Operation operation = SqlToOperationConverter.convert(planner, 
node);
+               TableSchema schema = ((CreateTableOperation) 
operation).getCatalogTable().getSchema();
+               Object[] expectedDataTypes = testItems.stream().map(item -> 
item.expectedType).toArray();
+               assertArrayEquals(expectedDataTypes, 
schema.getFieldDataTypes());
+       }
+
+       //~ Tool Methods 
----------------------------------------------------------
+
+       private static TestItem createTestItem(Object... args) {
+               assert args.length == 2;
+               final String testExpr = (String) args[0];
+               TestItem testItem = TestItem.fromTestExpr(testExpr);
+               if (args[1] instanceof String) {
+                       testItem.withExpectedError((String) args[1]);
+               } else {
+                       testItem.withExpectedType(args[1]);
+               }
+               return testItem;
+       }
+
        private Operation parse(String sql, FlinkPlannerImpl planner) {
                SqlNode node = planner.parse(sql);
                return SqlToOperationConverter.convert(planner, node);
@@ -200,4 +353,38 @@ public class SqlToOperationConverterTest {
                return 
plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(),
                        catalogManager.getCurrentDatabase());
        }
+
+       //~ Inner Classes 
----------------------------------------------------------
+
+       private static class TestItem {
+               private final String testExpr;
+               @Nullable
+               private Object expectedType;
+               @Nullable
+               private String expectedError;
+
+               private TestItem(String testExpr) {
+                       this.testExpr = testExpr;
+               }
+
+               static TestItem fromTestExpr(String testExpr) {
+                       return new TestItem(testExpr);
+               }
+
+               TestItem withExpectedType(Object expectedType) {
+                       this.expectedType = expectedType;
+                       return this;
+               }
+
+               TestItem withExpectedError(String expectedError) {
+                       this.expectedError = expectedError;
+                       return this;
+               }
+
+               @Override
+               public String toString() {
+                       return this.testExpr;
+               }
+       }
+
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 7225cbc..0424109 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -23,7 +23,9 @@ import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -43,14 +45,20 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.planner.PlanningConfigurationBuilder;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
 
 import org.apache.calcite.sql.SqlNode;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -74,6 +82,9 @@ public class SqlToOperationConverterTest {
                        new ExpressionBridge<>(functionCatalog,
                                PlannerExpressionConverter.INSTANCE()));
 
+       @Rule
+       public ExpectedException expectedEx = ExpectedException.none();
+
        @Before
        public void before() throws TableAlreadyExistException, 
DatabaseNotExistException {
                final ObjectPath path1 = new 
ObjectPath(catalogManager.getCurrentDatabase(), "t1");
@@ -195,9 +206,253 @@ public class SqlToOperationConverterTest {
                assertEquals(expectedStaticPartitions, 
sinkModifyOperation.getStaticPartitions());
        }
 
+       @Test // TODO: tweak the tests when FLINK-13604 is fixed.
+       public void testCreateTableWithFullDataTypes() {
+               final List<TestItem> testItems = Arrays.asList(
+                       // Expect to be DataTypes.CHAR(1).
+                       createTestItem("CHAR", DataTypes.STRING()),
+                       // Expect to be DataTypes.CHAR(1).notNull().
+                       createTestItem("CHAR NOT NULL", DataTypes.STRING()),
+                       // Expect to be DataTypes.CHAR(1).
+                       createTestItem("CHAR NULL", DataTypes.STRING()),
+                       // Expect to be DataTypes.CHAR(33).
+                       createTestItem("CHAR(33)", DataTypes.STRING()),
+                       createTestItem("VARCHAR", DataTypes.STRING()),
+                       // Expect to be DataTypes.VARCHAR(33).
+                       createTestItem("VARCHAR(33)", DataTypes.STRING()),
+                       createTestItem("STRING", DataTypes.STRING()),
+                       createTestItem("BOOLEAN", DataTypes.BOOLEAN()),
+                       // Expect to be DECIMAL(10, 0).
+                       createTestItem("DECIMAL",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+                       // Expect to be DECIMAL(10, 0).
+                       createTestItem("DEC",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+                       // Expect to be DECIMAL(10, 0).
+                       createTestItem("NUMERIC",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+                       // Expect to be DECIMAL(10, 0).
+                       createTestItem("DECIMAL(10)",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+                       // Expect to be DECIMAL(10, 0).
+                       createTestItem("DEC(10)",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+                       // Expect to be DECIMAL(10, 0).
+                       createTestItem("NUMERIC(10)",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+                       // Expect to be DECIMAL(10, 3).
+                       createTestItem("DECIMAL(10, 3)",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+                       // Expect to be DECIMAL(10, 3).
+                       createTestItem("DEC(10, 3)",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+                       // Expect to be DECIMAL(10, 3).
+                       createTestItem("NUMERIC(10, 3)",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+                       createTestItem("TINYINT", DataTypes.TINYINT()),
+                       createTestItem("SMALLINT", DataTypes.SMALLINT()),
+                       createTestItem("INTEGER", DataTypes.INT()),
+                       createTestItem("INT", DataTypes.INT()),
+                       createTestItem("BIGINT", DataTypes.BIGINT()),
+                       createTestItem("FLOAT", DataTypes.FLOAT()),
+                       createTestItem("DOUBLE", DataTypes.DOUBLE()),
+                       createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()),
+                       createTestItem("DATE",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.SQL_DATE())),
+                       createTestItem("TIME",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+                       createTestItem("TIME WITHOUT TIME ZONE",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+                       // Expect to be Time(3).
+                       createTestItem("TIME(3)",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+                       // Expect to be Time(3).
+                       createTestItem("TIME(3) WITHOUT TIME ZONE",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+                       createTestItem("TIMESTAMP",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+                       createTestItem("TIMESTAMP WITHOUT TIME ZONE",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+                       // Expect to be timestamp(3).
+                       createTestItem("TIMESTAMP(3)",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+                       // Expect to be timestamp(3).
+                       createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE",
+                               
TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+                       // Expect to be ARRAY<INT NOT NULL>.
+                       createTestItem("ARRAY<INT NOT NULL>",
+                               DataTypes.ARRAY(DataTypes.INT())),
+                       createTestItem("INT ARRAY", 
DataTypes.ARRAY(DataTypes.INT())),
+                       // Expect to be ARRAY<INT NOT NULL>.
+                       createTestItem("INT NOT NULL ARRAY",
+                               DataTypes.ARRAY(DataTypes.INT())),
+                       // Expect to be ARRAY<INT> NOT NULL.
+                       createTestItem("INT ARRAY NOT NULL",
+                               DataTypes.ARRAY(DataTypes.INT())),
+                       // Expect to be MULTISET<INT NOT NULL>.
+                       createTestItem("MULTISET<INT NOT NULL>",
+                               DataTypes.MULTISET(DataTypes.INT())),
+                       createTestItem("INT MULTISET", 
DataTypes.MULTISET(DataTypes.INT())),
+                       // Expect to be MULTISET<INT NOT NULL>.
+                       createTestItem("INT NOT NULL MULTISET",
+                               DataTypes.MULTISET(DataTypes.INT())),
+                       // Expect to be MULTISET<INT> NOT NULL.
+                       createTestItem("INT MULTISET NOT NULL",
+                               DataTypes.MULTISET(DataTypes.INT())),
+                       createTestItem("MAP<BIGINT, BOOLEAN>",
+                               DataTypes.MAP(DataTypes.BIGINT(), 
DataTypes.BOOLEAN())),
+                       // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+                       createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>",
+                               DataTypes.ROW(
+                                       DataTypes.FIELD("f0", DataTypes.INT()),
+                                       DataTypes.FIELD("f1", 
DataTypes.BOOLEAN()))),
+                       // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+                       createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)",
+                               DataTypes.ROW(
+                                       DataTypes.FIELD("f0", DataTypes.INT()),
+                                       DataTypes.FIELD("f1", 
DataTypes.BOOLEAN()))),
+                       createTestItem("ROW<`f0` INT>",
+                               DataTypes.ROW(DataTypes.FIELD("f0", 
DataTypes.INT()))),
+                       createTestItem("ROW(`f0` INT)",
+                               DataTypes.ROW(DataTypes.FIELD("f0", 
DataTypes.INT()))),
+                       createTestItem("ROW<>", DataTypes.ROW()),
+                       createTestItem("ROW()", DataTypes.ROW()),
+                       // Expect to be ROW<`f0` INT NOT NULL '...', `f1` 
BOOLEAN '...'>.
+                       createTestItem("ROW<f0 INT NOT NULL 'This is a 
comment.', "
+                                       + "f1 BOOLEAN 'This as well.'>",
+                               DataTypes.ROW(
+                                       DataTypes.FIELD("f0", DataTypes.INT()),
+                                       DataTypes.FIELD("f1", 
DataTypes.BOOLEAN()))),
+                       createTestItem("ROW<f0 INT, f1 BOOLEAN> ARRAY",
+                               DataTypes.ARRAY(
+                                       DataTypes.ROW(
+                                               DataTypes.FIELD("f0", 
DataTypes.INT()),
+                                               DataTypes.FIELD("f1", 
DataTypes.BOOLEAN())))),
+                       createTestItem("ARRAY<ROW<f0 INT, f1 BOOLEAN>>",
+                               DataTypes.ARRAY(
+                                       DataTypes.ROW(
+                                               DataTypes.FIELD("f0", 
DataTypes.INT()),
+                                               DataTypes.FIELD("f1", 
DataTypes.BOOLEAN())))),
+                       createTestItem("ROW<f0 INT, f1 BOOLEAN> MULTISET",
+                               DataTypes.MULTISET(
+                                       DataTypes.ROW(
+                                               DataTypes.FIELD("f0", 
DataTypes.INT()),
+                                               DataTypes.FIELD("f1", 
DataTypes.BOOLEAN())))),
+                       createTestItem("MULTISET<ROW<f0 INT, f1 BOOLEAN>>",
+                               DataTypes.MULTISET(
+                                       DataTypes.ROW(
+                                               DataTypes.FIELD("f0", 
DataTypes.INT()),
+                                               DataTypes.FIELD("f1", 
DataTypes.BOOLEAN())))),
+                       createTestItem("ROW<f0 Row<f00 INT, f01 BOOLEAN>, "
+                                       + "f1 INT ARRAY, "
+                                       + "f2 BOOLEAN MULTISET>",
+                               DataTypes.ROW(DataTypes.FIELD("f0",
+                                       DataTypes.ROW(
+                                               DataTypes.FIELD("f00", 
DataTypes.INT()),
+                                               DataTypes.FIELD("f01", 
DataTypes.BOOLEAN()))),
+                                       DataTypes.FIELD("f1", 
DataTypes.ARRAY(DataTypes.INT())),
+                                       DataTypes.FIELD("f2", 
DataTypes.MULTISET(DataTypes.BOOLEAN()))))
+               );
+               StringBuilder buffer = new StringBuilder("create table t1(\n");
+               for (int i = 0; i < testItems.size(); i++) {
+                       buffer.append("f")
+                               .append(i)
+                               .append(" ")
+                               .append(testItems.get(i).testExpr);
+                       if (i == testItems.size() - 1) {
+                               buffer.append(")");
+                       } else {
+                               buffer.append(",\n");
+                       }
+               }
+               final String sql = buffer.toString();
+               final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
+               SqlNode node = planner.parse(sql);
+               assert node instanceof SqlCreateTable;
+               Operation operation = SqlToOperationConverter.convert(planner, 
node);
+               TableSchema schema = ((CreateTableOperation) 
operation).getCatalogTable().getSchema();
+               Object[] expectedDataTypes = testItems.stream().map(item -> 
item.expectedType).toArray();
+               assertArrayEquals(expectedDataTypes, 
schema.getFieldDataTypes());
+       }
+
+       @Test
+       public void testCreateTableWithUnSupportedDataTypes() {
+               final List<TestItem> testItems = Arrays.asList(
+                       createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME 
ZONE>",
+                               "Type is not supported: 
TIMESTAMP_WITH_LOCAL_TIME_ZONE"),
+                       createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE",
+                               "Type is not supported: 
TIMESTAMP_WITH_LOCAL_TIME_ZONE"),
+                       createTestItem("TIMESTAMP WITH LOCAL TIME ZONE",
+                               "Type is not supported: 
TIMESTAMP_WITH_LOCAL_TIME_ZONE"),
+                       createTestItem("BYTES", "Type is not supported: 
VARBINARY"),
+                       createTestItem("VARBINARY(33)", "Type is not supported: 
VARBINARY"),
+                       createTestItem("VARBINARY", "Type is not supported: 
VARBINARY"),
+                       createTestItem("BINARY(33)", "Type is not supported: 
BINARY"),
+                       createTestItem("BINARY", "Type is not supported: 
BINARY")
+               );
+               final String sqlTemplate = "create table t1(\n" +
+                       "  f0 %s)";
+               final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
+               for (TestItem item : testItems) {
+                       String sql = String.format(sqlTemplate, item.testExpr);
+                       SqlNode node = planner.parse(sql);
+                       assert node instanceof SqlCreateTable;
+                       expectedEx.expect(TableException.class);
+                       expectedEx.expectMessage(item.expectedError);
+                       SqlToOperationConverter.convert(planner, node);
+               }
+       }
+
+       //~ Tool Methods 
----------------------------------------------------------
+
+       private static TestItem createTestItem(Object... args) {
+               assert args.length == 2;
+               final String testExpr = (String) args[0];
+               TestItem testItem = TestItem.fromTestExpr(testExpr);
+               if (args[1] instanceof String) {
+                       testItem.withExpectedError((String) args[1]);
+               } else {
+                       testItem.withExpectedType(args[1]);
+               }
+               return testItem;
+       }
+
        private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
                tableConfig.setSqlDialect(sqlDialect);
                return 
planningConfigurationBuilder.createFlinkPlanner(catalogManager.getCurrentCatalog(),
                        catalogManager.getCurrentDatabase());
        }
+
+       //~ Inner Classes 
----------------------------------------------------------
+
+       private static class TestItem {
+               private final String testExpr;
+               @Nullable
+               private Object expectedType;
+               @Nullable
+               private String expectedError;
+
+               private TestItem(String testExpr) {
+                       this.testExpr = testExpr;
+               }
+
+               static TestItem fromTestExpr(String testExpr) {
+                       return new TestItem(testExpr);
+               }
+
+               TestItem withExpectedType(Object expectedType) {
+                       this.expectedType = expectedType;
+                       return this;
+               }
+
+               TestItem withExpectedError(String expectedError) {
+                       this.expectedError = expectedError;
+                       return this;
+               }
+
+               @Override
+               public String toString() {
+                       return this.testExpr;
+               }
+       }
 }

Reply via email to