This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a3fd687  [FLINK-13335][sql-parser] Bring the SQL CREATE TABLE DDL 
closer to FLIP-37
a3fd687 is described below

commit a3fd687a515af66df30ce4ee265e0af3c5663820
Author: yuzhao.cyz <yuzhao....@alibaba-inc.com>
AuthorDate: Fri Jul 26 22:37:53 2019 +0800

    [FLINK-13335][sql-parser] Bring the SQL CREATE TABLE DDL closer to FLIP-37
    
    This brings the SQL DDL closer to FLIP-37. However, there are a couple of
    known limitations.
    
    Currently unsupported features:
    - INTERVAL
    - ROW with comments
    - ANY
    - NULL
    - NOT NULL/NULL for top-level types
    - ignoring collation/charset
    - VARCHAR without length (=VARCHAR(1))
    - TIMESTAMP WITH TIME ZONE
    - user-defined types
    - data types in non-DDL parts (e.g. CAST(f AS STRING))
    
    This closes #9243.
---
 docs/dev/table/sql.md                              |   2 +-
 .../src/main/codegen/data/Parser.tdd               |  27 +-
 .../src/main/codegen/includes/parserImpls.ftl      | 294 +++++++++++++-
 .../flink/sql/parser/FlinkSqlDataTypeSpec.java     | 325 +++++++++++++++
 .../type/{SqlMapType.java => SqlBytesType.java}    |  31 +-
 .../apache/flink/sql/parser/type/SqlMapType.java   |   6 +-
 .../type/{SqlMapType.java => SqlMultisetType.java} |  28 +-
 .../apache/flink/sql/parser/type/SqlRowType.java   |  34 +-
 .../type/{SqlMapType.java => SqlStringType.java}   |  31 +-
 .../apache/flink/sql/parser/type/SqlTimeType.java  |  73 ++++
 .../flink/sql/parser/type/SqlTimestampType.java    |  73 ++++
 .../java/org/apache/flink/sql/parser/Fixture.java  | 115 ++++++
 .../flink/sql/parser/FlinkDDLDataTypeTest.java     | 446 +++++++++++++++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  35 +-
 .../runtime/stream/sql/WindowAggregateITCase.scala |  10 +-
 .../runtime/stream/TimeAttributesITCase.scala      |   2 +-
 16 files changed, 1394 insertions(+), 138 deletions(-)

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 3155915..e607716 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1085,7 +1085,7 @@ Although not every SQL feature is implemented yet, some 
string combinations are
 
 {% highlight sql %}
 
-A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, 
ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, 
ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, 
BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, 
BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, 
CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, 
CHARACTERS, CHARACTER_LENGTH, CHA [...]
+A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, 
ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, 
ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, 
BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, 
BY, BYTES, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, 
CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, 
CHARACTERISTICS, CHARACTERS, CHARACTER_LENG [...]
 
 {% endhighlight %}
 
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 77a8e58..5cefc93 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -31,10 +31,16 @@
     "org.apache.flink.sql.parser.dml.RichSqlInsert",
     "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword",
     "org.apache.flink.sql.parser.type.SqlArrayType",
+    "org.apache.flink.sql.parser.type.SqlBytesType",
     "org.apache.flink.sql.parser.type.SqlMapType",
+    "org.apache.flink.sql.parser.type.SqlMultisetType",
     "org.apache.flink.sql.parser.type.SqlRowType",
+    "org.apache.flink.sql.parser.type.SqlStringType",
+    "org.apache.flink.sql.parser.type.SqlTimeType",
+    "org.apache.flink.sql.parser.type.SqlTimestampType",
     "org.apache.flink.sql.parser.utils.SqlTimeUnit",
     "org.apache.flink.sql.parser.validate.FlinkSqlConformance",
+    "org.apache.flink.sql.parser.FlinkSqlDataTypeSpec",
     "org.apache.flink.sql.parser.SqlProperty",
     "org.apache.calcite.sql.SqlDrop",
     "org.apache.calcite.sql.SqlCreate",
@@ -53,7 +59,9 @@
     "FROM_SOURCE",
     "BOUNDED",
     "DELAY",
-    "OVERWRITE"
+    "OVERWRITE",
+    "STRING",
+    "BYTES"
   ]
 
   # List of keywords from "keywords" section that are not reserved.
@@ -384,13 +392,24 @@
   literalParserMethods: [
   ]
 
-  # List of methods for parsing custom data types.
+  # List of methods for parsing ddl supported data types.
   # Return type of method implementation should be "SqlIdentifier".
   # Example: SqlParseTimeStampZ().
-  dataTypeParserMethods: [
+  flinkDataTypeParserMethods: [
     "SqlArrayType()",
+    "SqlMultisetType()",
     "SqlMapType()",
-    "SqlRowType()"
+    "SqlRowType()",
+    "SqlStringType()",
+    "SqlBytesType()",
+    "SqlTimestampType()",
+    "SqlTimeType()"
+  ]
+
+  # List of methods for parsing custom data types.
+  # Return type of method implementation should be "SqlIdentifier".
+  # Example: SqlParseTimeStampZ().
+  dataTypeParserMethods: [
   ]
 
   # List of methods for parsing builtin function calls.
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 95621fe..ae66846 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -15,6 +15,20 @@
 // limitations under the License.
 -->
 
+/**
+* Parse an optional nullability clause: NULL -> true, NOT NULL -> false; defaults to true (nullable) when absent.
+*/
+boolean NullableOpt() :
+{
+}
+{
+    <NULL> { return true; }
+|
+    <NOT> <NULL> { return false; }
+|
+    { return true; }
+}
+
 void TableColumn(TableCreationContext context) :
 {
 }
@@ -55,14 +69,8 @@ void TableColumn2(List<SqlNode> list) :
 }
 {
     name = SimpleIdentifier()
-    type = DataType()
-    (
-        <NULL> { type = type.withNullable(true); }
-    |
-        <NOT> <NULL> { type = type.withNullable(false); }
-    |
-        { type = type.withNullable(true); }
-    )
+    <#-- #FlinkDataType already takes care of the nullable attribute. -->
+    type = FlinkDataType()
     [ <COMMENT> <QUOTED_STRING> {
         String p = SqlParserUtil.parseString(token.image);
         comment = SqlLiteral.createCharString(p, getPos());
@@ -367,53 +375,295 @@ SqlDrop SqlDropView(Span s, boolean replace) :
     }
 }
 
+SqlIdentifier FlinkCollectionsTypeName() :
+{
+}
+{
+    LOOKAHEAD(2)
+    <MULTISET> {
+        return new SqlIdentifier(SqlTypeName.MULTISET.name(), getPos());
+    }
+|
+    <ARRAY> {
+        return new SqlIdentifier(SqlTypeName.ARRAY.name(), getPos());
+    }
+}
+
+SqlIdentifier FlinkTypeName() :
+{
+    final SqlTypeName sqlTypeName;
+    final SqlIdentifier typeName;
+    final Span s = Span.of();
+}
+{
+    (
+<#-- additional types are included here -->
+<#-- try the Flink custom data types before the Calcite core data types -->
+<#list parser.flinkDataTypeParserMethods as method>
+    <#if (method?index > 0)>
+    |
+    </#if>
+        LOOKAHEAD(2)
+        typeName = ${method}
+</#list>
+    |
+        LOOKAHEAD(2)
+        sqlTypeName = SqlTypeName(s) {
+            typeName = new SqlIdentifier(sqlTypeName.name(), s.end(this));
+        }
+    |
+        LOOKAHEAD(2)
+        typeName = FlinkCollectionsTypeName()
+    |
+        typeName = CompoundIdentifier() {
+            throw new ParseException("UDT in DDL is not supported yet.");
+        }
+    )
+    {
+        return typeName;
+    }
+}
+
+/**
+* Parse a Flink data type with nullable options, NULL -> nullable, NOT NULL -> 
not nullable.
+* Default to be nullable.
+*/
+SqlDataTypeSpec FlinkDataType() :
+{
+    final SqlIdentifier typeName;
+    SqlIdentifier collectionTypeName = null;
+    int scale = -1;
+    int precision = -1;
+    String charSetName = null;
+    final Span s;
+    boolean nullable = true;
+    boolean elementNullable = true;
+}
+{
+    typeName = FlinkTypeName() {
+        s = span();
+    }
+    [
+        <LPAREN>
+        precision = UnsignedIntLiteral()
+        [
+            <COMMA>
+            scale = UnsignedIntLiteral()
+        ]
+        <RPAREN>
+    ]
+    elementNullable = NullableOpt()
+    [
+        collectionTypeName = FlinkCollectionsTypeName()
+        nullable = NullableOpt()
+    ]
+    {
+        if (null != collectionTypeName) {
+            return new FlinkSqlDataTypeSpec(
+                    collectionTypeName,
+                    typeName,
+                    precision,
+                    scale,
+                    charSetName,
+                    nullable,
+                    elementNullable,
+                    s.end(collectionTypeName));
+        }
+        nullable = elementNullable;
+        return new FlinkSqlDataTypeSpec(typeName,
+                precision,
+                scale,
+                charSetName,
+                null,
+                nullable,
+                elementNullable,
+                s.end(this));
+    }
+}
+
+SqlIdentifier SqlStringType() :
+{
+}
+{
+    <STRING> { return new SqlStringType(getPos()); }
+}
+
+SqlIdentifier SqlBytesType() :
+{
+}
+{
+    <BYTES> { return new SqlBytesType(getPos()); }
+}
+
+boolean WithLocalTimeZone() :
+{
+}
+{
+    <WITHOUT> <TIME> <ZONE> { return false; }
+|
+    <WITH>
+    (
+         <LOCAL> <TIME> <ZONE> { return true; }
+    |
+        <TIME> <ZONE> {
+            throw new ParseException("'WITH TIME ZONE' is not supported yet, 
options: " +
+                "'WITHOUT TIME ZONE', 'WITH LOCAL TIME ZONE'.");
+        }
+    )
+|
+    { return false; }
+}
+
+SqlIdentifier SqlTimeType() :
+{
+    int precision = -1;
+    boolean withLocalTimeZone = false;
+}
+{
+    <TIME>
+    (
+        <LPAREN> precision = UnsignedIntLiteral() <RPAREN>
+    |
+        { precision = -1; }
+    )
+    withLocalTimeZone = WithLocalTimeZone()
+    { return new SqlTimeType(getPos(), precision, withLocalTimeZone); }
+}
+
+SqlIdentifier SqlTimestampType() :
+{
+    int precision = -1;
+    boolean withLocalTimeZone = false;
+}
+{
+    <TIMESTAMP>
+    (
+        <LPAREN> precision = UnsignedIntLiteral() <RPAREN>
+    |
+        { precision = -1; }
+    )
+    withLocalTimeZone = WithLocalTimeZone()
+    { return new SqlTimestampType(getPos(), precision, withLocalTimeZone); }
+}
+
 SqlIdentifier SqlArrayType() :
 {
     SqlParserPos pos;
     SqlDataTypeSpec elementType;
+    boolean nullable = true;
 }
 {
     <ARRAY> { pos = getPos(); }
-    <LT> elementType = DataType()
+    <LT>
+    elementType = FlinkDataType()
     <GT>
     {
         return new SqlArrayType(pos, elementType);
     }
 }
 
+SqlIdentifier SqlMultisetType() :
+{
+    SqlParserPos pos;
+    SqlDataTypeSpec elementType;
+    boolean nullable = true;
+}
+{
+    <MULTISET> { pos = getPos(); }
+    <LT>
+    elementType = FlinkDataType()
+    <GT>
+    {
+        return new SqlMultisetType(pos, elementType);
+    }
+}
+
 SqlIdentifier SqlMapType() :
 {
     SqlDataTypeSpec keyType;
     SqlDataTypeSpec valType;
+    boolean nullable = true;
 }
 {
     <MAP>
-    <LT> keyType = DataType()
-    <COMMA> valType = DataType()
+    <LT>
+    keyType = FlinkDataType()
+    <COMMA>
+    valType = FlinkDataType()
     <GT>
     {
         return new SqlMapType(getPos(), keyType, valType);
     }
 }
 
+/**
+* Parse a possibly empty "name1 type1 ['optional comment'], name2 type2 ..." list.
+*/
+void FieldNameTypeCommaList(
+        List<SqlIdentifier> fieldNames,
+        List<SqlDataTypeSpec> fieldTypes,
+        List<SqlCharStringLiteral> comments) :
+{
+    SqlIdentifier fName;
+    SqlDataTypeSpec fType;
+}
+{
+    [
+        fName = SimpleIdentifier()
+        fType = FlinkDataType()
+        {
+            fieldNames.add(fName);
+            fieldTypes.add(fType);
+        }
+        (
+            <QUOTED_STRING> {
+                String p = SqlParserUtil.parseString(token.image);
+                comments.add(SqlLiteral.createCharString(p, getPos()));
+            }
+        |
+            { comments.add(null); }
+        )
+    ]
+    (
+        <COMMA>
+        fName = SimpleIdentifier()
+        fType = FlinkDataType()
+        {
+            fieldNames.add(fName);
+            fieldTypes.add(fType);
+        }
+        (
+            <QUOTED_STRING> {
+                String p = SqlParserUtil.parseString(token.image);
+                comments.add(SqlLiteral.createCharString(p, getPos()));
+            }
+        |
+            { comments.add(null); }
+        )
+    )*
+}
+
+/**
+* Parse a ROW type. Both Row(name1 type1, name2 type2) and Row<name1 
type1, name2 type2> are supported.
+* Every field type can carry a `NULL` or `NOT NULL` suffix to indicate whether 
the field is nullable,
+* e.g. Row(f0 int not null, f1 varchar null).
+*/
 SqlIdentifier SqlRowType() :
 {
-    SqlParserPos pos;
     List<SqlIdentifier> fieldNames = new ArrayList<SqlIdentifier>();
     List<SqlDataTypeSpec> fieldTypes = new ArrayList<SqlDataTypeSpec>();
+    List<SqlCharStringLiteral> comments = new 
ArrayList<SqlCharStringLiteral>();
 }
 {
-    <ROW> { pos = getPos(); SqlIdentifier fName; SqlDataTypeSpec fType;}
-    <LT>
-    fName = SimpleIdentifier() <COLON> fType = DataType()
-    { fieldNames.add(fName); fieldTypes.add(fType); }
+    <ROW>
     (
-        <COMMA>
-        fName = SimpleIdentifier() <COLON> fType = DataType()
-        { fieldNames.add(fName); fieldTypes.add(fType); }
-    )*
-    <GT>
+        <NE>
+    |
+        <LT> FieldNameTypeCommaList(fieldNames, fieldTypes, comments) <GT>
+    |
+        <LPAREN> FieldNameTypeCommaList(fieldNames, fieldTypes, comments) 
<RPAREN>
+    )
     {
-        return new SqlRowType(pos, fieldNames, fieldTypes);
+        return new SqlRowType(getPos(), fieldNames, fieldTypes, comments);
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlDataTypeSpec.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlDataTypeSpec.java
new file mode 100644
index 0000000..a3797d7
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlDataTypeSpec.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.type.ExtendedSqlType;
+import org.apache.flink.sql.parser.type.SqlArrayType;
+import org.apache.flink.sql.parser.type.SqlBytesType;
+import org.apache.flink.sql.parser.type.SqlMapType;
+import org.apache.flink.sql.parser.type.SqlMultisetType;
+import org.apache.flink.sql.parser.type.SqlRowType;
+import org.apache.flink.sql.parser.type.SqlStringType;
+import org.apache.flink.sql.parser.type.SqlTimeType;
+import org.apache.flink.sql.parser.type.SqlTimestampType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCollation;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.util.Util;
+
+import java.nio.charset.Charset;
+import java.util.Objects;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a SQL data type specification in a parse tree.
+ *
+ * <p>A <code>SqlDataTypeSpec</code> is immutable; once created, you cannot
+ * change any of the fields.</p>
+ *
+ * <p>This class is an extension to {@link SqlDataTypeSpec}, we support
+ * complex type expressions like:</p>
+ *
+ * <blockquote><code>ROW(<br>
+ *   foo NUMBER(5, 2) NOT NULL,<br>
+ *   rec ROW(b BOOLEAN, i MyUDT NOT NULL))</code></blockquote>
+ *
+ * <p>Once <a 
href="https://issues.apache.org/jira/browse/CALCITE-3213">CALCITE-3213</a>
+ * is resolved, we can remove this class.
+ */
+public class FlinkSqlDataTypeSpec extends SqlDataTypeSpec {
+       // Whether the element type is nullable when this type is a 
collection type.
+       // Collection types currently means ARRAY and MULTISET.
+       private Boolean elementNullable;
+
+       public FlinkSqlDataTypeSpec(
+               SqlIdentifier collectionsTypeName,
+               SqlIdentifier typeName,
+               int precision,
+               int scale,
+               String charSetName,
+               Boolean nullable,
+               Boolean elementNullable,
+               SqlParserPos pos) {
+               super(collectionsTypeName, typeName, precision, scale,
+                       charSetName, null, nullable, pos);
+               this.elementNullable = elementNullable;
+       }
+
+       public FlinkSqlDataTypeSpec(
+               SqlIdentifier collectionsTypeName,
+               SqlIdentifier typeName,
+               int precision,
+               int scale,
+               String charSetName,
+               TimeZone timeZone,
+               Boolean nullable,
+               Boolean elementNullable,
+               SqlParserPos pos) {
+               super(collectionsTypeName, typeName, precision, scale,
+                       charSetName, timeZone, nullable, pos);
+               this.elementNullable = elementNullable;
+       }
+
+       public FlinkSqlDataTypeSpec(
+               SqlIdentifier typeName,
+               int precision,
+               int scale,
+               String charSetName,
+               TimeZone timeZone,
+               Boolean nullable,
+               Boolean elementNullable,
+               SqlParserPos pos) {
+               super(null, typeName, precision, scale,
+                       charSetName, timeZone, nullable, pos);
+               this.elementNullable = elementNullable;
+       }
+
+       @Override
+       public SqlNode clone(SqlParserPos pos) {
+               return (getCollectionsTypeName() != null)
+                       ? new FlinkSqlDataTypeSpec(getCollectionsTypeName(), 
getTypeName(), getPrecision(),
+                       getScale(), getCharSetName(), getNullable(), 
this.elementNullable, pos)
+                       : new FlinkSqlDataTypeSpec(getTypeName(), 
getPrecision(), getScale(),
+                       getCharSetName(), getTimeZone(), getNullable(), 
this.elementNullable, pos);
+       }
+
+       /** Returns a copy of this data type specification with a given
+        * nullability. */
+       @Override
+       public SqlDataTypeSpec withNullable(Boolean nullable) {
+               if (Objects.equals(nullable, this.getNullable())) {
+                       return this;
+               }
+               return new FlinkSqlDataTypeSpec(getCollectionsTypeName(), 
getTypeName(),
+                       getPrecision(), getScale(), getCharSetName(), 
getTimeZone(), nullable,
+                       this.elementNullable, getParserPosition());
+       }
+
+       @Override
+       public RelDataType deriveType(RelDataTypeFactory typeFactory) {
+               // Default to be nullable.
+               return this.deriveType(typeFactory, true);
+       }
+
+       @Override
+       public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+               final SqlIdentifier typeName = getTypeName();
+               String name = typeName.getSimple();
+               if (typeName instanceof ExtendedSqlType) {
+                       typeName.unparse(writer, leftPrec, rightPrec);
+               } else if (SqlTypeName.get(name) != null) {
+                       SqlTypeName sqlTypeName = SqlTypeName.get(name);
+                       writer.keyword(name);
+                       if (sqlTypeName.allowsPrec() && this.getPrecision() >= 
0) {
+                               SqlWriter.Frame frame = 
writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
+                               writer.print(this.getPrecision());
+                               if (sqlTypeName.allowsScale() && 
this.getScale() >= 0) {
+                                       writer.sep(",", true);
+                                       writer.print(this.getScale());
+                               }
+
+                               writer.endList(frame);
+                       }
+
+                       if (this.getCharSetName() != null) {
+                               writer.keyword("CHARACTER SET");
+                               writer.identifier(this.getCharSetName(), false);
+                       }
+
+                       if (this.getCollectionsTypeName() != null) {
+                               // Fix up nullable attribute if this is a 
collection type.
+                               if (elementNullable != null && 
!elementNullable) {
+                                       writer.keyword("NOT NULL");
+                               }
+                               
writer.keyword(this.getCollectionsTypeName().getSimple());
+                       }
+               } else if (name.startsWith("_")) {
+                       writer.keyword(name.substring(1));
+               } else {
+                       this.getTypeName().unparse(writer, leftPrec, rightPrec);
+               }
+               if (getNullable() != null && !getNullable()) {
+                       writer.keyword("NOT NULL");
+               }
+       }
+
+       @Override
+       public RelDataType deriveType(RelDataTypeFactory typeFactory, boolean 
nullable) {
+               final SqlIdentifier typeName = getTypeName();
+               if (!typeName.isSimple()) {
+                       return null;
+               }
+               final String name = typeName.getSimple();
+               final SqlTypeName sqlTypeName = SqlTypeName.get(name);
+               // Try to get Flink custom data type first.
+               RelDataType type = getExtendedType(typeFactory, typeName);
+               if (type == null) {
+                       if (sqlTypeName == null) {
+                               return null;
+                       } else {
+                               // NOTE jvs 15-Jan-2009:  earlier validation is 
supposed to
+                               // have caught these, which is why it's OK for 
them
+                               // to be assertions rather than user-level 
exceptions.
+                               final int precision = getPrecision();
+                               final int scale = getScale();
+                               if ((precision >= 0) && (scale >= 0)) {
+                                       assert 
sqlTypeName.allowsPrecScale(true, true);
+                                       type = 
typeFactory.createSqlType(sqlTypeName, precision, scale);
+                               } else if (precision >= 0) {
+                                       assert sqlTypeName.allowsPrecNoScale();
+                                       type = 
typeFactory.createSqlType(sqlTypeName, precision);
+                               } else {
+                                       assert 
sqlTypeName.allowsNoPrecNoScale();
+                                       type = 
typeFactory.createSqlType(sqlTypeName);
+                               }
+                       }
+               }
+
+               if (SqlTypeUtil.inCharFamily(type)) {
+                       // Applying Syntax rule 10 from SQL:99 spec section 
6.22 "If TD is a
+                       // fixed-length, variable-length or large object 
character string,
+                       // then the collating sequence of the result of the 
<cast
+                       // specification> is the default collating sequence for 
the
+                       // character repertoire of TD and the result of the 
<cast
+                       // specification> has the Coercible coercibility 
characteristic."
+                       SqlCollation collation = SqlCollation.COERCIBLE;
+
+                       Charset charset;
+                       final String charSetName = getCharSetName();
+                       if (null == charSetName) {
+                               charset = typeFactory.getDefaultCharset();
+                       } else {
+                               String javaCharSetName =
+                                       Objects.requireNonNull(
+                                               
SqlUtil.translateCharacterSetName(charSetName), charSetName);
+                               charset = Charset.forName(javaCharSetName);
+                       }
+                       type =
+                               typeFactory.createTypeWithCharsetAndCollation(
+                                       type,
+                                       charset,
+                                       collation);
+               }
+
+               final SqlIdentifier collectionsTypeName = 
getCollectionsTypeName();
+               if (null != collectionsTypeName) {
+                       // Fix the nullability of the element type first.
+                       boolean elementNullable = true;
+                       if (this.elementNullable != null) {
+                               elementNullable = this.elementNullable;
+                       }
+                       type = typeFactory.createTypeWithNullability(type, 
elementNullable);
+
+                       final String collectionName = 
collectionsTypeName.getSimple();
+                       final SqlTypeName collectionsSqlTypeName =
+                               
Objects.requireNonNull(SqlTypeName.get(collectionName),
+                                       collectionName);
+
+                       switch (collectionsSqlTypeName) {
+                       case MULTISET:
+                               type = typeFactory.createMultisetType(type, -1);
+                               break;
+                       case ARRAY:
+                               type = typeFactory.createArrayType(type, -1);
+                               break;
+                       default:
+                               throw Util.unexpected(collectionsSqlTypeName);
+                       }
+               }
+
+               // Fix the nullability of this type.
+               if (this.getNullable() != null) {
+                       nullable = this.getNullable();
+               }
+               type = typeFactory.createTypeWithNullability(type, nullable);
+
+               return type;
+       }
+
+       private RelDataType getExtendedType(RelDataTypeFactory typeFactory, 
SqlIdentifier typeName) {
+               // quick check.
+               if (!(typeName instanceof ExtendedSqlType)) {
+                       return null;
+               }
+               if (typeName instanceof SqlBytesType) {
+                       return typeFactory.createSqlType(SqlTypeName.VARBINARY, 
Integer.MAX_VALUE);
+               } else if (typeName instanceof SqlStringType) {
+                       return typeFactory.createSqlType(SqlTypeName.VARCHAR, 
Integer.MAX_VALUE);
+               } else if (typeName instanceof SqlArrayType) {
+                       final SqlArrayType arrayType = (SqlArrayType) typeName;
+                       return 
typeFactory.createArrayType(arrayType.getElementType()
+                               .deriveType(typeFactory), -1);
+               } else if (typeName instanceof SqlMultisetType) {
+                       final SqlMultisetType multiSetType = (SqlMultisetType) 
typeName;
+                       return 
typeFactory.createMultisetType(multiSetType.getElementType()
+                               .deriveType(typeFactory), -1);
+               } else if (typeName instanceof SqlMapType) {
+                       final SqlMapType mapType = (SqlMapType) typeName;
+                       return typeFactory.createMapType(
+                               mapType.getKeyType().deriveType(typeFactory),
+                               mapType.getValType().deriveType(typeFactory));
+               } else if (typeName instanceof SqlRowType) {
+                       final SqlRowType rowType = (SqlRowType) typeName;
+                       return typeFactory.createStructType(
+                               rowType.getFieldTypes().stream().map(ft -> 
ft.deriveType(typeFactory))
+                                       .collect(Collectors.toList()),
+                               
rowType.getFieldNames().stream().map(SqlIdentifier::getSimple)
+                                       .collect(Collectors.toList()));
+               } else if (typeName instanceof SqlTimeType) {
+                       final SqlTimeType zonedTimeType = (SqlTimeType) 
typeName;
+                       if (zonedTimeType.getPrecision() >= 0) {
+                               return 
typeFactory.createSqlType(zonedTimeType.getSqlTypeName(),
+                                       zonedTimeType.getPrecision());
+                       } else {
+                               // Use default precision.
+                               return 
typeFactory.createSqlType(zonedTimeType.getSqlTypeName());
+                       }
+               } else if (typeName instanceof SqlTimestampType) {
+                       final SqlTimestampType zonedTimestampType = 
(SqlTimestampType) typeName;
+                       if (zonedTimestampType.getPrecision() >= 0) {
+                               return 
typeFactory.createSqlType(zonedTimestampType.getSqlTypeName(),
+                                       zonedTimestampType.getPrecision());
+                       } else {
+                               // Use default precision.
+                               return 
typeFactory.createSqlType(zonedTimestampType.getSqlTypeName());
+                       }
+               }
+               return null;
+       }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlBytesType.java
similarity index 55%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlBytesType.java
index 588c993..dfc2d1f 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlBytesType.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,40 +18,21 @@
 
 package org.apache.flink.sql.parser.type;
 
-import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
- * Parse column of Map type.
+ * Parse type "BYTES" which is a synonym of VARBINARY(INT_MAX).
  */
-public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
+public class SqlBytesType extends SqlIdentifier implements ExtendedSqlType {
 
-       private final SqlDataTypeSpec keyType;
-       private final SqlDataTypeSpec valType;
-
-       public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, 
SqlDataTypeSpec valType) {
-               super(SqlTypeName.MAP.getName(), pos);
-               this.keyType = keyType;
-               this.valType = valType;
-       }
-
-       public SqlDataTypeSpec getKeyType() {
-               return keyType;
-       }
-
-       public SqlDataTypeSpec getValType() {
-               return valType;
+       public SqlBytesType(SqlParserPos pos) {
+               super("BYTES", pos);
        }
 
        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-               writer.keyword("MAP<");
-               ExtendedSqlType.unparseType(keyType, writer, leftPrec, 
rightPrec);
-               writer.sep(",");
-               ExtendedSqlType.unparseType(valType, writer, leftPrec, 
rightPrec);
-               writer.keyword(">");
+               writer.keyword("BYTES");
        }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
index 588c993..fd071c0 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
@@ -48,10 +48,12 @@ public class SqlMapType extends SqlIdentifier implements 
ExtendedSqlType {
 
        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-               writer.keyword("MAP<");
+               writer.keyword("MAP");
+               SqlWriter.Frame frame = 
writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
+               writer.sep(",");
                ExtendedSqlType.unparseType(keyType, writer, leftPrec, 
rightPrec);
                writer.sep(",");
                ExtendedSqlType.unparseType(valType, writer, leftPrec, 
rightPrec);
-               writer.keyword(">");
+               writer.endList(frame);
        }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMultisetType.java
similarity index 64%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMultisetType.java
index 588c993..3b6f19c 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMultisetType.java
@@ -25,33 +25,25 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
- * Parse column of Map type.
+ * Parse column of MULTISET type.
  */
-public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
+public class SqlMultisetType extends SqlIdentifier implements ExtendedSqlType {
 
-       private final SqlDataTypeSpec keyType;
-       private final SqlDataTypeSpec valType;
+       private final SqlDataTypeSpec elementType;
 
-       public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, 
SqlDataTypeSpec valType) {
-               super(SqlTypeName.MAP.getName(), pos);
-               this.keyType = keyType;
-               this.valType = valType;
+       public SqlMultisetType(SqlParserPos pos, SqlDataTypeSpec elementType) {
+               super(SqlTypeName.MULTISET.getName(), pos);
+               this.elementType = elementType;
        }
 
-       public SqlDataTypeSpec getKeyType() {
-               return keyType;
-       }
-
-       public SqlDataTypeSpec getValType() {
-               return valType;
+       public SqlDataTypeSpec getElementType() {
+               return elementType;
        }
 
        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-               writer.keyword("MAP<");
-               ExtendedSqlType.unparseType(keyType, writer, leftPrec, 
rightPrec);
-               writer.sep(",");
-               ExtendedSqlType.unparseType(valType, writer, leftPrec, 
rightPrec);
+               writer.keyword("MULTISET<");
+               ExtendedSqlType.unparseType(this.elementType, writer, leftPrec, 
rightPrec);
                writer.keyword(">");
        }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java
index e77529e..886125c 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.sql.parser.type;
 
+import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlWriter;
@@ -34,13 +35,16 @@ public class SqlRowType extends SqlIdentifier implements 
ExtendedSqlType {
 
        private final List<SqlIdentifier> fieldNames;
        private final List<SqlDataTypeSpec> fieldTypes;
+       private final List<SqlCharStringLiteral> comments;
 
        public SqlRowType(SqlParserPos pos,
                        List<SqlIdentifier> fieldNames,
-                       List<SqlDataTypeSpec> fieldTypes) {
+                       List<SqlDataTypeSpec> fieldTypes,
+                       List<SqlCharStringLiteral> comments) {
                super(SqlTypeName.ROW.getName(), pos);
                this.fieldNames = fieldNames;
                this.fieldTypes = fieldTypes;
+               this.comments = comments;
        }
 
        public List<SqlIdentifier> getFieldNames() {
@@ -51,6 +55,10 @@ public class SqlRowType extends SqlIdentifier implements 
ExtendedSqlType {
                return fieldTypes;
        }
 
+       public List<SqlCharStringLiteral> getComments() {
+               return comments;
+       }
+
        public int getArity() {
                return fieldNames.size();
        }
@@ -65,14 +73,22 @@ public class SqlRowType extends SqlIdentifier implements 
ExtendedSqlType {
 
        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-               writer.keyword("ROW");
-               SqlWriter.Frame frame = 
writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
-               for (Pair<SqlIdentifier, SqlDataTypeSpec> p : 
Pair.zip(this.fieldNames, this.fieldTypes)) {
-                       writer.sep(",", false);
-                       p.left.unparse(writer, 0, 0);
-                       writer.sep(":");
-                       ExtendedSqlType.unparseType(p.right, writer, leftPrec, 
rightPrec);
+               writer.print("ROW");
+               if (getFieldNames().size() == 0) {
+                       writer.print("<>");
+               } else {
+                       SqlWriter.Frame frame = 
writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
+                       int i = 0;
+                       for (Pair<SqlIdentifier, SqlDataTypeSpec> p : 
Pair.zip(this.fieldNames, this.fieldTypes)) {
+                               writer.sep(",", false);
+                               p.left.unparse(writer, 0, 0);
+                               ExtendedSqlType.unparseType(p.right, writer, 
leftPrec, rightPrec);
+                               if (comments.get(i) != null) {
+                                       comments.get(i).unparse(writer, 
leftPrec, rightPrec);
+                               }
+                               i += 1;
+                       }
+                       writer.endList(frame);
                }
-               writer.endList(frame);
        }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlStringType.java
similarity index 55%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlStringType.java
index 588c993..a134b13 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlStringType.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,40 +18,21 @@
 
 package org.apache.flink.sql.parser.type;
 
-import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
- * Parse column of Map type.
+ * Parse type "STRING" which is a synonym of VARCHAR(INT_MAX).
  */
-public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
+public class SqlStringType extends SqlIdentifier implements ExtendedSqlType {
 
-       private final SqlDataTypeSpec keyType;
-       private final SqlDataTypeSpec valType;
-
-       public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, 
SqlDataTypeSpec valType) {
-               super(SqlTypeName.MAP.getName(), pos);
-               this.keyType = keyType;
-               this.valType = valType;
-       }
-
-       public SqlDataTypeSpec getKeyType() {
-               return keyType;
-       }
-
-       public SqlDataTypeSpec getValType() {
-               return valType;
+       public SqlStringType(SqlParserPos pos) {
+               super("STRING", pos);
        }
 
        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-               writer.keyword("MAP<");
-               ExtendedSqlType.unparseType(keyType, writer, leftPrec, 
rightPrec);
-               writer.sep(",");
-               ExtendedSqlType.unparseType(valType, writer, leftPrec, 
rightPrec);
-               writer.keyword(">");
+               writer.keyword("STRING");
        }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimeType.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimeType.java
new file mode 100644
index 0000000..23855ce
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimeType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.type;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Parse type "TIME WITHOUT TIME ZONE", "TIME(3) WITHOUT TIME ZONE", "TIME 
WITH LOCAL TIME ZONE",
+ * or "TIME(3) WITH LOCAL TIME ZONE".
+ */
+public class SqlTimeType extends SqlIdentifier implements ExtendedSqlType {
+       private final int precision;
+       private final boolean withLocalTimeZone;
+
+       public SqlTimeType(SqlParserPos pos, int precision, boolean 
withLocalTimeZone) {
+               super(getTypeName(withLocalTimeZone), pos);
+               this.precision = precision;
+               this.withLocalTimeZone = withLocalTimeZone;
+       }
+
+       private static String getTypeName(boolean withLocalTimeZone) {
+               if (withLocalTimeZone) {
+                       return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE.name();
+               } else {
+                       return SqlTypeName.TIME.name();
+               }
+       }
+
+       public SqlTypeName getSqlTypeName() {
+               if (withLocalTimeZone) {
+                       return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE;
+               } else {
+                       return SqlTypeName.TIME;
+               }
+       }
+
+       public int getPrecision() {
+               return this.precision;
+       }
+
+       @Override
+       public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+               writer.keyword(SqlTypeName.TIME.name());
+               if (this.precision >= 0) {
+                       final SqlWriter.Frame frame =
+                               
writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
+                       writer.print(precision);
+                       writer.endList(frame);
+               }
+               if (this.withLocalTimeZone) {
+                       writer.keyword("WITH LOCAL TIME ZONE");
+               }
+       }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimestampType.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimestampType.java
new file mode 100644
index 0000000..09e2d08
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimestampType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.type;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Parse type "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP(3) WITHOUT TIME ZONE",
+ * "TIMESTAMP WITH LOCAL TIME ZONE", or "TIMESTAMP(3) WITH LOCAL TIME ZONE".
+ */
+public class SqlTimestampType extends SqlIdentifier implements ExtendedSqlType 
{
+       private final int precision;
+       private final boolean withLocalTimeZone;
+
+       public SqlTimestampType(SqlParserPos pos, int precision, boolean 
withLocalTimeZone) {
+               super(getTypeName(withLocalTimeZone), pos);
+               this.precision = precision;
+               this.withLocalTimeZone = withLocalTimeZone;
+       }
+
+       private static String getTypeName(boolean withLocalTimeZone) {
+               if (withLocalTimeZone) {
+                       return 
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.name();
+               } else {
+                       return SqlTypeName.TIMESTAMP.name();
+               }
+       }
+
+       public SqlTypeName getSqlTypeName() {
+               if (withLocalTimeZone) {
+                       return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+               } else {
+                       return SqlTypeName.TIMESTAMP;
+               }
+       }
+
+       public int getPrecision() {
+               return this.precision;
+       }
+
+       @Override
+       public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+               writer.keyword(SqlTypeName.TIMESTAMP.name());
+               if (this.precision >= 0) {
+                       final SqlWriter.Frame frame =
+                               
writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
+                       writer.print(precision);
+                       writer.endList(frame);
+               }
+               if (this.withLocalTimeZone) {
+                       writer.keyword("WITH LOCAL TIME ZONE");
+               }
+       }
+}
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
new file mode 100644
index 0000000..671958e
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.List;
+
+/**
+ * Types used during tests.
+ */
+public class Fixture {
+       private final RelDataTypeFactory typeFactory;
+
+       final RelDataType char1Type;
+       final RelDataType char33Type;
+       final RelDataType varcharType;
+       final RelDataType varchar33Type;
+       final RelDataType booleanType;
+       final RelDataType binaryType;
+       final RelDataType binary33Type;
+       final RelDataType varbinaryType;
+       final RelDataType varbinary33Type;
+       final RelDataType decimalType;
+       final RelDataType decimalP10S0Type;
+       final RelDataType decimalP10S3Type;
+       final RelDataType tinyintType;
+       final RelDataType smallintType;
+       final RelDataType intType;
+       final RelDataType bigintType;
+       final RelDataType floatType;
+       final RelDataType doubleType;
+       final RelDataType dateType;
+       final RelDataType timeType;
+       final RelDataType time3Type;
+       final RelDataType timestampType;
+       final RelDataType timestamp3Type;
+       final RelDataType timestampWithLocalTimeZoneType;
+       final RelDataType timestamp3WithLocalTimeZoneType;
+       final RelDataType nullType;
+
+       Fixture(RelDataTypeFactory typeFactory) {
+               this.typeFactory = typeFactory;
+               this.char1Type = typeFactory.createSqlType(SqlTypeName.CHAR);
+               this.char33Type = typeFactory.createSqlType(SqlTypeName.CHAR, 
33);
+               this.varcharType = 
typeFactory.createSqlType(SqlTypeName.VARCHAR);
+               this.varchar33Type = 
typeFactory.createSqlType(SqlTypeName.VARCHAR, 33);
+               this.booleanType = 
typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+               this.binaryType = typeFactory.createSqlType(SqlTypeName.BINARY);
+               this.binary33Type = 
typeFactory.createSqlType(SqlTypeName.BINARY, 33);
+               this.varbinaryType = 
typeFactory.createSqlType(SqlTypeName.VARBINARY);
+               this.varbinary33Type = 
typeFactory.createSqlType(SqlTypeName.VARBINARY, 33);
+               this.decimalType = 
typeFactory.createSqlType(SqlTypeName.DECIMAL);
+               this.decimalP10S0Type = 
typeFactory.createSqlType(SqlTypeName.DECIMAL, 10);
+               this.decimalP10S3Type = 
typeFactory.createSqlType(SqlTypeName.DECIMAL, 10, 3);
+               this.tinyintType = 
typeFactory.createSqlType(SqlTypeName.TINYINT);
+               this.smallintType = 
typeFactory.createSqlType(SqlTypeName.SMALLINT);
+               this.intType = typeFactory.createSqlType(SqlTypeName.INTEGER);
+               this.bigintType = typeFactory.createSqlType(SqlTypeName.BIGINT);
+               this.floatType = typeFactory.createSqlType(SqlTypeName.FLOAT);
+               this.doubleType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
+               this.dateType = typeFactory.createSqlType(SqlTypeName.DATE);
+               this.timeType = typeFactory.createSqlType(SqlTypeName.TIME);
+               this.time3Type = typeFactory.createSqlType(SqlTypeName.TIME, 3);
+               this.timestampType = 
typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+               this.timestamp3Type = 
typeFactory.createSqlType(SqlTypeName.TIMESTAMP, 3);
+               this.timestampWithLocalTimeZoneType =
+                       
typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+               this.timestamp3WithLocalTimeZoneType =
+                       
typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3);
+               this.nullType = typeFactory.createSqlType(SqlTypeName.NULL);
+       }
+
+       public RelDataType createSqlType(SqlTypeName sqlTypeName, int 
precision) {
+               return typeFactory.createSqlType(sqlTypeName, precision);
+       }
+
+       public RelDataType createArrayType(RelDataType elementType) {
+               return typeFactory.createArrayType(elementType, -1);
+       }
+
+       public RelDataType createMultisetType(RelDataType elementType) {
+               return typeFactory.createMultisetType(elementType, -1);
+       }
+
+       public RelDataType createMapType(RelDataType keyType, RelDataType 
valType) {
+               return typeFactory.createMapType(keyType, valType);
+       }
+
+       public RelDataType createStructType(List<RelDataType> keyTypes, 
List<String> names) {
+               return typeFactory.createStructType(keyTypes, names);
+       }
+
+       public RelDataType nullable(RelDataType type) {
+               return typeFactory.createTypeWithNullability(type, true);
+       }
+}
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
new file mode 100644
index 0000000..4b4499f
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.dialect.CalciteSqlDialect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.SqlParserTest;
+import org.apache.calcite.sql.parser.SqlParserUtil;
+import org.apache.calcite.sql.pretty.SqlPrettyWriter;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.test.SqlValidatorTestCase;
+import org.apache.calcite.util.Util;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for all the supported Flink DDL data types.
+ */
+@RunWith(Parameterized.class)
+public class FlinkDDLDataTypeTest {
+       private FlinkSqlConformance conformance = FlinkSqlConformance.DEFAULT;
+       private static final RelDataTypeFactory TYPE_FACTORY =
+               new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+       private static final Fixture FIXTURE = new Fixture(TYPE_FACTORY);
+       private static final String DDL_FORMAT = "create table t1 (\n" +
+               "  f0 %s\n" +
+               ") with (\n" +
+               "  k1 = 'v1'\n" +
+               ")";
+
+       @Parameterized.Parameters(name = "{index}: {0}")
+       public static List<TestItem> testData() {
+               return Arrays.asList(
+                       createTestItem("CHAR", nullable(FIXTURE.char1Type), 
"CHAR"),
+                       createTestItem("CHAR NOT NULL", FIXTURE.char1Type, 
"CHAR NOT NULL"),
+                       createTestItem("CHAR   NOT \t\nNULL", 
FIXTURE.char1Type, "CHAR NOT NULL"),
+                       createTestItem("char not null", FIXTURE.char1Type, 
"CHAR NOT NULL"),
+                       createTestItem("CHAR NULL", 
nullable(FIXTURE.char1Type), "CHAR"),
+                       createTestItem("CHAR(33)", 
nullable(FIXTURE.char33Type), "CHAR(33)"),
+                       createTestItem("VARCHAR", 
nullable(FIXTURE.varcharType), "VARCHAR"),
+                       createTestItem("VARCHAR(33)", 
nullable(FIXTURE.varchar33Type), "VARCHAR(33)"),
+                       createTestItem("STRING",
+                               
nullable(FIXTURE.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE)), 
"STRING"),
+                       createTestItem("BOOLEAN", 
nullable(FIXTURE.booleanType), "BOOLEAN"),
+                       createTestItem("BINARY", nullable(FIXTURE.binaryType), 
"BINARY"),
+                       createTestItem("BINARY(33)", 
nullable(FIXTURE.binary33Type), "BINARY(33)"),
+                       createTestItem("VARBINARY", 
nullable(FIXTURE.varbinaryType), "VARBINARY"),
+                       createTestItem("VARBINARY(33)", 
nullable(FIXTURE.varbinary33Type),
+                               "VARBINARY(33)"),
+                       createTestItem("BYTES",
+                               
nullable(FIXTURE.createSqlType(SqlTypeName.VARBINARY, Integer.MAX_VALUE)),
+                               "BYTES"),
+                       createTestItem("DECIMAL", 
nullable(FIXTURE.decimalType), "DECIMAL"),
+                       createTestItem("DEC", nullable(FIXTURE.decimalType), 
"DECIMAL"),
+                       createTestItem("NUMERIC", 
nullable(FIXTURE.decimalType), "DECIMAL"),
+                       createTestItem("DECIMAL(10)", 
nullable(FIXTURE.decimalP10S0Type), "DECIMAL(10)"),
+                       createTestItem("DEC(10)", 
nullable(FIXTURE.decimalP10S0Type), "DECIMAL(10)"),
+                       createTestItem("NUMERIC(10)", 
nullable(FIXTURE.decimalP10S0Type), "DECIMAL(10)"),
+                       createTestItem("DECIMAL(10, 3)", 
nullable(FIXTURE.decimalP10S3Type),
+                               "DECIMAL(10, 3)"),
+                       createTestItem("DEC(10, 3)", 
nullable(FIXTURE.decimalP10S3Type),
+                               "DECIMAL(10, 3)"),
+                       createTestItem("NUMERIC(10, 3)", 
nullable(FIXTURE.decimalP10S3Type),
+                               "DECIMAL(10, 3)"),
+                       createTestItem("TINYINT", 
nullable(FIXTURE.tinyintType), "TINYINT"),
+                       createTestItem("SMALLINT", 
nullable(FIXTURE.smallintType), "SMALLINT"),
+                       createTestItem("INTEGER", nullable(FIXTURE.intType), 
"INTEGER"),
+                       createTestItem("INT", nullable(FIXTURE.intType), 
"INTEGER"),
+                       createTestItem("BIGINT", nullable(FIXTURE.bigintType), 
"BIGINT"),
+                       createTestItem("FLOAT", nullable(FIXTURE.floatType), 
"FLOAT"),
+                       createTestItem("DOUBLE", nullable(FIXTURE.doubleType), 
"DOUBLE"),
+                       createTestItem("DOUBLE PRECISION", 
nullable(FIXTURE.doubleType), "DOUBLE"),
+                       createTestItem("DATE", nullable(FIXTURE.dateType), 
"DATE"),
+                       createTestItem("TIME", nullable(FIXTURE.timeType), 
"TIME"),
+                       createTestItem("TIME WITHOUT TIME ZONE", 
nullable(FIXTURE.timeType), "TIME"),
+                       createTestItem("TIME(3)", nullable(FIXTURE.time3Type), 
"TIME(3)"),
+                       createTestItem("TIME(3) WITHOUT TIME ZONE", 
nullable(FIXTURE.time3Type),
+                               "TIME(3)"),
+                       createTestItem("TIMESTAMP", 
nullable(FIXTURE.timestampType), "TIMESTAMP"),
+                       createTestItem("TIMESTAMP WITHOUT TIME ZONE", 
nullable(FIXTURE.timestampType),
+                               "TIMESTAMP"),
+                       createTestItem("TIMESTAMP(3)", 
nullable(FIXTURE.timestamp3Type), "TIMESTAMP(3)"),
+                       createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE",
+                               nullable(FIXTURE.timestamp3Type), 
"TIMESTAMP(3)"),
+                       createTestItem("TIMESTAMP WITH LOCAL TIME ZONE",
+                               
nullable(FIXTURE.timestampWithLocalTimeZoneType),
+                               "TIMESTAMP WITH LOCAL TIME ZONE"),
+                       createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE",
+                               
nullable(FIXTURE.timestamp3WithLocalTimeZoneType),
+                               "TIMESTAMP(3) WITH LOCAL TIME ZONE"),
+                       createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME 
ZONE>",
+                               
nullable(FIXTURE.createArrayType(nullable(FIXTURE.timestamp3WithLocalTimeZoneType))),
+                               "ARRAY< TIMESTAMP(3) WITH LOCAL TIME ZONE >"),
+                       createTestItem("ARRAY<INT NOT NULL>",
+                               
nullable(FIXTURE.createArrayType(FIXTURE.intType)),
+                               "ARRAY< INTEGER NOT NULL >"),
+                       createTestItem("INT ARRAY",
+                               
nullable(FIXTURE.createArrayType(nullable(FIXTURE.intType))),
+                               "INTEGER ARRAY"),
+                       createTestItem("INT NOT NULL ARRAY",
+                               
nullable(FIXTURE.createArrayType(FIXTURE.intType)),
+                               "INTEGER NOT NULL ARRAY"),
+                       createTestItem("INT ARRAY NOT NULL",
+                               
FIXTURE.createArrayType(nullable(FIXTURE.intType)),
+                               "INTEGER ARRAY NOT NULL"),
+                       createTestItem("MULTISET<INT NOT NULL>",
+                               
nullable(FIXTURE.createMultisetType(FIXTURE.intType)),
+                               "MULTISET< INTEGER NOT NULL >"),
+                       createTestItem("INT MULTISET",
+                               
nullable(FIXTURE.createMultisetType(nullable(FIXTURE.intType))),
+                               "INTEGER MULTISET"),
+                       createTestItem("INT NOT NULL MULTISET",
+                               
nullable(FIXTURE.createMultisetType(FIXTURE.intType)),
+                               "INTEGER NOT NULL MULTISET"),
+                       createTestItem("INT MULTISET NOT NULL",
+                               
FIXTURE.createMultisetType(nullable(FIXTURE.intType)),
+                               "INTEGER MULTISET NOT NULL"),
+                       createTestItem("MAP<BIGINT, BOOLEAN>",
+                               nullable(FIXTURE.createMapType(
+                                       nullable(FIXTURE.bigintType),
+                                       nullable(FIXTURE.booleanType))),
+                               "MAP< BIGINT, BOOLEAN >"),
+                       createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>",
+                               nullable(FIXTURE.createStructType(
+                                       Arrays.asList(FIXTURE.intType, 
nullable(FIXTURE.booleanType)),
+                                       Arrays.asList("f0", "f1"))),
+                               "ROW< `f0` INTEGER NOT NULL, `f1` BOOLEAN >"),
+                       createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)",
+                               nullable(FIXTURE.createStructType(
+                                       Arrays.asList(FIXTURE.intType, 
nullable(FIXTURE.booleanType)),
+                                       Arrays.asList("f0", "f1"))),
+                               "ROW< `f0` INTEGER NOT NULL, `f1` BOOLEAN >"),
+                       createTestItem("ROW<`f0` INT>",
+                               nullable(FIXTURE.createStructType(
+                                       
Collections.singletonList(nullable(FIXTURE.intType)),
+                                       Collections.singletonList("f0"))),
+                               "ROW< `f0` INTEGER >"),
+                       createTestItem("ROW(`f0` INT)",
+                               nullable(FIXTURE.createStructType(
+                                       
Collections.singletonList(nullable(FIXTURE.intType)),
+                                       Collections.singletonList("f0"))),
+                               "ROW< `f0` INTEGER >"),
+                       createTestItem("ROW<>",
+                               nullable(FIXTURE.createStructType(
+                                       Collections.emptyList(),
+                                       Collections.emptyList())),
+                               "ROW<>"),
+                       createTestItem("ROW()",
+                               nullable(FIXTURE.createStructType(
+                                       Collections.emptyList(),
+                                       Collections.emptyList())),
+                               "ROW<>"),
+                       createTestItem("ROW<f0 INT NOT NULL 'This is a 
comment.', "
+                                       + "f1 BOOLEAN 'This as well.'>",
+                               nullable(FIXTURE.createStructType(
+                                       Arrays.asList(FIXTURE.intType, 
nullable(FIXTURE.booleanType)),
+                                       Arrays.asList("f0", "f1"))),
+                               "ROW< `f0` INTEGER NOT NULL 'This is a 
comment.', "
+                                       + "`f1` BOOLEAN 'This as well.' >"),
+
+                       // test parse throws error.
+                       createTestItem("TIMESTAMP WITH TIME ZONE",
+                               "'WITH TIME ZONE' is not supported yet, 
options: "
+                                       + "'WITHOUT TIME ZONE', 'WITH LOCAL 
TIME ZONE'."),
+                       createTestItem("TIMESTAMP(3) WITH TIME ZONE",
+                               "'WITH TIME ZONE' is not supported yet, 
options: "
+                                       + "'WITHOUT TIME ZONE', 'WITH LOCAL 
TIME ZONE'."),
+                       createTestItem("^NULL^",
+                               "(?s).*Encountered \"NULL\" at line 2, column 
6..*"),
+                       createTestItem("cat.db.MyType",
+                               "(?s).*UDT in DDL is not supported yet..*"),
+                       createTestItem("`db`.`MyType`",
+                               "(?s).*UDT in DDL is not supported yet..*"),
+                       createTestItem("MyType",
+                               "(?s).*UDT in DDL is not supported yet..*"),
+                       createTestItem("ARRAY<MyType>",
+                               "(?s).*UDT in DDL is not supported yet..*"),
+                       createTestItem("ROW<f0 MyType, f1 `c`.`d`.`t`>",
+                               "(?s).*UDT in DDL is not supported yet..*"),
+                       createTestItem("^INTERVAL^ YEAR",
+                               "(?s).*Encountered \"INTERVAL\" at line 2, 
column 6..*"),
+                       createTestItem("ANY(^'unknown.class'^, '')",
+                               "(?s).*Encountered \"\\\\'unknown.class\\\\'\" 
at line 2, column 10.\n.*"
+                                       + "Was expecting:\n"
+                                       + "    <UNSIGNED_INTEGER_LITERAL> ...\n"
+                                       + ".*"));
+       }
+
+       /**
+        * Builds a {@link TestItem} from a loosely typed argument list.
+        *
+        * <p>Expected layout of {@code args}:
+        * <ol>
+        *   <li>the type expression under test (a {@code String});</li>
+        *   <li>either the expected error message pattern (a {@code String}) or the
+        *       expected {@link RelDataType};</li>
+        *   <li>optionally, the expected unparsed form (a {@code String}).</li>
+        * </ol>
+        *
+        * @param args the test expression followed by its expectations
+        * @return the populated test item
+        * @throws IllegalArgumentException if the argument count is not 2 or 3, or if the
+        *         second argument is neither a {@code String} nor a {@link RelDataType}
+        */
+       private static TestItem createTestItem(Object... args) {
+               // Validate explicitly: a plain `assert` is a no-op unless the JVM runs with -ea,
+               // and an item without any expectation is skipped by every test method.
+               if (args.length < 2 || args.length > 3) {
+                       throw new IllegalArgumentException(
+                               "Expected 2 or 3 arguments but got " + args.length);
+               }
+               final String testExpr = (String) args[0];
+               final TestItem testItem = TestItem.fromTestExpr(testExpr);
+               if (args[1] instanceof String) {
+                       testItem.withExpectedError((String) args[1]);
+               } else if (args[1] instanceof RelDataType) {
+                       testItem.withExpectedType((RelDataType) args[1]);
+               } else {
+                       throw new IllegalArgumentException(
+                               "Second argument must be an error pattern (String) or a RelDataType: "
+                                       + args[1]);
+               }
+               if (args.length == 3) {
+                       testItem.withExpectedUnparsed((String) args[2]);
+               }
+               return testItem;
+       }
+
+       @Parameterized.Parameter
+       public TestItem testItem;
+
+       /** Checks the parsed column type when the current item carries an expected type. */
+       @Test
+       public void testDataTypeParsing() {
+               final RelDataType expected = testItem.expectedType;
+               if (expected == null) {
+                       // This item has no type expectation; nothing to verify here.
+                       return;
+               }
+               checkType(testItem.testExpr, expected);
+       }
+
+       @Test
+       public void testThrowsError() {
+               if (testItem.expectedError != null) {
+                       checkFails(testItem.testExpr, testItem.expectedError);
+               }
+       }
+
+       /** Checks the unparsed type text when the current item carries an expected form. */
+       @Test
+       public void testDataTypeUnparsing() {
+               final String expected = testItem.expectedUnparsed;
+               if (expected == null) {
+                       // This item has no unparse expectation; nothing to verify here.
+                       return;
+               }
+               checkUnparseTo(testItem.testExpr, expected);
+       }
+
+       private static RelDataType nullable(RelDataType type) {
+               return FIXTURE.nullable(type);
+       }
+
+       private void checkType(String typeExpr, RelDataType expectedType) {
+               this.sql(String.format(DDL_FORMAT, 
typeExpr)).checkType(expectedType);
+       }
+
+       private void checkFails(String typeExpr, String expectedMsgPattern) {
+               sql(String.format(DDL_FORMAT, 
typeExpr)).fails(expectedMsgPattern);
+       }
+
+       private void checkUnparseTo(String typeExpr, String expectedUnparsed) {
+               sql(String.format(DDL_FORMAT, 
typeExpr)).unparsedTo(expectedUnparsed);
+       }
+
+       private Tester getTester() {
+               return new TesterImpl();
+       }
+
+       private Sql sql(String sql) {
+               return new Sql(sql);
+       }
+
+       //~ Inner Classes ----------------------------------------------------------
+
+       /**
+        * One parameterized test case: the type expression under test plus the optional
+        * expectations (derived type, error message pattern, unparsed text) to verify.
+        */
+       private static class TestItem {
+               private final String testExpr;
+               @Nullable private RelDataType expectedType;
+               @Nullable private String expectedError;
+               @Nullable private String expectedUnparsed;
+
+               private TestItem(String expr) {
+                       testExpr = expr;
+               }
+
+               /** Creates an item for {@code testExpr} with no expectations set. */
+               static TestItem fromTestExpr(String testExpr) {
+                       return new TestItem(testExpr);
+               }
+
+               /** Sets the expected type and returns this item for chaining. */
+               TestItem withExpectedType(RelDataType type) {
+                       expectedType = type;
+                       return this;
+               }
+
+               /** Sets the expected error pattern and returns this item for chaining. */
+               TestItem withExpectedError(String error) {
+                       expectedError = error;
+                       return this;
+               }
+
+               /** Sets the expected unparsed text and returns this item for chaining. */
+               TestItem withExpectedUnparsed(String unparsed) {
+                       expectedUnparsed = unparsed;
+                       return this;
+               }
+
+               /** Returns the test expression itself. */
+               @Override
+               public String toString() {
+                       return testExpr;
+               }
+       }
+
+       /**
+        * Fluent wrapper around one SQL statement; each check is forwarded to the
+        * {@link Tester} returned by {@code getTester()}.
+        */
+       private class Sql {
+               private final String sql;
+
+               Sql(String statement) {
+                       sql = statement;
+               }
+
+               /** Forwards to {@link Tester#checkType} and returns this wrapper. */
+               public Sql checkType(RelDataType type) {
+                       final Tester tester = getTester();
+                       tester.checkType(sql, type);
+                       return this;
+               }
+
+               /** Forwards to {@link Tester#checkFails} and returns this wrapper. */
+               public Sql fails(String expectedMsgPattern) {
+                       final Tester tester = getTester();
+                       tester.checkFails(sql, expectedMsgPattern);
+                       return this;
+               }
+
+               /** Forwards to {@link Tester#checkUnparsed} and returns this wrapper. */
+               public Sql unparsedTo(String expectedUnparsed) {
+                       final Tester tester = getTester();
+                       tester.checkUnparsed(sql, expectedUnparsed);
+                       return this;
+               }
+       }
+
+       /**
+        * Callback that decides how each kind of test action is carried out.
+        */
+       protected interface Tester {
+               /** Checks the statement {@code sql} against the expected {@code type}. */
+               void checkType(String sql, RelDataType type);
+
+               /** Checks that {@code sql} fails with a message matching {@code expectedMsgPattern}. */
+               void checkFails(String sql, String expectedMsgPattern);
+
+               /** Checks the unparsed form obtained from {@code sql} against {@code expectedUnparsed}. */
+               void checkUnparsed(String sql, String expectedUnparsed);
+       }
+
+       /**
+        * Default implementation of {@link SqlParserTest.Tester}.
+        */
+       protected class TesterImpl implements Tester {
+               private SqlParser getSqlParser(String sql) {
+                       return SqlParser.create(sql,
+                               SqlParser.configBuilder()
+                                       
.setParserFactory(FlinkSqlParserImpl.FACTORY)
+                                       .setQuoting(Quoting.BACK_TICK)
+                                       .setUnquotedCasing(Casing.UNCHANGED)
+                                       .setQuotedCasing(Casing.UNCHANGED)
+                                       .setConformance(conformance)
+                                       .build());
+               }
+
+               private SqlDialect getSqlDialect() {
+                       return new CalciteSqlDialect(SqlDialect.EMPTY_CONTEXT
+                               .withQuotedCasing(Casing.UNCHANGED)
+                               .withConformance(conformance)
+                               .withUnquotedCasing(Casing.UNCHANGED)
+                               .withIdentifierQuoteString("`"));
+               }
+
+               public void checkType(String sql, RelDataType type) {
+                       final SqlNode sqlNode = parseStmtAndHandleEx(sql);
+                       assert sqlNode instanceof SqlCreateTable;
+                       final SqlCreateTable sqlCreateTable = (SqlCreateTable) 
sqlNode;
+                       SqlNodeList columns = sqlCreateTable.getColumnList();
+                       assert columns.size() == 1;
+                       RelDataType columnType = ((SqlTableColumn) 
columns.get(0)).getType()
+                               .deriveType(TYPE_FACTORY);
+                       assertEquals(type, columnType);
+               }
+
+               private SqlNode parseStmtAndHandleEx(String sql) {
+                       final SqlNode sqlNode;
+                       try {
+                               sqlNode = getSqlParser(sql).parseStmt();
+                       } catch (SqlParseException e) {
+                               throw new RuntimeException("Error while parsing 
SQL: " + sql, e);
+                       }
+                       return sqlNode;
+               }
+
+               public void checkFails(
+                       String sql,
+                       String expectedMsgPattern) {
+                       SqlParserUtil.StringAndPos sap = 
SqlParserUtil.findPos(sql);
+                       Throwable thrown = null;
+                       try {
+                               final SqlNode sqlNode;
+                               sqlNode = getSqlParser(sap.sql).parseStmt();
+                               Util.discard(sqlNode);
+                       } catch (Throwable ex) {
+                               thrown = ex;
+                       }
+
+                       checkEx(expectedMsgPattern, sap, thrown);
+               }
+
+               public void checkUnparsed(String sql, String expectedUnparsed) {
+                       final SqlNode sqlNode = parseStmtAndHandleEx(sql);
+                       assert sqlNode instanceof SqlCreateTable;
+                       final SqlCreateTable sqlCreateTable = (SqlCreateTable) 
sqlNode;
+                       SqlNodeList columns = sqlCreateTable.getColumnList();
+                       assert columns.size() == 1;
+                       SqlDataTypeSpec dataTypeSpec = ((SqlTableColumn) 
columns.get(0)).getType();
+                       SqlWriter sqlWriter = new 
SqlPrettyWriter(getSqlDialect(), false);
+                       dataTypeSpec.unparse(sqlWriter, 0, 0);
+                       assertEquals(expectedUnparsed, 
sqlWriter.toSqlString().getSql());
+               }
+
+               private void checkEx(String expectedMsgPattern,
+                               SqlParserUtil.StringAndPos sap,
+                               Throwable thrown) {
+                       SqlValidatorTestCase.checkEx(thrown, 
expectedMsgPattern, sap);
+               }
+       }
+}
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index f045817..cc892ab 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -366,7 +366,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
                check("CREATE TABLE tbl1 (\n" +
                        "  a ARRAY<bigint>, \n" +
                        "  b MAP<int, varchar>,\n" +
-                       "  c ROW<cc0:int, cc1: float, cc2: varchar>,\n" +
+                       "  c ROW<cc0 int, cc1 float, cc2 varchar>,\n" +
+                       "  d MULTISET<varchar>,\n" +
                        "  PRIMARY KEY (a, b) \n" +
                        ") with (\n" +
                        "  x = 'y', \n" +
@@ -374,28 +375,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
                        ")\n", "CREATE TABLE `TBL1` (\n" +
                        "  `A`  ARRAY< BIGINT >,\n" +
                        "  `B`  MAP< INTEGER, VARCHAR >,\n" +
-                       "  `C`  ROW< `CC0` : INTEGER, `CC1` : FLOAT, `CC2` : 
VARCHAR >,\n" +
-                       "  PRIMARY KEY (`A`, `B`)\n" +
-                       ") WITH (\n" +
-                       "  `X` = 'y',\n" +
-                       "  `ASD` = 'data'\n" +
-                       ")");
-       }
-
-       @Test
-       public void testCreateTableWithDecimalType() {
-               check("CREATE TABLE tbl1 (\n" +
-                       "  a decimal, \n" +
-                       "  b decimal(10, 0),\n" +
-                       "  c decimal(38, 38),\n" +
-                       "  PRIMARY KEY (a, b) \n" +
-                       ") with (\n" +
-                       "  x = 'y', \n" +
-                       "  asd = 'data'\n" +
-                       ")\n", "CREATE TABLE `TBL1` (\n" +
-                       "  `A`  DECIMAL,\n" +
-                       "  `B`  DECIMAL(10, 0),\n" +
-                       "  `C`  DECIMAL(38, 38),\n" +
+                       "  `C`  ROW< `CC0` INTEGER, `CC1` FLOAT, `CC2` VARCHAR 
>,\n" +
+                       "  `D`  MULTISET< VARCHAR >,\n" +
                        "  PRIMARY KEY (`A`, `B`)\n" +
                        ") WITH (\n" +
                        "  `X` = 'y',\n" +
@@ -408,7 +389,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
                check("CREATE TABLE tbl1 (\n" +
                        "  a ARRAY<ARRAY<bigint>>, \n" +
                        "  b MAP<MAP<int, varchar>, ARRAY<varchar>>,\n" +
-                       "  c ROW<cc0:ARRAY<int>, cc1: float, cc2: varchar>,\n" +
+                       "  c ROW<cc0 ARRAY<int>, cc1 float, cc2 varchar>,\n" +
+                       "  d MULTISET<ARRAY<int>>,\n" +
                        "  PRIMARY KEY (a, b) \n" +
                        ") with (\n" +
                        "  x = 'y', \n" +
@@ -416,7 +398,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
                        ")\n", "CREATE TABLE `TBL1` (\n" +
                        "  `A`  ARRAY< ARRAY< BIGINT > >,\n" +
                        "  `B`  MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > 
>,\n" +
-                       "  `C`  ROW< `CC0` : ARRAY< INTEGER >, `CC1` : FLOAT, 
`CC2` : VARCHAR >,\n" +
+                       "  `C`  ROW< `CC0` ARRAY< INTEGER >, `CC1` FLOAT, `CC2` 
VARCHAR >,\n" +
+                       "  `D`  MULTISET< ARRAY< INTEGER > >,\n" +
                        "  PRIMARY KEY (`A`, `B`)\n" +
                        ") WITH (\n" +
                        "  `X` = 'y',\n" +
@@ -437,7 +420,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
                        ")\n", "(?s).*Encountered \"\\(\" at line 4, column 
14.\n" +
                        "Was expecting one of:\n" +
                        "    \"AS\" ...\n" +
-                       "    \"CHARACTER\" ...\n" +
+                       "    \"ARRAY\" ...\n" +
                        ".*");
        }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index 132755f..077fdfd 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -70,7 +70,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
     val sql =
       """
         |SELECT
-        |  string,
+        |  `string`,
         |  HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' 
SECOND),
         |  HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' 
SECOND),
         |  COUNT(1),
@@ -79,7 +79,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
         |  COUNT(DISTINCT `float`),
         |  concat_distinct_agg(name)
         |FROM T1
-        |GROUP BY string, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL 
'0.005' SECOND)
+        |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL 
'0.005' SECOND)
       """.stripMargin
 
     val sink = new TestingAppendSink
@@ -122,7 +122,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
     val sql =
       """
         |SELECT
-        |  string,
+        |  `string`,
         |  SESSION_START(rowtime, INTERVAL '0.005' SECOND),
         |  SESSION_ROWTIME(rowtime, INTERVAL '0.005' SECOND),
         |  COUNT(1),
@@ -131,7 +131,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
         |  SUM(`int`),
         |  COUNT(DISTINCT name)
         |FROM T1
-        |GROUP BY string, SESSION(rowtime, INTERVAL '0.005' SECOND)
+        |GROUP BY `string`, SESSION(rowtime, INTERVAL '0.005' SECOND)
       """.stripMargin
 
     val sink = new TestingAppendSink
@@ -171,7 +171,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
     val sql =
       """
         |SELECT
-        |  string,
+        |  `string`,
         |  TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,
         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND),
         |  COUNT(DISTINCT `long`),
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 01a55f1..b332818 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -512,7 +512,7 @@ class TimeAttributesITCase extends AbstractTestBase {
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
     val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 
'bigdec, 'string)
     tEnv.registerTable("T1", table)
-    val querySql = "select rowtime as ts, string as msg from T1"
+    val querySql = "select rowtime as ts, `string` as msg from T1"
 
     val results = tEnv.sqlQuery(querySql).toAppendStream[Pojo1]
     results.addSink(new StreamITCase.StringSink[Pojo1])

Reply via email to