This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 78b516c  [FLINK-17431][sql-parser-hive][hive] Implement table DDLs for Hive di… (#11935)
78b516c is described below

commit 78b516ca366e303a0e541aa69bebe1a15ad7a3c8
Author: Rui Li <li...@apache.org>
AuthorDate: Wed May 13 11:00:13 2020 +0800

    [FLINK-17431][sql-parser-hive][hive] Implement table DDLs for Hive di… (#11935)
    
    * [FLINK-17431][sql-parser-hive][hive] Implement table DDLs for Hive dialect part 1
    
    * address comments and fix collection delim
    
    * renaming
    
    * rebase
    
    * remove unused property
    
    * don't support bytes
    
    * add enums for hive constraint traits
    
    * store NN col names and add test
    
    * disallow varchar w/o precision
    
    * fix unparse for describe table
    
    * fix unparse row format
    
    * fix col type unparse
    
    * address comments
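    
    A usage sketch of the syntax this change covers, modeled on the tests added
    in HiveDialectTest below; the TableEnvironment/HiveCatalog setup mirrors
    that test and is assumed here, and the location path is a placeholder:
    
        // Hive dialect must be active, as in the HiveDialectTest setup
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // external table with partition columns, a custom location and table properties
        tableEnv.sqlUpdate("create external table tbl1 (d decimal(10,0),ts timestamp) "
                + "partitioned by (p string) location '/tmp/tbl1' tblproperties('k1'='v1')");
        // STORED AS and ROW FORMAT clauses
        tableEnv.sqlUpdate("create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc");
        tableEnv.sqlUpdate("create table tbl4 (x int,y smallint) "
                + "row format delimited fields terminated by '|' lines terminated by '\n'");
        // NOT NULL / PRIMARY KEY constraints with Hive traits (Hive 3.1+ in the tests)
        tableEnv.sqlUpdate("create table tbl (x int,y int not null disable novalidate rely,"
                + "constraint pk_name primary key (x) rely)");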
---
 .../flink/table/catalog/hive/HiveCatalog.java      |  82 +--
 .../table/catalog/hive/util/HiveTableUtil.java     |  98 ++++
 .../flink/connectors/hive/HiveDialectTest.java     |  74 ++-
 .../flink/table/catalog/hive/HiveCatalogTest.java  |   6 +-
 .../src/main/codegen/data/Parser.tdd               |  21 +
 .../src/main/codegen/includes/parserImpls.ftl      | 548 ++++++++++++++++++++-
 .../flink/sql/parser/hive/ddl/HiveDDLUtils.java    | 221 +++++++++
 .../sql/parser/hive/ddl/SqlAlterHiveDatabase.java  |   2 +-
 .../parser/hive/ddl/SqlAlterHiveDatabaseOwner.java |   4 +-
 .../sql/parser/hive/ddl/SqlCreateHiveDatabase.java |   2 +-
 .../sql/parser/hive/ddl/SqlCreateHiveTable.java    | 484 ++++++++++++++++++
 ...HiveDatabase.java => SqlDescribeHiveTable.java} |  41 +-
 .../parser/hive/ddl/SqlHiveConstraintEnable.java   |  39 ++
 .../sql/parser/hive/ddl/SqlHiveConstraintRely.java |  39 ++
 .../parser/hive/ddl/SqlHiveConstraintTrait.java    |  59 +++
 .../parser/hive/ddl/SqlHiveConstraintValidate.java |  39 ++
 .../hive/type/ExtendedHiveStructTypeNameSpec.java  |  68 +++
 .../parser/hive/FlinkHiveSqlParserImplTest.java    | 106 ++++
 .../flink/sql/parser/ddl/SqlCreateTable.java       |   2 +-
 .../parser/ddl/constraint/SqlTableConstraint.java  |   4 +
 .../flink/sql/parser/dql/SqlRichDescribeTable.java |   2 +-
 .../type/ExtendedSqlCollectionTypeNameSpec.java    |  12 +
 .../parser/type/ExtendedSqlRowTypeNameSpec.java    |   4 +
 23 files changed, 1885 insertions(+), 72 deletions(-)
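The core hand-off in this change: the Hive dialect parser serializes DDL semantics
(external flag, row format, storage format, location, constraint traits) into
reserved table properties, and HiveCatalog/HiveTableUtil consume and strip those
properties when building the Hive Table. A stand-alone sketch of the ';'-delimited
encoding used for NOT NULL columns and their trait bytes; the values here are
illustrative and the real constants live in HiveDDLUtils and SqlCreateHiveTable:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class NotNullPropsRoundTrip {
        // same literal as HiveDDLUtils.COL_DELIMITER in the patch below
        static final String COL_DELIMITER = ";";

        public static void main(String[] args) {
            // parser side: join column names and trait bytes into property strings
            List<String> notNullCols = Arrays.asList("y", "z");
            List<Byte> traits = Arrays.asList((byte) 5, (byte) 4); // illustrative trait bytes
            String nnColStr = String.join(COL_DELIMITER, notNullCols);
            String nnTraitStr = traits.stream().map(String::valueOf)
                    .collect(Collectors.joining(COL_DELIMITER));

            // catalog side: split them back apart (cf. HiveCatalog#createTable below)
            List<String> decodedCols = Arrays.asList(nnColStr.split(COL_DELIMITER));
            List<Byte> decodedTraits = Arrays.stream(nnTraitStr.split(COL_DELIMITER))
                    .map(Byte::valueOf).collect(Collectors.toList());
            System.out.println(decodedCols + " " + decodedTraits); // [y, z] [5, 4]
        }
    }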

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 128aa21..815fe41 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connectors.hive.HiveTableFactory;
+import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
 import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
@@ -70,6 +71,7 @@ import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -92,9 +94,6 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
-import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,6 +105,7 @@ import java.net.MalformedURLException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -113,6 +113,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_COLS;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_CONSTRAINT_TRAITS;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.PK_CONSTRAINT_TRAIT;
 import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
 import static org.apache.flink.table.catalog.hive.util.HiveStatsUtil.parsePositiveIntStat;
 import static org.apache.flink.table.catalog.hive.util.HiveStatsUtil.parsePositiveLongStat;
@@ -128,8 +131,6 @@ public class HiveCatalog extends AbstractCatalog {
        public static final String DEFAULT_DB = "default";
 
        private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
-       private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
-       private static final String DEFAULT_HIVE_TABLE_STORAGE_FORMAT = "TextFile";
 
        // Prefix used to distinguish Flink functions from Hive functions.
        // It's appended to Flink function's class name
@@ -373,34 +374,51 @@ public class HiveCatalog extends AbstractCatalog {
                        throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
                }
 
-               Table hiveTable = instantiateHiveTable(tablePath, table);
+               Table hiveTable = instantiateHiveTable(tablePath, table, hiveConf);
 
                UniqueConstraint pkConstraint = null;
                List<String> notNullCols = new ArrayList<>();
                boolean isGeneric = isGenericForCreate(table.getOptions());
                if (!isGeneric) {
                        pkConstraint = table.getSchema().getPrimaryKey().orElse(null);
-                       for (int i = 0; i < table.getSchema().getFieldDataTypes().length; i++) {
-                               if (!table.getSchema().getFieldDataTypes()[i].getLogicalType().isNullable()) {
-                                       notNullCols.add(table.getSchema().getFieldNames()[i]);
+                       String nnColStr = hiveTable.getParameters().remove(NOT_NULL_COLS);
+                       if (nnColStr != null) {
+                               notNullCols.addAll(Arrays.asList(nnColStr.split(HiveDDLUtils.COL_DELIMITER)));
+                       } else {
+                               for (int i = 0; i < table.getSchema().getFieldDataTypes().length; i++) {
+                                       if (!table.getSchema().getFieldDataTypes()[i].getLogicalType().isNullable()) {
+                                               notNullCols.add(table.getSchema().getFieldNames()[i]);
+                                       }
                                }
                        }
                }
 
                try {
                        if (pkConstraint != null || !notNullCols.isEmpty()) {
-                               // for now we just create constraints that are DISABLE, NOVALIDATE, RELY
-                               Byte[] pkTraits = new Byte[pkConstraint == null ? 0 : pkConstraint.getColumns().size()];
-                               Arrays.fill(pkTraits, HiveTableUtil.relyConstraint((byte) 0));
-                               Byte[] nnTraits = new Byte[notNullCols.size()];
-                               Arrays.fill(nnTraits, HiveTableUtil.relyConstraint((byte) 0));
+                               // extract constraint traits from table properties
+                               String pkTraitStr = hiveTable.getParameters().remove(PK_CONSTRAINT_TRAIT);
+                               byte pkTrait = pkTraitStr == null ? HiveDDLUtils.defaultTrait() : Byte.parseByte(pkTraitStr);
+                               List<Byte> pkTraits =
+                                               Collections.nCopies(pkConstraint == null ? 0 : pkConstraint.getColumns().size(), pkTrait);
+
+                               List<Byte> nnTraits;
+                               String nnTraitsStr = hiveTable.getParameters().remove(NOT_NULL_CONSTRAINT_TRAITS);
+                               if (nnTraitsStr != null) {
+                                       String[] traits = nnTraitsStr.split(HiveDDLUtils.COL_DELIMITER);
+                                       Preconditions.checkArgument(traits.length == notNullCols.size(),
+                                                       "Number of NOT NULL columns and constraint traits mismatch");
+                                       nnTraits = Arrays.stream(traits).map(Byte::new).collect(Collectors.toList());
+                               } else {
+                                       nnTraits = Collections.nCopies(notNullCols.size(), HiveDDLUtils.defaultTrait());
+                               }
+
                                client.createTableWithConstraints(
                                                hiveTable,
                                                hiveConf,
                                                pkConstraint,
-                                               Arrays.asList(pkTraits),
+                                               pkTraits,
                                                notNullCols,
-                                               Arrays.asList(nnTraits));
+                                               nnTraits);
                        } else {
                                client.createTable(hiveTable);
                        }
@@ -466,7 +484,7 @@ public class HiveCatalog extends AbstractCatalog {
                                        existingTable.getClass().getName(), newCatalogTable.getClass().getName()));
                }
 
-               Table newTable = instantiateHiveTable(tablePath, newCatalogTable);
+               Table newTable = instantiateHiveTable(tablePath, newCatalogTable, hiveConf);
 
                // client.alter_table() requires a valid location
                // thus, if new table doesn't have that, it reuses location of the old table
@@ -621,7 +639,7 @@ public class HiveCatalog extends AbstractCatalog {
        }
 
        @VisibleForTesting
-       protected static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
+       protected static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, HiveConf hiveConf) {
                if (!(table instanceof CatalogTableImpl) && !(table instanceof CatalogViewImpl)) {
                        throw new CatalogException(
                                        "HiveCatalog only supports CatalogTableImpl and CatalogViewImpl");
@@ -641,7 +659,7 @@ public class HiveCatalog extends AbstractCatalog {
 
                // Hive table's StorageDescriptor
                StorageDescriptor sd = hiveTable.getSd();
-               setStorageFormat(sd, properties);
+               HiveTableUtil.setDefaultStorageFormat(sd, hiveConf);
 
                if (isGeneric) {
                        DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
@@ -653,7 +671,9 @@ public class HiveCatalog extends AbstractCatalog {
 
                        properties.putAll(tableSchemaProps.asMap());
                        properties = maskFlinkProperties(properties);
+                       hiveTable.setParameters(properties);
                } else {
+                       HiveTableUtil.initiateTableFromProperties(hiveTable, properties, hiveConf);
                        List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
                        // Table columns and partition keys
                        if (table instanceof CatalogTableImpl) {
@@ -673,6 +693,8 @@ public class HiveCatalog extends AbstractCatalog {
                        } else {
                                sd.setCols(allColumns);
                        }
+                       // Table properties
+                       hiveTable.getParameters().putAll(properties);
                }
 
                if (table instanceof CatalogViewImpl) {
@@ -685,23 +707,9 @@ public class HiveCatalog extends AbstractCatalog {
                        hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
                }
 
-               // Table properties
-               hiveTable.setParameters(properties);
-
                return hiveTable;
        }
 
-       private static void setStorageFormat(StorageDescriptor sd, Map<String, String> properties) {
-               // TODO: allow user to specify storage format. Simply use text format for now
-               String storageFormatName = DEFAULT_HIVE_TABLE_STORAGE_FORMAT;
-               StorageFormatDescriptor storageFormatDescriptor = storageFormatFactory.get(storageFormatName);
-               checkArgument(storageFormatDescriptor != null, "Unknown storage format " + storageFormatName);
-               sd.setInputFormat(storageFormatDescriptor.getInputFormat());
-               sd.setOutputFormat(storageFormatDescriptor.getOutputFormat());
-               String serdeLib = storageFormatDescriptor.getSerde();
-               sd.getSerdeInfo().setSerializationLib(serdeLib != null ? serdeLib : LazySimpleSerDe.class.getName());
-       }
-
        /**
         * Filter out Hive-created properties, and return Flink-created properties.
         * Note that 'is_generic' is a special key and this method will leave it as-is.
@@ -770,7 +778,7 @@ public class HiveCatalog extends AbstractCatalog {
                        }
                } catch (TException e) {
                        throw new CatalogException(
-                               String.format("Failed to create partition %s of table %s", partitionSpec, tablePath));
+                               String.format("Failed to create partition %s of table %s", partitionSpec, tablePath), e);
                }
        }
 
@@ -900,7 +908,7 @@ public class HiveCatalog extends AbstractCatalog {
                checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
                checkNotNull(newPartition, "New partition cannot be null");
 
-               boolean isGeneric = Boolean.valueOf(newPartition.getProperties().get(CatalogConfig.IS_GENERIC));
+               boolean isGeneric = isGenericForGet(newPartition.getProperties());
 
                if (isGeneric) {
                        throw new CatalogException("Currently only supports non-generic CatalogPartition");
@@ -942,8 +950,8 @@ public class HiveCatalog extends AbstractCatalog {
 
        // make sure both table and partition are generic, or neither is
        private static void ensureTableAndPartitionMatch(Table hiveTable, CatalogPartition catalogPartition) {
-               boolean tableIsGeneric = Boolean.valueOf(hiveTable.getParameters().get(CatalogConfig.IS_GENERIC));
-               boolean partitionIsGeneric = Boolean.valueOf(catalogPartition.getProperties().get(CatalogConfig.IS_GENERIC));
+               boolean tableIsGeneric = isGenericForGet(hiveTable.getParameters());
+               boolean partitionIsGeneric = isGenericForGet(catalogPartition.getProperties());
 
                if (tableIsGeneric != partitionIsGeneric) {
                        throw new CatalogException(String.format("Cannot handle %s partition for %s table",
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index a696708..656a93a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.catalog.hive.util;
 
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
@@ -33,9 +34,16 @@ import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileStorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 import java.util.ArrayList;
@@ -44,6 +52,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_INFO_PROP_PREFIX;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_LIB_CLASS_NAME;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_INPUT_FORMAT;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_OUTPUT_FORMAT;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_IS_EXTERNAL;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_LOCATION_URI;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Utils to for Hive-backed table.
@@ -54,6 +72,8 @@ public class HiveTableUtil {
        private static final byte HIVE_CONSTRAINT_VALIDATE = 1 << 1;
        private static final byte HIVE_CONSTRAINT_RELY = 1;
 
+       private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
+
        private HiveTableUtil() {
        }
 
@@ -175,6 +195,84 @@ public class HiveTableUtil {
                return Optional.of(String.join(" and ", filters));
        }
 
+       /**
+        * Extract DDL semantics from properties and use it to initiate the table. The related properties will be removed
+        * from the map after they're used.
+        */
+       public static void initiateTableFromProperties(Table hiveTable, Map<String, String> properties, HiveConf hiveConf) {
+               extractExternal(hiveTable, properties);
+               extractRowFormat(hiveTable.getSd(), properties);
+               extractStoredAs(hiveTable.getSd(), properties, hiveConf);
+               extractLocation(hiveTable.getSd(), properties);
+       }
+
+       private static void extractExternal(Table hiveTable, Map<String, String> properties) {
+               boolean external = Boolean.parseBoolean(properties.remove(TABLE_IS_EXTERNAL));
+               if (external) {
+                       hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString());
+                       // follow Hive to set this property
+                       hiveTable.getParameters().put("EXTERNAL", "TRUE");
+               }
+       }
+
+       public static void extractLocation(StorageDescriptor sd, Map<String, String> properties) {
+               String location = properties.remove(TABLE_LOCATION_URI);
+               if (location != null) {
+                       sd.setLocation(location);
+               }
+       }
+
+       public static void extractRowFormat(StorageDescriptor sd, Map<String, String> properties) {
+               String serdeLib = properties.remove(SERDE_LIB_CLASS_NAME);
+               if (serdeLib != null) {
+                       sd.getSerdeInfo().setSerializationLib(serdeLib);
+               }
+               List<String> serdeProps = properties.keySet().stream()
+                               .filter(p -> p.startsWith(SERDE_INFO_PROP_PREFIX))
+                               .collect(Collectors.toList());
+               for (String prop : serdeProps) {
+                       String value = properties.remove(prop);
+                       // there was a typo of this property in hive, and was fixed in 3.0.0 -- https://issues.apache.org/jira/browse/HIVE-16922
+                       String key = prop.equals(HiveTableRowFormat.COLLECTION_DELIM) ?
+                                       serdeConstants.COLLECTION_DELIM : prop.substring(SERDE_INFO_PROP_PREFIX.length());
+                       sd.getSerdeInfo().getParameters().put(key, value);
+               }
+       }
+
+       private static void extractStoredAs(StorageDescriptor sd, Map<String, String> properties, HiveConf hiveConf) {
+               String storageFormat = properties.remove(STORED_AS_FILE_FORMAT);
+               String inputFormat = properties.remove(STORED_AS_INPUT_FORMAT);
+               String outputFormat = properties.remove(STORED_AS_OUTPUT_FORMAT);
+               if (storageFormat == null && inputFormat == null) {
+                       return;
+               }
+               if (storageFormat != null) {
+                       setStorageFormat(sd, storageFormat, hiveConf);
+               } else {
+                       sd.setInputFormat(inputFormat);
+                       sd.setOutputFormat(outputFormat);
+               }
+       }
+
+       public static void setStorageFormat(StorageDescriptor sd, String format, HiveConf hiveConf) {
+               StorageFormatDescriptor storageFormatDescriptor = storageFormatFactory.get(format);
+               checkArgument(storageFormatDescriptor != null, "Unknown storage format " + format);
+               sd.setInputFormat(storageFormatDescriptor.getInputFormat());
+               sd.setOutputFormat(storageFormatDescriptor.getOutputFormat());
+               String serdeLib = storageFormatDescriptor.getSerde();
+               if (serdeLib == null && storageFormatDescriptor instanceof RCFileStorageFormatDescriptor) {
+                       serdeLib = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE);
+               }
+               if (serdeLib != null) {
+                       sd.getSerdeInfo().setSerializationLib(serdeLib);
+               }
+       }
+
+       public static void setDefaultStorageFormat(StorageDescriptor sd, HiveConf hiveConf) {
+               sd.getSerdeInfo().setSerializationLib(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTSERDE));
+               setStorageFormat(sd, hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT), hiveConf);
+       }
+
        private static class ExpressionExtractor implements 
ExpressionVisitor<String> {
 
                // maps a supported function to its name
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
index 9f65c46..b874154 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java
@@ -18,25 +18,40 @@
 
 package org.apache.flink.connectors.hive;
 
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable;
+import org.apache.flink.table.HiveVersionTestUtil;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.util.FileUtils;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
 import org.junit.After;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test Hive syntax when Hive dialect is used.
@@ -109,7 +124,64 @@ public class HiveDialectTest {
                        String newLocation = warehouse + "/db1_new_location";
                        tableEnv.executeSql(String.format("alter database db1 set location '%s'", newLocation));
                        db = hiveCatalog.getHiveDatabase("db1");
-                       assertEquals(newLocation, new URI(db.getLocationUri()).getPath());
+                       assertEquals(newLocation, locationPath(db.getLocationUri()));
                }
        }
+
+       @Test
+       public void testCreateTable() throws Exception {
+               String location = warehouse + "/external_location";
+               tableEnv.sqlUpdate(String.format(
+                               "create external table tbl1 (d decimal(10,0),ts timestamp) partitioned by (p string) location '%s' tblproperties('k1'='v1')", location));
+               Table hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl1"));
+               assertEquals(TableType.EXTERNAL_TABLE.toString(), hiveTable.getTableType());
+               assertEquals(1, hiveTable.getPartitionKeysSize());
+               assertEquals(location, locationPath(hiveTable.getSd().getLocation()));
+               assertEquals("v1", hiveTable.getParameters().get("k1"));
+               assertFalse(hiveTable.getParameters().containsKey(SqlCreateHiveTable.TABLE_LOCATION_URI));
+
+               tableEnv.sqlUpdate("create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc");
+               hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl2"));
+               assertEquals(TableType.MANAGED_TABLE.toString(), hiveTable.getTableType());
+               assertEquals(OrcSerde.class.getName(), hiveTable.getSd().getSerdeInfo().getSerializationLib());
+               assertEquals(OrcInputFormat.class.getName(), hiveTable.getSd().getInputFormat());
+               assertEquals(OrcOutputFormat.class.getName(), hiveTable.getSd().getOutputFormat());
+
+               tableEnv.sqlUpdate("create table tbl3 (m map<timestamp,binary>) partitioned by (p1 bigint,p2 tinyint) " +
+                               "row format serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe'");
+               hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl3"));
+               assertEquals(2, hiveTable.getPartitionKeysSize());
+               assertEquals(LazyBinarySerDe.class.getName(), hiveTable.getSd().getSerdeInfo().getSerializationLib());
+
+               tableEnv.sqlUpdate("create table tbl4 (x int,y smallint) row format delimited fields terminated by '|' lines terminated by '\n'");
+               hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl4"));
+               assertEquals("|", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.FIELD_DELIM));
+               assertEquals("\n", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.LINE_DELIM));
+
+               tableEnv.sqlUpdate("create table tbl5 (m map<bigint,string>) row format delimited collection items terminated by ';' " +
+                               "map keys terminated by ':'");
+               hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl5"));
+               assertEquals(";", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.COLLECTION_DELIM));
+               assertEquals(":", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.MAPKEY_DELIM));
+       }
+
+       @Test
+       public void testCreateTableWithConstraints() throws Exception {
+               Assume.assumeTrue(HiveVersionTestUtil.HIVE_310_OR_LATER);
+               tableEnv.sqlUpdate("create table tbl (x int,y int not null disable novalidate rely,z int not null disable novalidate norely," +
+                               "constraint pk_name primary key (x) rely)");
+               CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(new ObjectPath("default", "tbl"));
+               TableSchema tableSchema = catalogTable.getSchema();
+               assertTrue("PK not present", tableSchema.getPrimaryKey().isPresent());
+               assertEquals("pk_name", tableSchema.getPrimaryKey().get().getName());
+               assertFalse("PK cannot be null", tableSchema.getFieldDataTypes()[0].getLogicalType().isNullable());
+               assertFalse("RELY NOT NULL should be reflected in schema",
+                               tableSchema.getFieldDataTypes()[1].getLogicalType().isNullable());
+               assertTrue("NORELY NOT NULL shouldn't be reflected in schema",
+                               tableSchema.getFieldDataTypes()[2].getLogicalType().isNullable());
+       }
+
+       private static String locationPath(String locationURI) throws URISyntaxException {
+               return new URI(locationURI).getPath();
+       }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
index 23ccb45..44a5e5e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -52,7 +52,8 @@ public class HiveCatalogTest {
                                schema,
                                new FileSystem().path("/test_path").toProperties(),
                                null
-                       ));
+                       ),
+                       HiveTestUtils.createHiveConf());
 
                Map<String, String> prop = hiveTable.getParameters();
                assertEquals(prop.remove(CatalogConfig.IS_GENERIC), String.valueOf("true"));
@@ -71,7 +72,8 @@ public class HiveCatalogTest {
                                schema,
                                map,
                                null
-                       ));
+                       ),
+                       HiveTestUtils.createHiveConf());
 
                Map<String, String> prop = hiveTable.getParameters();
                assertEquals(prop.remove(CatalogConfig.IS_GENERIC), String.valueOf(false));
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
index 2ff7ebe..73af9f7 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd
@@ -28,6 +28,19 @@
     "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner"
     "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseProps"
     "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase"
+    "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable"
+    "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableCreationContext"
+    "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat"
+    "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs"
+    "org.apache.flink.sql.parser.hive.ddl.SqlDescribeHiveTable"
+    "org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintEnable"
+    "org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintRely"
+    "org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintTrait"
+    "org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintValidate"
+    "org.apache.flink.sql.parser.hive.type.ExtendedHiveStructTypeNameSpec"
+    "org.apache.flink.sql.parser.ddl.constraint.SqlConstraintEnforcement"
+    "org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint"
+    "org.apache.flink.sql.parser.ddl.constraint.SqlUniqueSpec"
     "org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
     "org.apache.flink.sql.parser.ddl.SqlAlterTable"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableProperties"
@@ -481,6 +494,8 @@
     "SqlUseDatabase()"
     "SqlAlterDatabase()"
     "SqlDescribeDatabase()"
+    "SqlShowTables()"
+    "SqlRichDescribeTable()"
   ]
 
   # List of methods for parsing custom literals.
@@ -493,6 +508,10 @@
   # Return type of method implementation should be "SqlTypeNameSpec".
   # Example: SqlParseTimeStampZ().
   dataTypeParserMethods: [
+    "ExtendedSqlBasicTypeName()"
+    "CustomizedCollectionsTypeName()"
+    "SqlMapTypeName()"
+    "ExtendedSqlRowTypeName()"
   ]
 
   # List of methods for parsing builtin function calls.
@@ -511,11 +530,13 @@
   # Each must accept arguments "(SqlParserPos pos, boolean replace)".
   createStatementParserMethods: [
     "SqlCreateDatabase"
+    "SqlCreateTemporary"
   ]
 
   # List of methods for parsing extensions to "DROP" calls.
   # Each must accept arguments "(Span s)".
   dropStatementParserMethods: [
+    "SqlDropTable"
     "SqlDropDatabase"
   ]
 
diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
index 70e6dbb..da3175d 100644
--- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl
@@ -218,4 +218,550 @@ SqlNode TableOption() :
     {
         return new SqlTableOption(key, value, getPos());
     }
-}
\ No newline at end of file
+}
+
+
+SqlCreate SqlCreateTemporary(Span s, boolean replace) :
+{
+  boolean isTemporary = false;
+  SqlCreate create;
+}
+{
+  [ <TEMPORARY>   {isTemporary = true;} ]
+
+  create = SqlCreateTable(s, isTemporary)
+  {
+    return create;
+  }
+}
+
+/**
+* Parse a "Show Tables" metadata query command.
+*/
+SqlShowTables SqlShowTables() :
+{
+}
+{
+    <SHOW> <TABLES>
+    {
+        return new SqlShowTables(getPos());
+    }
+}
+
+/**
+ * Here we add Rich in className to distinguish from calcite's original SqlDescribeTable.
+ */
+SqlRichDescribeTable SqlRichDescribeTable() :
+{
+    SqlIdentifier tableName;
+    SqlParserPos pos;
+    boolean extended = false;
+    boolean formatted = false;
+}
+{
+    <DESCRIBE> { pos = getPos();}
+    [ LOOKAHEAD(2)
+      ( <EXTENDED> { extended = true; }
+        |
+        <FORMATTED> { formatted = true; }
+      )
+    ]
+    tableName = CompoundIdentifier()
+    {
+        return new SqlDescribeHiveTable(pos, tableName, extended, formatted);
+    }
+}
+
+SqlCreate SqlCreateTable(Span s, boolean isTemporary) :
+{
+    final SqlParserPos startPos = s.pos();
+    SqlIdentifier tableName;
+    SqlNodeList primaryKeyList = SqlNodeList.EMPTY;
+    List<SqlNodeList> uniqueKeysList = new ArrayList<SqlNodeList>();
+    SqlNodeList columnList = SqlNodeList.EMPTY;
+       SqlCharStringLiteral comment = null;
+
+    SqlNodeList propertyList;
+    SqlNodeList partitionColumns = SqlNodeList.EMPTY;
+    SqlParserPos pos = startPos;
+    boolean isExternal = false;
+    HiveTableRowFormat rowFormat = null;
+    HiveTableStoredAs storedAs = null;
+    SqlCharStringLiteral location = null;
+    HiveTableCreationContext ctx = new HiveTableCreationContext();
+}
+{
+    [ <EXTERNAL> { isExternal = true; } ]
+    <TABLE> { propertyList = new SqlNodeList(getPos()); }
+
+    tableName = CompoundIdentifier()
+    [
+        <LPAREN> { pos = getPos(); }
+        TableColumn(ctx)
+        (
+            <COMMA> TableColumn(ctx)
+        )*
+        {
+            pos = pos.plus(getPos());
+            columnList = new SqlNodeList(ctx.columnList, pos);
+        }
+        <RPAREN>
+    ]
+    [ <COMMENT> <QUOTED_STRING> {
+        comment = createStringLiteral(token.image, getPos());
+    }]
+    [
+        <PARTITIONED> <BY>
+        <LPAREN>
+          {
+            List<SqlNode> partCols = new ArrayList();
+            if ( columnList == SqlNodeList.EMPTY ) {
+              columnList = new SqlNodeList(pos.plus(getPos()));
+            }
+          }
+          PartColumnDef(partCols)
+          (
+            <COMMA> PartColumnDef(partCols)
+          )*
+          {
+            partitionColumns = new SqlNodeList(partCols, pos.plus(getPos()));
+          }
+        <RPAREN>
+    ]
+    [
+      <ROW> <FORMAT>
+      rowFormat = TableRowFormat(getPos())
+    ]
+    [
+      <STORED> <AS>
+      storedAs = TableStoredAs(getPos())
+    ]
+    [
+      <LOCATION> <QUOTED_STRING>
+      { location = createStringLiteral(token.image, getPos()); }
+    ]
+    [
+        <TBLPROPERTIES>
+        {
+          SqlNodeList props = TableProperties();
+          for (SqlNode node : props) {
+            propertyList.add(node);
+          }
+        }
+    ]
+    {
+        return new SqlCreateHiveTable(startPos.plus(getPos()),
+                tableName,
+                columnList,
+                ctx,
+                propertyList,
+                partitionColumns,
+                comment,
+                isTemporary,
+                isExternal,
+                rowFormat,
+                storedAs,
+                location);
+    }
+}
+
+SqlDrop SqlDropTable(Span s, boolean replace) :
+{
+    SqlIdentifier tableName = null;
+    boolean ifExists = false;
+}
+{
+    <TABLE>
+
+    (
+        <IF> <EXISTS> { ifExists = true; }
+    |
+        { ifExists = false; }
+    )
+
+    tableName = CompoundIdentifier()
+
+    {
+         return new SqlDropTable(s.pos(), tableName, ifExists, false);
+    }
+}
+
+void TableColumn2(List<SqlNode> list) :
+{
+    SqlParserPos pos;
+    SqlIdentifier name;
+    SqlDataTypeSpec type;
+    SqlCharStringLiteral comment = null;
+}
+{
+    name = SimpleIdentifier()
+    type = ExtendedDataType()
+    [ <COMMENT> <QUOTED_STRING> {
+        comment = createStringLiteral(token.image, getPos());
+    }]
+    {
+        SqlTableColumn tableColumn = new SqlTableColumn(name, type, null, comment, getPos());
+        list.add(tableColumn);
+    }
+}
+
+void PartColumnDef(List<SqlNode> list) :
+{
+    SqlParserPos pos;
+    SqlIdentifier name;
+    SqlDataTypeSpec type;
+    SqlCharStringLiteral comment = null;
+}
+{
+    name = SimpleIdentifier()
+    type = DataType()
+    [ <COMMENT> <QUOTED_STRING> {
+        comment = createStringLiteral(token.image, getPos());
+    }]
+    {
+        type = type.withNullable(true);
+        SqlTableColumn tableColumn = new SqlTableColumn(name, type, null, comment, getPos());
+        list.add(tableColumn);
+    }
+}
+
+void TableColumn(HiveTableCreationContext context) :
+{
+}
+{
+    (LOOKAHEAD(2)
+        TableColumnWithConstraint(context)
+    |
+        TableConstraint(context)
+    )
+}
+
+/** Parses a table constraint for CREATE TABLE. */
+void TableConstraint(HiveTableCreationContext context) :
+{
+  SqlIdentifier constraintName = null;
+  final SqlLiteral spec;
+  final SqlNodeList columns;
+}
+{
+  [ constraintName = ConstraintName() ]
+  spec = TableConstraintSpec()
+  columns = ParenthesizedSimpleIdentifierList()
+  context.pkTrait = ConstraintTrait()
+  {
+      SqlTableConstraint tableConstraint = new SqlTableConstraint(
+                                            constraintName,
+                                            spec,
+                                            columns,
+                                            SqlConstraintEnforcement.NOT_ENFORCED.symbol(getPos()),
+                                            true,
+                                            getPos());
+      context.constraints.add(tableConstraint);
+  }
+}
+
+SqlLiteral TableConstraintSpec() :
+{
+    SqlLiteral spec;
+}
+{
+    <PRIMARY> <KEY>
+    {
+        spec = SqlUniqueSpec.PRIMARY_KEY.symbol(getPos());
+        return spec;
+    }
+}
+
+SqlIdentifier ConstraintName() :
+{
+    SqlIdentifier constraintName;
+}
+{
+    <CONSTRAINT> constraintName = SimpleIdentifier() {
+        return constraintName;
+    }
+}
+
+void TableColumnWithConstraint(HiveTableCreationContext context) :
+{
+    SqlParserPos pos;
+    SqlIdentifier name;
+    SqlDataTypeSpec type;
+    SqlCharStringLiteral comment = null;
+    SqlHiveConstraintTrait constraintTrait;
+}
+{
+    name = SimpleIdentifier()
+    type = ExtendedDataType()
+    constraintTrait = ConstraintTrait()
+    {
+        // we have NOT NULL column constraint here
+        if (!type.getNullable()) {
+            if (context.notNullTraits == null) {
+                context.notNullTraits = new ArrayList();
+                context.notNullCols = new ArrayList();
+            }
+            context.notNullTraits.add(constraintTrait);
+            context.notNullCols.add(name);
+        }
+        SqlTableColumn tableColumn = new SqlTableColumn(name, type, null, comment, getPos());
+        context.columnList.add(tableColumn);
+    }
+    [ <COMMENT> <QUOTED_STRING> {
+        comment = createStringLiteral(token.image, getPos());
+    }]
+}
+
+SqlHiveConstraintTrait ConstraintTrait() :
+{
+  // a constraint is by default ENABLE NOVALIDATE RELY
+  SqlLiteral enable = SqlHiveConstraintEnable.ENABLE.symbol(getPos());
+  SqlLiteral validate = SqlHiveConstraintValidate.NOVALIDATE.symbol(getPos());
+  SqlLiteral rely = SqlHiveConstraintRely.RELY.symbol(getPos());
+}
+{
+  [
+    <ENABLE>  { enable = SqlHiveConstraintEnable.ENABLE.symbol(getPos()); }
+    |
+    <DISABLE> { enable = SqlHiveConstraintEnable.DISABLE.symbol(getPos()); }
+  ]
+  [
+    <NOVALIDATE>  { validate = SqlHiveConstraintValidate.NOVALIDATE.symbol(getPos()); }
+    |
+    <VALIDATE> { validate = SqlHiveConstraintValidate.VALIDATE.symbol(getPos()); }
+  ]
+  [
+    <RELY>  { rely = SqlHiveConstraintRely.RELY.symbol(getPos()); }
+    |
+    <NORELY> { rely = SqlHiveConstraintRely.NORELY.symbol(getPos()); }
+  ]
+  {  return new SqlHiveConstraintTrait(enable, validate, rely); }
+}
+
+/**
+* Different with {@link #DataType()}, we support a [ NULL | NOT NULL ] suffix syntax for both the
+* collection element data type and the data type itself.
+*
+* <p>See {@link #SqlDataTypeSpec} for the syntax details of {@link #DataType()}.
+*/
+SqlDataTypeSpec ExtendedDataType() :
+{
+    SqlTypeNameSpec typeName;
+    final Span s;
+    boolean elementNullable = true;
+    boolean nullable = true;
+}
+{
+    <#-- #DataType does not take care of the nullable attribute. -->
+    typeName = TypeName() {
+        s = span();
+    }
+    (
+        LOOKAHEAD(3)
+        elementNullable = NullableOptDefaultTrue()
+        typeName = ExtendedCollectionsTypeName(typeName, elementNullable)
+    )*
+    nullable = NullableOptDefaultTrue()
+    {
+        return new SqlDataTypeSpec(typeName, s.end(this)).withNullable(nullable);
+    }
+}
+
+HiveTableStoredAs TableStoredAs(SqlParserPos pos) :
+{
+  SqlIdentifier fileFormat = null;
+  SqlCharStringLiteral inputFormat = null;
+  SqlCharStringLiteral outputFormat = null;
+}
+{
+  (
+    LOOKAHEAD(2)
+    <INPUTFORMAT> <QUOTED_STRING> { inputFormat = createStringLiteral(token.image, getPos()); }
+    <OUTPUTFORMAT> <QUOTED_STRING> { outputFormat = createStringLiteral(token.image, getPos()); }
+    { return HiveTableStoredAs.ofInputOutputFormat(pos, inputFormat, outputFormat); }
+    |
+    fileFormat = SimpleIdentifier()
+    { return HiveTableStoredAs.ofFileFormat(pos, fileFormat); }
+  )
+}
+
+HiveTableRowFormat TableRowFormat(SqlParserPos pos) :
+{
+  SqlCharStringLiteral fieldsTerminator = null;
+  SqlCharStringLiteral escape = null;
+  SqlCharStringLiteral collectionTerminator = null;
+  SqlCharStringLiteral mapKeyTerminator = null;
+  SqlCharStringLiteral linesTerminator = null;
+  SqlCharStringLiteral nullAs = null;
+  SqlCharStringLiteral serdeClass = null;
+  SqlNodeList serdeProps = null;
+}
+{
+  (
+    <DELIMITED>
+      [ <FIELDS> <TERMINATED> <BY> <QUOTED_STRING>
+        { fieldsTerminator = createStringLiteral(token.image, getPos()); }
+        [ <ESCAPED> <BY> <QUOTED_STRING> { escape = createStringLiteral(token.image, getPos()); } ]
+      ]
+      [ <COLLECTION> <ITEMS> <TERMINATED> <BY> <QUOTED_STRING> { collectionTerminator = createStringLiteral(token.image, getPos()); } ]
+      [ <MAP> <KEYS> <TERMINATED> <BY> <QUOTED_STRING> { mapKeyTerminator = createStringLiteral(token.image, getPos()); } ]
+      [ <LINES> <TERMINATED> <BY> <QUOTED_STRING> { linesTerminator = createStringLiteral(token.image, getPos()); } ]
+      [ <NULL> <DEFINED> <AS> <QUOTED_STRING> { nullAs = createStringLiteral(token.image, getPos()); } ]
+      { return HiveTableRowFormat.withDelimited(pos, fieldsTerminator, escape, collectionTerminator, mapKeyTerminator, linesTerminator, nullAs); }
+    |
+    <SERDE> <QUOTED_STRING>
+    {
+      serdeClass = createStringLiteral(token.image, getPos());
+    }
+    [ <WITH> <SERDEPROPERTIES> serdeProps = TableProperties() ]
+    { return HiveTableRowFormat.withSerDe(pos, serdeClass, serdeProps); }
+  )
+}
+
+/**
+* A sql type name extended basic data type, it has a counterpart basic
+* sql type name but always represents as a special alias compared with the standard name.
+*
+* <p>For example, STRING is synonym of VARCHAR(INT_MAX).
+*/
+SqlTypeNameSpec ExtendedSqlBasicTypeName() :
+{
+    final SqlTypeName typeName;
+    final String typeAlias;
+    int precision = -1;
+}
+{
+    <STRING>
+    {
+        typeName = SqlTypeName.VARCHAR;
+        typeAlias = token.image;
+        precision = Integer.MAX_VALUE;
+        return new SqlAlienSystemTypeNameSpec(typeAlias, typeName, precision, getPos());
+    }
+}
+
+SqlTypeNameSpec CustomizedCollectionsTypeName() :
+{
+    final SqlTypeName collectionTypeName;
+    final SqlTypeNameSpec elementTypeName;
+    boolean elementNullable = true;
+}
+{
+    (
+        <ARRAY> {
+            collectionTypeName = SqlTypeName.ARRAY;
+        }
+    |
+        <MULTISET> {
+            collectionTypeName = SqlTypeName.MULTISET;
+        }
+    )
+    <LT>
+    elementTypeName = TypeName()
+    elementNullable = NullableOptDefaultTrue()
+    <GT>
+    {
+        return new ExtendedSqlCollectionTypeNameSpec(
+            elementTypeName,
+            elementNullable,
+            collectionTypeName,
+            false,
+            getPos());
+    }
+}
+
+SqlTypeNameSpec ExtendedCollectionsTypeName(
+        SqlTypeNameSpec elementTypeName,
+        boolean elementNullable) :
+{
+    final SqlTypeName collectionTypeName;
+}
+{
+    (
+        <MULTISET> { collectionTypeName = SqlTypeName.MULTISET; }
+    |
+         <ARRAY> { collectionTypeName = SqlTypeName.ARRAY; }
+    )
+    {
+        return new ExtendedSqlCollectionTypeNameSpec(
+             elementTypeName,
+             elementNullable,
+             collectionTypeName,
+             true,
+             getPos());
+    }
+}
+
+SqlTypeNameSpec SqlMapTypeName() :
+{
+    SqlDataTypeSpec keyType;
+    SqlDataTypeSpec valType;
+    boolean nullable = true;
+}
+{
+    <MAP>
+    <LT>
+    keyType = ExtendedDataType()
+    <COMMA>
+    valType = ExtendedDataType()
+    <GT>
+    {
+        return new SqlMapTypeNameSpec(keyType, valType, getPos());
+    }
+}
+
+void ExtendedFieldNameTypeCommaList(
+        List<SqlIdentifier> fieldNames,
+        List<SqlDataTypeSpec> fieldTypes,
+        List<SqlCharStringLiteral> comments) :
+{
+    SqlIdentifier fName;
+    SqlDataTypeSpec fType;
+    boolean nullable;
+}
+{
+    fName = SimpleIdentifier() <COLON> fType = ExtendedDataType()
+    {
+        fieldNames.add(fName);
+        fieldTypes.add(fType);
+    }
+    (
+        <QUOTED_STRING> {
+            comments.add(createStringLiteral(token.image, getPos()));
+        }
+    |
+        { comments.add(null); }
+    )
+    (
+        <COMMA>
+        fName = SimpleIdentifier() <COLON> fType = ExtendedDataType()
+        {
+            fieldNames.add(fName);
+            fieldTypes.add(fType);
+        }
+        (
+            <QUOTED_STRING> {
+                comments.add(createStringLiteral(token.image, getPos()));
+            }
+        |
+            { comments.add(null); }
+        )
+    )*
+}
+
+SqlTypeNameSpec ExtendedSqlRowTypeName() :
+{
+    List<SqlIdentifier> fieldNames = new ArrayList<SqlIdentifier>();
+    List<SqlDataTypeSpec> fieldTypes = new ArrayList<SqlDataTypeSpec>();
+    List<SqlCharStringLiteral> comments = new ArrayList<SqlCharStringLiteral>();
+}
+{
+    <STRUCT> <LT> ExtendedFieldNameTypeCommaList(fieldNames, fieldTypes, comments) <GT>
+    {
+        return new ExtendedHiveStructTypeNameSpec(
+            getPos(),
+            fieldNames,
+            fieldTypes,
+            comments);
+    }
+}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java
index a367e1e..bfbd33a 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java
@@ -18,31 +18,65 @@
 
 package org.apache.flink.sql.parser.hive.ddl;
 
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.hive.impl.ParseException;
+import org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec;
+import org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec;
+import org.apache.flink.sql.parser.type.SqlMapTypeNameSpec;
 import org.apache.flink.table.catalog.config.CatalogConfig;
 
+import org.apache.calcite.sql.SqlBasicTypeNameSpec;
+import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlTypeNameSpec;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase.DATABASE_LOCATION_URI;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_INFO_PROP_PREFIX;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_LIB_CLASS_NAME;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_INPUT_FORMAT;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_OUTPUT_FORMAT;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_CONSTRAINT_TRAITS;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.PK_CONSTRAINT_TRAIT;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_IS_EXTERNAL;
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_LOCATION_URI;
 
 /**
  * Util methods for Hive DDL Sql nodes.
  */
 public class HiveDDLUtils {
 
+       // assume ';' cannot be used in column identifiers or type names, otherwise we need to implement escaping
+       public static final String COL_DELIMITER = ";";
+
+       private static final byte HIVE_CONSTRAINT_ENABLE = 1 << 2;
+       private static final byte HIVE_CONSTRAINT_VALIDATE = 1 << 1;
+       private static final byte HIVE_CONSTRAINT_RELY = 1;
+
        private static final Set<String> RESERVED_DB_PROPERTIES = new HashSet<>();
+       private static final Set<String> RESERVED_TABLE_PROPERTIES = new HashSet<>();
+       private static final List<String> RESERVED_TABLE_PROP_PREFIX = new ArrayList<>();
 
        static {
                RESERVED_DB_PROPERTIES.addAll(Arrays.asList(ALTER_DATABASE_OP, DATABASE_LOCATION_URI));
+
+               RESERVED_TABLE_PROPERTIES.addAll(Arrays.asList(TABLE_LOCATION_URI,
+                               TABLE_IS_EXTERNAL, PK_CONSTRAINT_TRAIT, NOT_NULL_CONSTRAINT_TRAITS,
+                               STORED_AS_FILE_FORMAT, STORED_AS_INPUT_FORMAT, STORED_AS_OUTPUT_FORMAT, SERDE_LIB_CLASS_NAME));
+
+               RESERVED_TABLE_PROP_PREFIX.add(SERDE_INFO_PROP_PREFIX);
        }
 
        private HiveDDLUtils() {
@@ -52,6 +86,12 @@ public class HiveDDLUtils {
                return checkReservedProperties(RESERVED_DB_PROPERTIES, props, "Databases");
        }
 
+       public static SqlNodeList checkReservedTableProperties(SqlNodeList props) throws ParseException {
+               props = checkReservedProperties(RESERVED_TABLE_PROPERTIES, props, "Tables");
+               props = checkReservedPrefix(RESERVED_TABLE_PROP_PREFIX, props, "Tables");
+               return props;
+       }
+
        public static SqlNodeList ensureNonGeneric(SqlNodeList props) throws ParseException {
                for (SqlNode node : props) {
                        if (node instanceof SqlTableOption && ((SqlTableOption) node).getKeyString().equalsIgnoreCase(CatalogConfig.IS_GENERIC)) {
@@ -63,6 +103,28 @@ public class HiveDDLUtils {
                return props;
        }
 
+       private static SqlNodeList checkReservedPrefix(List<String> reserved, SqlNodeList properties, String metaType) throws ParseException {
+               if (properties == null) {
+                       return null;
+               }
+               Set<String> match = new HashSet<>();
+               for (SqlNode node : properties) {
+                       if (node instanceof SqlTableOption) {
+                               String key = ((SqlTableOption) node).getKeyString();
+                               for (String prefix : reserved) {
+                                       if (key.startsWith(prefix)) {
+                                               match.add(key);
+                                       }
+                               }
+                       }
+               }
+               if (!match.isEmpty()) {
+                       throw new ParseException(String.format(
+                                       "Properties %s have reserved prefix and shouldn't be used for Hive %s", match, metaType));
+               }
+               return properties;
+       }
+
        private static SqlNodeList checkReservedProperties(Set<String> reservedProperties, SqlNodeList properties,
                        String metaType) throws ParseException {
                if (properties == null) {
@@ -91,4 +153,163 @@ public class HiveDDLUtils {
        public static SqlTableOption toTableOption(String key, String value, SqlParserPos pos) {
                return new SqlTableOption(SqlLiteral.createCharString(key, pos), SqlLiteral.createCharString(value, pos), pos);
        }
+
+       public static void convertDataTypes(SqlNodeList columns) throws 
ParseException {
+               if (columns != null) {
+                       for (SqlNode node : columns) {
+                               convertDataTypes((SqlTableColumn) node);
+                       }
+               }
+       }
+
+       // Check and convert data types to comply with HiveQL, e.g. TIMESTAMP 
and BINARY
+       public static void convertDataTypes(SqlTableColumn column) throws 
ParseException {
+               column.setType(convertDataTypes(column.getType()));
+       }
+
+       private static SqlDataTypeSpec convertDataTypes(SqlDataTypeSpec 
typeSpec) throws ParseException {
+               SqlTypeNameSpec nameSpec = typeSpec.getTypeNameSpec();
+               SqlTypeNameSpec convertedNameSpec = convertDataTypes(nameSpec);
+               if (nameSpec != convertedNameSpec) {
+                       typeSpec = new SqlDataTypeSpec(convertedNameSpec, 
typeSpec.getTimeZone(), typeSpec.getNullable(),
+                                       typeSpec.getParserPosition());
+               }
+               return typeSpec;
+       }
+
+       private static SqlTypeNameSpec convertDataTypes(SqlTypeNameSpec 
nameSpec) throws ParseException {
+               if (nameSpec instanceof SqlBasicTypeNameSpec) {
+                       SqlBasicTypeNameSpec basicNameSpec = 
(SqlBasicTypeNameSpec) nameSpec;
+                       if (basicNameSpec.getTypeName().getSimple().equalsIgnoreCase(SqlTypeName.TIMESTAMP.name())) {
+                               if (basicNameSpec.getPrecision() < 0) {
+                                       nameSpec = new 
SqlBasicTypeNameSpec(SqlTypeName.TIMESTAMP, 9, basicNameSpec.getScale(),
+                                                       
basicNameSpec.getCharSetName(), basicNameSpec.getParserPos());
+                               }
+                       } else if (basicNameSpec.getTypeName().getSimple().equalsIgnoreCase(SqlTypeName.BINARY.name())) {
+                               if (basicNameSpec.getPrecision() < 0) {
+                                       nameSpec = new 
SqlBasicTypeNameSpec(SqlTypeName.VARBINARY, Integer.MAX_VALUE, 
basicNameSpec.getScale(),
+                                                       
basicNameSpec.getCharSetName(), basicNameSpec.getParserPos());
+                               }
+                       } else if (basicNameSpec.getTypeName().getSimple().equalsIgnoreCase(SqlTypeName.VARCHAR.name())) {
+                               if (basicNameSpec.getPrecision() < 0) {
+                                       throw new ParseException("VARCHAR 
precision is mandatory");
+                               }
+                       }
+               } else if (nameSpec instanceof 
ExtendedSqlCollectionTypeNameSpec) {
+                       ExtendedSqlCollectionTypeNameSpec collectionNameSpec = 
(ExtendedSqlCollectionTypeNameSpec) nameSpec;
+                       SqlTypeNameSpec elementNameSpec = 
collectionNameSpec.getElementTypeName();
+                       SqlTypeNameSpec convertedElementNameSpec = 
convertDataTypes(elementNameSpec);
+                       if (convertedElementNameSpec != elementNameSpec) {
+                               nameSpec = new 
ExtendedSqlCollectionTypeNameSpec(convertedElementNameSpec,
+                                               
collectionNameSpec.elementNullable(), 
collectionNameSpec.getCollectionTypeName(),
+                                               
collectionNameSpec.unparseAsStandard(), collectionNameSpec.getParserPos());
+                       }
+               } else if (nameSpec instanceof SqlMapTypeNameSpec) {
+                       SqlMapTypeNameSpec mapNameSpec = (SqlMapTypeNameSpec) 
nameSpec;
+                       SqlDataTypeSpec keyTypeSpec = mapNameSpec.getKeyType();
+                       SqlDataTypeSpec valTypeSpec = mapNameSpec.getValType();
+                       SqlDataTypeSpec convertedKeyTypeSpec = 
convertDataTypes(keyTypeSpec);
+                       SqlDataTypeSpec convertedValTypeSpec = 
convertDataTypes(valTypeSpec);
+                       if (keyTypeSpec != convertedKeyTypeSpec || valTypeSpec 
!= convertedValTypeSpec) {
+                               nameSpec = new 
SqlMapTypeNameSpec(convertedKeyTypeSpec, convertedValTypeSpec, 
nameSpec.getParserPos());
+                       }
+               } else if (nameSpec instanceof ExtendedSqlRowTypeNameSpec) {
+                       ExtendedSqlRowTypeNameSpec rowNameSpec = 
(ExtendedSqlRowTypeNameSpec) nameSpec;
+                       List<SqlDataTypeSpec> fieldTypeSpecs = 
rowNameSpec.getFieldTypes();
+                       List<SqlDataTypeSpec> convertedFieldTypeSpecs = new 
ArrayList<>(fieldTypeSpecs.size());
+                       boolean updated = false;
+                       for (SqlDataTypeSpec fieldTypeSpec : fieldTypeSpecs) {
+                               SqlDataTypeSpec convertedFieldTypeSpec = 
convertDataTypes(fieldTypeSpec);
+                               if (fieldTypeSpec != convertedFieldTypeSpec) {
+                                       updated = true;
+                               }
+                               
convertedFieldTypeSpecs.add(convertedFieldTypeSpec);
+                       }
+                       if (updated) {
+                               nameSpec = new 
ExtendedSqlRowTypeNameSpec(nameSpec.getParserPos(), rowNameSpec.getFieldNames(),
+                                               convertedFieldTypeSpecs, 
rowNameSpec.getComments(), rowNameSpec.unparseAsStandard());
+                       }
+               }
+               return nameSpec;
+       }
+
+       // a constraint is by default ENABLE NOVALIDATE RELY
+       public static byte defaultTrait() {
+               byte res = enableConstraint((byte) 0);
+               res = relyConstraint(res);
+               return res;
+       }
+
+       // returns a constraint trait that requires ENABLE
+       public static byte enableConstraint(byte trait) {
+               return (byte) (trait | HIVE_CONSTRAINT_ENABLE);
+       }
+
+       // returns a constraint trait that doesn't require ENABLE
+       public static byte disableConstraint(byte trait) {
+               return (byte) (trait & (~HIVE_CONSTRAINT_ENABLE));
+       }
+
+       // returns a constraint trait that requires VALIDATE
+       public static byte validateConstraint(byte trait) {
+               return (byte) (trait | HIVE_CONSTRAINT_VALIDATE);
+       }
+
+       // returns a constraint trait that doesn't require VALIDATE
+       public static byte noValidateConstraint(byte trait) {
+               return (byte) (trait & (~HIVE_CONSTRAINT_VALIDATE));
+       }
+
+       // returns a constraint trait that requires RELY
+       public static byte relyConstraint(byte trait) {
+               return (byte) (trait | HIVE_CONSTRAINT_RELY);
+       }
+
+       // returns a constraint trait that doesn't require RELY
+       public static byte noRelyConstraint(byte trait) {
+               return (byte) (trait & (~HIVE_CONSTRAINT_RELY));
+       }
+
+       // returns whether a trait requires ENABLE constraint
+       public static boolean requireEnableConstraint(byte trait) {
+               return (trait & HIVE_CONSTRAINT_ENABLE) != 0;
+       }
+
+       // returns whether a trait requires VALIDATE constraint
+       public static boolean requireValidateConstraint(byte trait) {
+               return (trait & HIVE_CONSTRAINT_VALIDATE) != 0;
+       }
+
+       // returns whether a trait requires RELY constraint
+       public static boolean requireRelyConstraint(byte trait) {
+               return (trait & HIVE_CONSTRAINT_RELY) != 0;
+       }
+
+       public static byte encodeConstraintTrait(SqlHiveConstraintTrait trait) {
+               byte res = 0;
+               if (trait.isEnable()) {
+                       res = enableConstraint(res);
+               }
+               if (trait.isValidate()) {
+                       res = validateConstraint(res);
+               }
+               if (trait.isRely()) {
+                       res = relyConstraint(res);
+               }
+               return res;
+       }
+
+       public static SqlNodeList deepCopyColList(SqlNodeList colList) {
+               SqlNodeList res = new SqlNodeList(colList.getParserPosition());
+               for (SqlNode node : colList) {
+                       SqlTableColumn col = (SqlTableColumn) node;
+                       res.add(new SqlTableColumn(
+                                       col.getName(),
+                                       col.getType(),
+                                       col.getConstraint().orElse(null),
+                                       col.getComment().orElse(null),
+                                       col.getParserPosition()));
+               }
+               return res;
+       }
 }
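
The trait byte declared above packs ENABLE, VALIDATE and RELY into three bit flags (4, 2 and 1), and the helpers simply set, clear and test those bits. A minimal sketch of the round trip, using only the public static methods of HiveDDLUtils shown in this patch (the demo class name is made up for illustration):

    import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;

    // Hypothetical scratch class, not part of the patch.
    public class TraitByteDemo {
        public static void main(String[] args) {
            // default is ENABLE NOVALIDATE RELY, i.e. ENABLE(4) | RELY(1) = 5
            byte trait = HiveDDLUtils.defaultTrait();
            trait = HiveDDLUtils.validateConstraint(trait); // also require VALIDATE -> 7
            trait = HiveDDLUtils.noRelyConstraint(trait);   // drop RELY -> 6
            System.out.println(HiveDDLUtils.requireEnableConstraint(trait));   // true
            System.out.println(HiveDDLUtils.requireValidateConstraint(trait)); // true
            System.out.println(HiveDDLUtils.requireRelyConstraint(trait));     // false
        }
    }
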
diff --git 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java
 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java
index 01a47fe..d3aee55 100644
--- 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java
+++ 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java
@@ -30,7 +30,7 @@ import org.apache.calcite.sql.parser.SqlParserPos;
  */
 public abstract class SqlAlterHiveDatabase extends SqlAlterDatabase {
 
-       public static final String ALTER_DATABASE_OP = "alter.database.op";
+       public static final String ALTER_DATABASE_OP = "hive.alter.database.op";
 
        protected final SqlNodeList originPropList;
 
diff --git 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabaseOwner.java
 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabaseOwner.java
index 67ac40e..27aba2f 100644
--- 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabaseOwner.java
+++ 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabaseOwner.java
@@ -31,8 +31,8 @@ import org.apache.calcite.sql.parser.SqlParserPos;
  */
 public class SqlAlterHiveDatabaseOwner extends SqlAlterHiveDatabase {
 
-       public static final String DATABASE_OWNER_NAME = "database.owner.name";
-       public static final String DATABASE_OWNER_TYPE = "database.owner.type";
+       public static final String DATABASE_OWNER_NAME = 
"hive.database.owner.name";
+       public static final String DATABASE_OWNER_TYPE = 
"hive.database.owner.type";
        public static final String USER_OWNER = "user";
        public static final String ROLE_OWNER = "role";
 
diff --git 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java
 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java
index 628b397..d7e7bd5 100644
--- 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java
+++ 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java
@@ -36,7 +36,7 @@ import org.apache.calcite.sql.parser.SqlParserPos;
  */
 public class SqlCreateHiveDatabase extends SqlCreateDatabase {
 
-       public static final String DATABASE_LOCATION_URI = 
"database.location_uri";
+       public static final String DATABASE_LOCATION_URI = 
"hive.database.location-uri";
 
        private SqlNodeList originPropList;
        private final SqlCharStringLiteral location;
diff --git 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
new file mode 100644
index 0000000..3fcec91
--- /dev/null
+++ 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.hive.impl.ParseException;
+import org.apache.flink.table.catalog.config.CatalogConfig;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * CREATE Table DDL for Hive dialect.
+ */
+public class SqlCreateHiveTable extends SqlCreateTable {
+
+       public static final String TABLE_LOCATION_URI = "hive.location-uri";
+       public static final String TABLE_IS_EXTERNAL = "hive.is-external";
+       public static final String PK_CONSTRAINT_TRAIT = 
"hive.pk.constraint.trait";
+       public static final String NOT_NULL_CONSTRAINT_TRAITS = 
"hive.not.null.constraint.traits";
+       public static final String NOT_NULL_COLS = "hive.not.null.cols";
+
+       private final HiveTableCreationContext creationContext;
+       private final SqlNodeList originPropList;
+       private final boolean isExternal;
+       private final HiveTableRowFormat rowFormat;
+       private final HiveTableStoredAs storedAs;
+       private final SqlCharStringLiteral location;
+       private final SqlNodeList origColList;
+       private final SqlNodeList origPartColList;
+
+       public SqlCreateHiveTable(SqlParserPos pos, SqlIdentifier tableName, 
SqlNodeList columnList,
+                       HiveTableCreationContext creationContext, SqlNodeList 
propertyList,
+                       SqlNodeList partColList, @Nullable SqlCharStringLiteral 
comment, boolean isTemporary, boolean isExternal,
+                       HiveTableRowFormat rowFormat, HiveTableStoredAs 
storedAs, SqlCharStringLiteral location) throws ParseException {
+
+               super(pos, tableName, columnList, creationContext.constraints,
+                               
HiveDDLUtils.checkReservedTableProperties(propertyList), 
extractPartColIdentifiers(partColList), null,
+                               comment, null, isTemporary);
+
+               this.origColList = HiveDDLUtils.deepCopyColList(columnList);
+               this.origPartColList = partColList != null ?
+                               HiveDDLUtils.deepCopyColList(partColList) :
+                               SqlNodeList.EMPTY;
+
+               HiveDDLUtils.convertDataTypes(columnList);
+               HiveDDLUtils.convertDataTypes(partColList);
+               originPropList = new SqlNodeList(propertyList.getList(), 
propertyList.getParserPosition());
+               // mark it as a hive table
+               HiveDDLUtils.ensureNonGeneric(propertyList);
+               
propertyList.add(HiveDDLUtils.toTableOption(CatalogConfig.IS_GENERIC, "false", 
pos));
+               // set external
+               this.isExternal = isExternal;
+               if (isExternal) {
+                       
propertyList.add(HiveDDLUtils.toTableOption(TABLE_IS_EXTERNAL, "true", pos));
+               }
+               // add partition cols to col list
+               if (partColList != null) {
+                       for (SqlNode partCol : partColList) {
+                               columnList.add(partCol);
+                       }
+               }
+               // set PRIMARY KEY
+               this.creationContext = creationContext;
+               for (SqlTableConstraint tableConstraint : 
creationContext.constraints) {
+                       if (!tableConstraint.isPrimaryKey()) {
+                               throw new ParseException("Only PrimaryKey table 
constraint is supported at the moment");
+                       } else {
+                               // PK list is taken care of by super class, we 
need to set trait here
+                               propertyList.add(HiveDDLUtils.toTableOption(
+                                               PK_CONSTRAINT_TRAIT,
+                                               
String.valueOf(HiveDDLUtils.encodeConstraintTrait(creationContext.pkTrait)),
+                                               
propertyList.getParserPosition()));
+                       }
+               }
+               // set NOT NULL
+               if (creationContext.notNullTraits != null) {
+                       // set traits
+                       String notNullTraits = 
creationContext.notNullTraits.stream()
+                                       
.map(HiveDDLUtils::encodeConstraintTrait)
+                                       .map(Object::toString)
+                                       
.collect(Collectors.joining(HiveDDLUtils.COL_DELIMITER));
+                       propertyList.add(HiveDDLUtils.toTableOption(
+                                       NOT_NULL_CONSTRAINT_TRAITS, 
notNullTraits, propertyList.getParserPosition()));
+                       // set col names
+                       String notNullCols = 
creationContext.notNullCols.stream()
+                                       .map(SqlIdentifier::getSimple)
+                                       
.collect(Collectors.joining(HiveDDLUtils.COL_DELIMITER));
+                       
propertyList.add(HiveDDLUtils.toTableOption(NOT_NULL_COLS, notNullCols, 
propertyList.getParserPosition()));
+               }
+               // set row format
+               this.rowFormat = rowFormat;
+               if (rowFormat != null) {
+                       for (SqlNode node : rowFormat.toPropList()) {
+                               propertyList.add(node);
+                       }
+               }
+               // set stored as
+               this.storedAs = storedAs;
+               if (storedAs != null) {
+                       for (SqlNode node : storedAs.toPropList()) {
+                               propertyList.add(node);
+                       }
+               }
+               // set location
+               this.location = location;
+               if (location != null) {
+                       
propertyList.add(HiveDDLUtils.toTableOption(TABLE_LOCATION_URI, location, 
location.getParserPosition()));
+               }
+       }
+
+       @Override
+       public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+               writer.keyword("CREATE");
+               if (isTemporary()) {
+                       writer.keyword("TEMPORARY");
+               }
+               if (isExternal) {
+                       writer.keyword("EXTERNAL");
+               }
+               writer.keyword("TABLE");
+               if (ifNotExists) {
+                       writer.keyword("IF NOT EXISTS");
+               }
+               getTableName().unparse(writer, leftPrec, rightPrec);
+               // columns
+               SqlWriter.Frame frame = 
writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
+               unparseColumns(creationContext,
+                               origColList,
+                               writer, leftPrec, rightPrec);
+               for (SqlTableConstraint tableConstraint : 
creationContext.constraints) {
+                       printIndent(writer);
+                       
tableConstraint.getConstraintNameIdentifier().ifPresent(name -> {
+                               writer.keyword("CONSTRAINT");
+                               name.unparse(writer, leftPrec, rightPrec);
+                       });
+                       writer.keyword("PRIMARY KEY");
+                       SqlWriter.Frame pkFrame = writer.startList("(", ")");
+                       tableConstraint.getColumns().unparse(writer, leftPrec, 
rightPrec);
+                       writer.endList(pkFrame);
+                       creationContext.pkTrait.unparse(writer, leftPrec, 
rightPrec);
+               }
+               writer.newlineAndIndent();
+               writer.endList(frame);
+               // table comment
+               getComment().ifPresent(c -> {
+                       writer.keyword("COMMENT");
+                       c.unparse(writer, leftPrec, rightPrec);
+               });
+               // partitions
+               if (origPartColList.size() > 0) {
+                       writer.newlineAndIndent();
+                       writer.keyword("PARTITIONED BY");
+                       SqlWriter.Frame partitionedByFrame = 
writer.startList("(", ")");
+                       unparseColumns(creationContext,
+                                       origPartColList,
+                                       writer, leftPrec, rightPrec);
+                       writer.newlineAndIndent();
+                       writer.endList(partitionedByFrame);
+               }
+               // row format
+               unparseRowFormat(writer, leftPrec, rightPrec);
+               // stored as
+               unparseStoredAs(writer, leftPrec, rightPrec);
+               // location
+               if (location != null) {
+                       writer.newlineAndIndent();
+                       writer.keyword("LOCATION");
+                       location.unparse(writer, leftPrec, rightPrec);
+               }
+               // properties
+               if (originPropList.size() > 0) {
+                       writer.newlineAndIndent();
+                       writer.keyword("TBLPROPERTIES");
+                       unparsePropList(originPropList, writer, leftPrec, 
rightPrec);
+               }
+       }
+
+       private void unparseStoredAs(SqlWriter writer, int leftPrec, int 
rightPrec) {
+               if (storedAs == null) {
+                       return;
+               }
+               writer.newlineAndIndent();
+               writer.keyword("STORED AS");
+               if (storedAs.fileFormat != null) {
+                       storedAs.fileFormat.unparse(writer, leftPrec, 
rightPrec);
+               } else {
+                       writer.keyword("INPUTFORMAT");
+                       storedAs.inputFormat.unparse(writer, leftPrec, rightPrec);
+                       writer.keyword("OUTPUTFORMAT");
+                       storedAs.outputFormat.unparse(writer, leftPrec, 
rightPrec);
+               }
+       }
+
+       private void unparseRowFormat(SqlWriter writer, int leftPrec, int 
rightPrec) {
+               if (rowFormat == null) {
+                       return;
+               }
+               writer.newlineAndIndent();
+               writer.keyword("ROW FORMAT");
+               if (rowFormat.serdeClass != null) {
+                       writer.keyword("SERDE");
+                       rowFormat.serdeClass.unparse(writer, leftPrec, 
rightPrec);
+                       if (rowFormat.serdeProps != null) {
+                               writer.keyword("WITH SERDEPROPERTIES");
+                               unparsePropList(rowFormat.serdeProps, writer, 
leftPrec, rightPrec);
+                       }
+               } else {
+                       writer.keyword("DELIMITED");
+                       SqlCharStringLiteral fieldDelim = 
rowFormat.delimitPropToValue.get(HiveTableRowFormat.FIELD_DELIM);
+                       SqlCharStringLiteral escape = 
rowFormat.delimitPropToValue.get(HiveTableRowFormat.ESCAPE_CHAR);
+                       if (fieldDelim != null) {
+                               writer.newlineAndIndent();
+                               writer.print("  ");
+                               writer.keyword("FIELDS TERMINATED BY");
+                               fieldDelim.unparse(writer, leftPrec, rightPrec);
+                               if (escape != null) {
+                                       writer.keyword("ESCAPED BY");
+                                       escape.unparse(writer, leftPrec, 
rightPrec);
+                               }
+                       }
+                       SqlCharStringLiteral collectionDelim = 
rowFormat.delimitPropToValue.get(HiveTableRowFormat.COLLECTION_DELIM);
+                       if (collectionDelim != null) {
+                               writer.newlineAndIndent();
+                               writer.print("  ");
+                               writer.keyword("COLLECTION ITEMS TERMINATED 
BY");
+                               collectionDelim.unparse(writer, leftPrec, 
rightPrec);
+                       }
+                       SqlCharStringLiteral mapKeyDelim = 
rowFormat.delimitPropToValue.get(HiveTableRowFormat.MAPKEY_DELIM);
+                       if (mapKeyDelim != null) {
+                               writer.newlineAndIndent();
+                               writer.print("  ");
+                               writer.keyword("MAP KEYS TERMINATED BY");
+                               mapKeyDelim.unparse(writer, leftPrec, 
rightPrec);
+                       }
+                       SqlCharStringLiteral lineDelim = 
rowFormat.delimitPropToValue.get(HiveTableRowFormat.LINE_DELIM);
+                       if (lineDelim != null) {
+                               writer.newlineAndIndent();
+                               writer.print("  ");
+                               writer.keyword("LINES TERMINATED BY");
+                               lineDelim.unparse(writer, leftPrec, rightPrec);
+                       }
+                       SqlCharStringLiteral nullAs = 
rowFormat.delimitPropToValue.get(HiveTableRowFormat.SERIALIZATION_NULL_FORMAT);
+                       if (nullAs != null) {
+                               writer.newlineAndIndent();
+                               writer.print("  ");
+                               writer.keyword("NULL DEFINED AS");
+                               nullAs.unparse(writer, leftPrec, rightPrec);
+                       }
+               }
+       }
+
+       private void unparsePropList(SqlNodeList propList, SqlWriter writer, 
int leftPrec, int rightPrec) {
+               SqlWriter.Frame withFrame = writer.startList("(", ")");
+               for (SqlNode property : propList) {
+                       printIndent(writer);
+                       property.unparse(writer, leftPrec, rightPrec);
+               }
+               writer.newlineAndIndent();
+               writer.endList(withFrame);
+       }
+
+       private void unparseColumns(HiveTableCreationContext context, 
SqlNodeList columns,
+                       SqlWriter writer, int leftPrec, int rightPrec) {
+               List<SqlHiveConstraintTrait> notNullTraits = 
context.notNullTraits;
+               int traitIndex = 0;
+               for (SqlNode node : columns) {
+                       printIndent(writer);
+                       SqlTableColumn column = (SqlTableColumn) node;
+                       column.getName().unparse(writer, leftPrec, rightPrec);
+                       writer.print(" ");
+                       column.getType().unparse(writer, leftPrec, rightPrec);
+                       if (column.getType().getNullable() != null && 
!column.getType().getNullable()) {
+                               writer.keyword("NOT NULL");
+                               notNullTraits.get(traitIndex++).unparse(writer, 
leftPrec, rightPrec);
+                       }
+                       column.getComment().ifPresent(c -> {
+                               writer.keyword("COMMENT");
+                               c.unparse(writer, leftPrec, rightPrec);
+                       });
+               }
+       }
+
+       // Extract the identifiers from partition col list -- that's what 
SqlCreateTable expects for partition keys
+       private static SqlNodeList extractPartColIdentifiers(SqlNodeList 
partCols) {
+               if (partCols == null) {
+                       return null;
+               }
+               SqlNodeList res = new SqlNodeList(partCols.getParserPosition());
+               for (SqlNode node : partCols) {
+                       SqlTableColumn partCol = (SqlTableColumn) node;
+                       res.add(partCol.getName());
+               }
+               return res;
+       }
+
+       /**
+        * Creation context for a Hive table.
+        */
+       public static class HiveTableCreationContext extends 
TableCreationContext {
+               public SqlHiveConstraintTrait pkTrait = null;
+               public List<SqlHiveConstraintTrait> notNullTraits = null;
+               // PK cols are also considered not null, so we need to remember 
explicit NN cols
+               public List<SqlIdentifier> notNullCols = null;
+       }
+
+       /**
+        * To represent STORED AS in CREATE TABLE DDL.
+        */
+       public static class HiveTableStoredAs {
+
+               public static final String STORED_AS_FILE_FORMAT = 
"hive.storage.file-format";
+               public static final String STORED_AS_INPUT_FORMAT = 
"hive.stored.as.input.format";
+               public static final String STORED_AS_OUTPUT_FORMAT = 
"hive.stored.as.output.format";
+
+               private final SqlParserPos pos;
+               private final SqlIdentifier fileFormat;
+               private final SqlCharStringLiteral inputFormat;
+               private final SqlCharStringLiteral outputFormat;
+
+               private HiveTableStoredAs(SqlParserPos pos, SqlIdentifier fileFormat, SqlCharStringLiteral inputFormat,
+                               SqlCharStringLiteral outputFormat) throws ParseException {
+                       this.pos = pos;
+                       this.fileFormat = fileFormat;
+                       this.inputFormat = inputFormat;
+                       this.outputFormat = outputFormat;
+                       validate();
+               }
+
+               private void validate() throws ParseException {
+                       if (fileFormat != null) {
+                               if (inputFormat != null || outputFormat != null) {
+                                       throw new ParseException("Both file format and input/output format are specified");
+                               }
+                       } else {
+                               if (inputFormat == null || outputFormat == null) {
+                                       throw new ParseException("Neither file format nor input/output format is specified");
+                               }
+                       }
+               }
+
+               public SqlNodeList toPropList() {
+                       SqlNodeList res = new SqlNodeList(pos);
+                       if (fileFormat != null) {
+                               res.add(HiveDDLUtils.toTableOption(STORED_AS_FILE_FORMAT, fileFormat.getSimple(), fileFormat.getParserPosition()));
+                       } else {
+                               res.add(HiveDDLUtils.toTableOption(STORED_AS_INPUT_FORMAT, inputFormat, inputFormat.getParserPosition()));
+                               res.add(HiveDDLUtils.toTableOption(STORED_AS_OUTPUT_FORMAT, outputFormat, outputFormat.getParserPosition()));
+                       }
+                       return res;
+               }
+
+               public static HiveTableStoredAs ofFileFormat(SqlParserPos pos, 
SqlIdentifier fileFormat) throws ParseException {
+                       return new HiveTableStoredAs(pos, fileFormat, null, 
null);
+               }
+
+               public static HiveTableStoredAs ofInputOutputFormat(SqlParserPos pos, SqlCharStringLiteral inputFormat,
+                               SqlCharStringLiteral outputFormat) throws ParseException {
+                       return new HiveTableStoredAs(pos, null, inputFormat, outputFormat);
+               }
+       }
+
+       /**
+        * To represent ROW FORMAT in CREATE TABLE DDL.
+        */
+       public static class HiveTableRowFormat {
+
+               public static final String SERDE_LIB_CLASS_NAME = 
"hive.serde.lib.class.name";
+               public static final String SERDE_INFO_PROP_PREFIX = 
"hive.serde.info.prop.";
+               private static final String FIELD_DELIM = 
SERDE_INFO_PROP_PREFIX + "field.delim";
+               public static final String COLLECTION_DELIM = 
SERDE_INFO_PROP_PREFIX + "collection.delim";
+               private static final String ESCAPE_CHAR = 
SERDE_INFO_PROP_PREFIX + "escape.delim";
+               private static final String MAPKEY_DELIM = 
SERDE_INFO_PROP_PREFIX + "mapkey.delim";
+               private static final String LINE_DELIM = SERDE_INFO_PROP_PREFIX 
+ "line.delim";
+               private static final String SERIALIZATION_NULL_FORMAT = 
SERDE_INFO_PROP_PREFIX + "serialization.null.format";
+
+               private final SqlParserPos pos;
+               private final Map<String, SqlCharStringLiteral> 
delimitPropToValue = new LinkedHashMap<>();
+               private final SqlCharStringLiteral serdeClass;
+               private final SqlNodeList serdeProps;
+
+               private HiveTableRowFormat(SqlParserPos pos, 
SqlCharStringLiteral fieldsTerminator, SqlCharStringLiteral escape,
+                               SqlCharStringLiteral collectionTerminator, 
SqlCharStringLiteral mapKeyTerminator,
+                               SqlCharStringLiteral linesTerminator, 
SqlCharStringLiteral nullAs, SqlCharStringLiteral serdeClass,
+                               SqlNodeList serdeProps) throws ParseException {
+                       this.pos = pos;
+                       if (fieldsTerminator != null) {
+                               delimitPropToValue.put(FIELD_DELIM, 
fieldsTerminator);
+                       }
+                       if (escape != null) {
+                               delimitPropToValue.put(ESCAPE_CHAR, escape);
+                       }
+                       if (collectionTerminator != null) {
+                               delimitPropToValue.put(COLLECTION_DELIM, 
collectionTerminator);
+                       }
+                       if (mapKeyTerminator != null) {
+                               delimitPropToValue.put(MAPKEY_DELIM, 
mapKeyTerminator);
+                       }
+                       if (linesTerminator != null) {
+                               delimitPropToValue.put(LINE_DELIM, 
linesTerminator);
+                       }
+                       if (nullAs != null) {
+                               
delimitPropToValue.put(SERIALIZATION_NULL_FORMAT, nullAs);
+                       }
+                       this.serdeClass = serdeClass;
+                       this.serdeProps = serdeProps;
+                       validate();
+               }
+
+               private void validate() throws ParseException {
+                       if (!delimitPropToValue.isEmpty()) {
+                               if (serdeClass != null || serdeProps != null) {
+                                       throw new ParseException("Both 
DELIMITED and SERDE specified");
+                               }
+                       } else {
+                               if (serdeClass == null) {
+                                       throw new ParseException("Neither 
DELIMITED nor SERDE specified");
+                               }
+                       }
+               }
+
+               public SqlNodeList toPropList() {
+                       SqlNodeList list = new SqlNodeList(pos);
+                       if (serdeClass != null) {
+                               
list.add(HiveDDLUtils.toTableOption(SERDE_LIB_CLASS_NAME, serdeClass, pos));
+                               if (serdeProps != null) {
+                                       for (SqlNode sqlNode : serdeProps) {
+                                               SqlTableOption option = 
(SqlTableOption) sqlNode;
+                                               
list.add(HiveDDLUtils.toTableOption(SERDE_INFO_PROP_PREFIX + 
option.getKeyString(),
+                                                               
option.getValue(), pos));
+                                       }
+                               }
+                       } else {
+                               for (String prop : delimitPropToValue.keySet()) 
{
+                                       
list.add(HiveDDLUtils.toTableOption(prop, delimitPropToValue.get(prop), pos));
+                               }
+                       }
+                       return list;
+               }
+
+               public static HiveTableRowFormat withDelimited(SqlParserPos 
pos, SqlCharStringLiteral fieldsTerminator,
+                               SqlCharStringLiteral escape, 
SqlCharStringLiteral collectionTerminator,
+                               SqlCharStringLiteral mapKeyTerminator, 
SqlCharStringLiteral linesTerminator, SqlCharStringLiteral nullAs)
+                               throws ParseException {
+                       return new HiveTableRowFormat(pos, fieldsTerminator, 
escape, collectionTerminator, mapKeyTerminator,
+                                       linesTerminator, nullAs, null, null);
+               }
+
+               public static HiveTableRowFormat withSerDe(SqlParserPos pos, 
SqlCharStringLiteral serdeClass, SqlNodeList serdeProps)
+                               throws ParseException {
+                       return new HiveTableRowFormat(pos, null, null, null, 
null, null, null, serdeClass, serdeProps);
+               }
+       }
+}
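
For context on how the new DDL is exercised end to end: SqlCreateHiveTable covers EXTERNAL, PARTITIONED BY, ROW FORMAT, STORED AS, LOCATION and TBLPROPERTIES, and is reached through the Hive dialect of a TableEnvironment. A hedged usage sketch, assuming a Flink 1.11-style planner setup, a catalog name "myhive" and a Hive conf directory at /path/to/hive-conf (all placeholders, not taken from this patch):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    // Hypothetical demo class, not part of the patch.
    public class HiveDdlDemo {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
            tEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/path/to/hive-conf"));
            tEnv.useCatalog("myhive");
            tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            // Exercises ROW FORMAT DELIMITED, PARTITIONED BY and STORED AS,
            // all handled by SqlCreateHiveTable above.
            tEnv.executeSql(
                    "CREATE TABLE demo (x INT, s STRING) " +
                    "PARTITIONED BY (dt STRING) " +
                    "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' " +
                    "STORED AS ORC");
        }
    }
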
diff --git 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java
 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlDescribeHiveTable.java
similarity index 52%
copy from 
flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java
copy to 
flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlDescribeHiveTable.java
index 01a47fe..ee5e7c8 100644
--- 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java
+++ 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlDescribeHiveTable.java
@@ -18,43 +18,34 @@
 
 package org.apache.flink.sql.parser.hive.ddl;
 
-import org.apache.flink.sql.parser.ddl.SqlAlterDatabase;
+import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
 
 import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 
 /**
- * Abstract class for ALTER DDL of a Hive database.
+ * DESCRIBE DDL to describe a Hive table.
  */
-public abstract class SqlAlterHiveDatabase extends SqlAlterDatabase {
+public class SqlDescribeHiveTable extends SqlRichDescribeTable {
 
-       public static final String ALTER_DATABASE_OP = "alter.database.op";
+       private final boolean extended;
+       private final boolean formatted;
 
-       protected final SqlNodeList originPropList;
-
-       public SqlAlterHiveDatabase(SqlParserPos pos, SqlIdentifier 
databaseName, SqlNodeList propertyList) {
-               super(pos, databaseName, propertyList);
-               originPropList = new SqlNodeList(propertyList.getList(), 
propertyList.getParserPosition());
-               propertyList.add(HiveDDLUtils.toTableOption(ALTER_DATABASE_OP, 
getAlterOp().name(), pos));
+       public SqlDescribeHiveTable(SqlParserPos pos, SqlIdentifier 
tableNameIdentifier, boolean extended, boolean formatted) {
+               super(pos, tableNameIdentifier, extended || formatted);
+               this.extended = extended;
+               this.formatted = formatted;
        }
 
-       protected abstract AlterHiveDatabaseOp getAlterOp();
-
        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-               writer.keyword("ALTER DATABASE");
-               getDatabaseName().unparse(writer, leftPrec, rightPrec);
-               writer.keyword("SET");
-       }
-
-       /**
-        * Type of ALTER DATABASE operation.
-        */
-       public enum AlterHiveDatabaseOp {
-               CHANGE_PROPS,
-               CHANGE_LOCATION,
-               CHANGE_OWNER
+               writer.keyword("DESCRIBE");
+               if (extended) {
+                       writer.keyword("EXTENDED");
+               } else if (formatted) {
+                       writer.keyword("FORMATTED");
+               }
+               tableNameIdentifier.unparse(writer, leftPrec, rightPrec);
        }
 }
diff --git 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintEnable.java
 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintEnable.java
new file mode 100644
index 0000000..297076f
--- /dev/null
+++ 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintEnable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * Enumeration of Hive constraint ENABLE.
+ */
+public enum SqlHiveConstraintEnable {
+
+       ENABLE,
+       DISABLE;
+
+       /**
+        * Creates a parse-tree node representing an occurrence of this keyword
+        * at a particular position in the parsed text.
+        */
+       public SqlLiteral symbol(SqlParserPos pos) {
+               return SqlLiteral.createSymbol(this, pos);
+       }
+}
diff --git 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintRely.java
 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintRely.java
new file mode 100644
index 0000000..e63c8d9
--- /dev/null
+++ 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintRely.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * Enumeration of Hive constraint RELY.
+ */
+public enum SqlHiveConstraintRely {
+
+       RELY,
+       NORELY;
+
+       /**
+        * Creates a parse-tree node representing an occurrence of this keyword
+        * at a particular position in the parsed text.
+        */
+       public SqlLiteral symbol(SqlParserPos pos) {
+               return SqlLiteral.createSymbol(this, pos);
+       }
+}
diff --git 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintTrait.java
 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintTrait.java
new file mode 100644
index 0000000..913831d
--- /dev/null
+++ 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintTrait.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlWriter;
+
+/**
+ * To describe a Hive constraint, i.e. ENABLE/DISABLE, VALIDATE/NOVALIDATE, 
RELY/NORELY.
+ */
+public class SqlHiveConstraintTrait {
+
+       private final SqlLiteral enable;
+       private final SqlLiteral validate;
+       private final SqlLiteral rely;
+
+       public SqlHiveConstraintTrait(
+                       SqlLiteral enable,
+                       SqlLiteral validate,
+                       SqlLiteral rely) {
+               this.enable = enable;
+               this.validate = validate;
+               this.rely = rely;
+       }
+
+       public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+               enable.unparse(writer, leftPrec, rightPrec);
+               validate.unparse(writer, leftPrec, rightPrec);
+               rely.unparse(writer, leftPrec, rightPrec);
+       }
+
+       public boolean isEnable() {
+               return enable.getValueAs(SqlHiveConstraintEnable.class) == 
SqlHiveConstraintEnable.ENABLE;
+       }
+
+       public boolean isValidate() {
+               return validate.getValueAs(SqlHiveConstraintValidate.class) == 
SqlHiveConstraintValidate.VALIDATE;
+       }
+
+       public boolean isRely() {
+               return rely.getValueAs(SqlHiveConstraintRely.class) == 
SqlHiveConstraintRely.RELY;
+       }
+}
diff --git 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintValidate.java
 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintValidate.java
new file mode 100644
index 0000000..841cdbe
--- /dev/null
+++ 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintValidate.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.ddl;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * Enumeration of Hive constraint VALIDATE.
+ */
+public enum SqlHiveConstraintValidate {
+
+       VALIDATE,
+       NOVALIDATE;
+
+       /**
+        * Creates a parse-tree node representing an occurrence of this keyword
+        * at a particular position in the parsed text.
+        */
+       public SqlLiteral symbol(SqlParserPos pos) {
+               return SqlLiteral.createSymbol(this, pos);
+       }
+}
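
The three trait enums plus SqlHiveConstraintTrait are what HiveDDLUtils.encodeConstraintTrait turns into the byte stored under hive.pk.constraint.trait / hive.not.null.constraint.traits. A small sketch of that wiring, assuming SqlParserPos.ZERO as a dummy position (the demo class name is illustrative):

    import org.apache.calcite.sql.parser.SqlParserPos;
    import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
    import org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintEnable;
    import org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintRely;
    import org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintTrait;
    import org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintValidate;

    // Hypothetical demo class, not part of the patch.
    public class ConstraintTraitDemo {
        public static void main(String[] args) {
            // ENABLE NOVALIDATE RELY, the default spelled out explicitly
            SqlHiveConstraintTrait trait = new SqlHiveConstraintTrait(
                    SqlHiveConstraintEnable.ENABLE.symbol(SqlParserPos.ZERO),
                    SqlHiveConstraintValidate.NOVALIDATE.symbol(SqlParserPos.ZERO),
                    SqlHiveConstraintRely.RELY.symbol(SqlParserPos.ZERO));
            byte encoded = HiveDDLUtils.encodeConstraintTrait(trait); // ENABLE(4) | RELY(1) = 5
            System.out.println(encoded == HiveDDLUtils.defaultTrait()); // true
        }
    }
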
diff --git 
a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/type/ExtendedHiveStructTypeNameSpec.java
 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/type/ExtendedHiveStructTypeNameSpec.java
new file mode 100644
index 0000000..2af2ce4
--- /dev/null
+++ 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/type/ExtendedHiveStructTypeNameSpec.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.hive.type;
+
+import org.apache.flink.sql.parser.hive.impl.ParseException;
+import org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.Pair;
+
+import java.util.List;
+
+/**
+ * To represent STRUCT type in Hive.
+ */
+public class ExtendedHiveStructTypeNameSpec extends ExtendedSqlRowTypeNameSpec 
{
+
+       public ExtendedHiveStructTypeNameSpec(
+                       SqlParserPos pos,
+                       List<SqlIdentifier> fieldNames,
+                       List<SqlDataTypeSpec> fieldTypes,
+                       List<SqlCharStringLiteral> comments) throws 
ParseException {
+               super(pos, fieldNames, fieldTypes, comments, false);
+               if (fieldNames.isEmpty()) {
+                       throw new ParseException("STRUCT with no fields is not 
allowed");
+               }
+       }
+
+       @Override
+       public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+               writer.print("STRUCT");
+               SqlWriter.Frame frame = 
writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
+               int i = 0;
+               for (Pair<SqlIdentifier, SqlDataTypeSpec> p : 
Pair.zip(getFieldNames(), getFieldTypes())) {
+                       writer.sep(",", false);
+                       p.left.unparse(writer, 0, 0);
+                       p.right.unparse(writer, leftPrec, rightPrec);
+                       if (!p.right.getNullable()) {
+                               writer.keyword("NOT NULL");
+                       }
+                       if (getComments().get(i) != null) {
+                               getComments().get(i).unparse(writer, leftPrec, 
rightPrec);
+                       }
+                       i += 1;
+               }
+               writer.endList(frame);
+       }
+}
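
To see ExtendedHiveStructTypeNameSpec.unparse outside the unit test below, the generated Hive parser can be driven directly. A sketch assuming the generated FlinkHiveSqlParserImpl.FACTORY and plain Calcite parser config with backtick quoting (not part of the patch, and the exact config may differ from what SqlParserTest sets up):

    import org.apache.calcite.avatica.util.Quoting;
    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.parser.SqlParser;
    import org.apache.flink.sql.parser.hive.impl.FlinkHiveSqlParserImpl;

    // Hypothetical demo class, not part of the patch.
    public class StructParseDemo {
        public static void main(String[] args) throws Exception {
            SqlParser parser = SqlParser.create(
                    "create table t (s struct<f1:int,f2:string>)",
                    SqlParser.configBuilder()
                            .setParserFactory(FlinkHiveSqlParserImpl.FACTORY)
                            .setQuoting(Quoting.BACK_TICK)
                            .build());
            SqlNode node = parser.parseStmt();
            // toString() unparses the node; the STRUCT<...> part goes through
            // ExtendedHiveStructTypeNameSpec.unparse above.
            System.out.println(node);
        }
    }
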
diff --git 
a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
 
b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
index 4200030..eb8a18b 100644
--- 
a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
@@ -98,4 +98,110 @@ public class FlinkHiveSqlParserImplTest extends 
SqlParserTest {
                sql("describe schema db1").ok("DESCRIBE DATABASE `DB1`");
                sql("describe database extended db1").ok("DESCRIBE DATABASE 
EXTENDED `DB1`");
        }
+
+       @Test
+       public void testShowTables() {
+               // TODO: support SHOW TABLES IN 'db_name' 'regex_pattern'
+               sql("show tables").ok("SHOW TABLES");
+       }
+
+       @Test
+       public void testDescribeTable() {
+               // TODO: support describe partition and columns
+               sql("describe tbl").ok("DESCRIBE `TBL`");
+               sql("describe extended tbl").ok("DESCRIBE EXTENDED `TBL`");
+               sql("describe formatted tbl").ok("DESCRIBE FORMATTED `TBL`");
+       }
+
+       @Test
+       public void testCreateTable() {
+               sql("create table tbl (x int) row format delimited fields 
terminated by ',' escaped by '\\' " +
+                               "collection items terminated by ',' map keys 
terminated by ':' lines terminated by '\n' " +
+                               "null defined as 'null' location 
'/path/to/table'")
+                               .ok("CREATE TABLE `TBL` (\n" +
+                                               "  `X`  INTEGER\n" +
+                                               ")\n" +
+                                               "ROW FORMAT DELIMITED\n" +
+                                               "  FIELDS TERMINATED BY ',' 
ESCAPED BY '\\'\n" +
+                                               "  COLLECTION ITEMS TERMINATED 
BY ','\n" +
+                                               "  MAP KEYS TERMINATED BY 
':'\n" +
+                                               "  LINES TERMINATED BY '\n'\n" +
+                                               "  NULL DEFINED AS 'null'\n" +
+                                               "LOCATION '/path/to/table'");
+               sql("create table tbl (x double) stored as orc tblproperties ('k1'='v1')")
+                               .ok("CREATE TABLE `TBL` (\n" +
+                                               "  `X`  DOUBLE\n" +
+                                               ")\n" +
+                                               "STORED AS `ORC`\n" +
+                                               "TBLPROPERTIES (\n" +
+                                               "  'k1' = 'v1'\n" +
+                                               ")");
+               sql("create table tbl (x decimal(5,2)) row format serde 'serde.class.name' with serdeproperties ('serde.k1'='v1')")
+                               .ok("CREATE TABLE `TBL` (\n" +
+                                               "  `X`  DECIMAL(5, 2)\n" +
+                                               ")\n" +
+                                               "ROW FORMAT SERDE 'serde.class.name' WITH SERDEPROPERTIES (\n" +
+                                               "  'serde.k1' = 'v1'\n" +
+                                               ")");
+               sql("create table tbl (x date) row format delimited fields terminated by '\u0001' " +
+                               "stored as inputformat 'input.format.class' outputformat 'output.format.class'")
+                               .ok("CREATE TABLE `TBL` (\n" +
+                                               "  `X`  DATE\n" +
+                                               ")\n" +
+                                               "ROW FORMAT DELIMITED\n" +
+                                               "  FIELDS TERMINATED BY u&'\\0001'\n" +
+                                               "STORED AS INPUTFORMAT 'input.format.class' OUTPUTFORMAT 'output.format.class'");
+               sql("create table tbl (x struct<f1:timestamp,f2:int>) partitioned by (p1 string,p2 bigint) stored as rcfile")
+                               .ok("CREATE TABLE `TBL` (\n" +
+                                               "  `X`  STRUCT< `F1` TIMESTAMP, `F2` INTEGER >\n" +
+                                               ")\n" +
+                                               "PARTITIONED BY (\n" +
+                                               "  `P1`  STRING,\n" +
+                                               "  `P2`  BIGINT\n" +
+                                               ")\n" +
+                                               "STORED AS `RCFILE`");
+               sql("create external table tbl (x map<timestamp,array<timestamp>>) location '/table/path'")
+                               .ok("CREATE EXTERNAL TABLE `TBL` (\n" +
+                                               "  `X`  MAP< TIMESTAMP, ARRAY< TIMESTAMP > >\n" +
+                                               ")\n" +
+                                               "LOCATION '/table/path'");
+               sql("create temporary table tbl (x varchar(50)) partitioned by (p timestamp)")
+                               .ok("CREATE TEMPORARY TABLE `TBL` (\n" +
+                                               "  `X`  VARCHAR(50)\n" +
+                                               ")\n" +
+                                               "PARTITIONED BY (\n" +
+                                               "  `P`  TIMESTAMP\n" +
+                                               ")");
+               sql("create table tbl (v varchar)").fails("VARCHAR precision is mandatory");
+               // TODO: support CLUSTERED BY, SKEWED BY, STORED BY, col constraints
+       }
+
+       @Test
+       public void testConstraints() {
+               sql("create table tbl (x int not null enable rely, y string not null disable novalidate norely)")
+                               .ok("CREATE TABLE `TBL` (\n" +
+                                               "  `X`  INTEGER NOT NULL ENABLE NOVALIDATE RELY,\n" +
+                                               "  `Y`  STRING NOT NULL DISABLE NOVALIDATE NORELY\n" +
+                                               ")");
+               sql("create table tbl (x int,y timestamp not null,z string,primary key (x,z) disable novalidate rely)")
+                               .ok("CREATE TABLE `TBL` (\n" +
+                                               "  `X`  INTEGER,\n" +
+                                               "  `Y`  TIMESTAMP NOT NULL ENABLE NOVALIDATE RELY,\n" +
+                                               "  `Z`  STRING,\n" +
+                                               "  PRIMARY KEY (`X`, `Z`) DISABLE NOVALIDATE RELY\n" +
+                                               ")");
+               sql("create table tbl (x binary,y date,z string,constraint pk_cons primary key(x))")
+                               .ok("CREATE TABLE `TBL` (\n" +
+                                               "  `X`  BINARY,\n" +
+                                               "  `Y`  DATE,\n" +
+                                               "  `Z`  STRING,\n" +
+                                               "  CONSTRAINT `PK_CONS` PRIMARY KEY (`X`) ENABLE NOVALIDATE RELY\n" +
+                                               ")");
+       }
+
+       @Test
+       public void testDropTable() {
+               sql("drop table tbl").ok("DROP TABLE `TBL`");
+               sql("drop table if exists cat.tbl").ok("DROP TABLE IF EXISTS `CAT`.`TBL`");
+       }
 }
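
For readers who want to try the statements exercised above outside the parser tests, here is a minimal, hypothetical sketch (not part of this patch) of running the new Hive DDL end-to-end. It assumes a Flink 1.11-style TableEnvironment, a HiveCatalog registered under the placeholder name "myhive", and a Hive conf directory at a placeholder path; whether every clause is already wired through the planner at this "part 1" commit is not guaranteed.

// Sketch only: placeholder catalog name, database and conf path.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveDdlExample {
	public static void main(String[] args) {
		TableEnvironment tableEnv = TableEnvironment.create(
				EnvironmentSettings.newInstance().inBatchMode().build());
		// Hive DDL must target a HiveCatalog; register one and make it current.
		tableEnv.registerCatalog("myhive",
				new HiveCatalog("myhive", "default", "/path/to/hive-conf"));
		tableEnv.useCatalog("myhive");
		// Switch to the Hive dialect so the statement goes through the Hive parser
		// instead of the default Flink SQL parser.
		tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
		tableEnv.executeSql(
				"create table tbl (x int) row format delimited fields terminated by ',' stored as orc");
	}
}

Switching the dialect only changes which parser handles the statement; the resulting table is still persisted through the current (Hive) catalog.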
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index a5e487b..2e84982 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -317,7 +317,7 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
                }
        }
 
-       private void printIndent(SqlWriter writer) {
+       protected void printIndent(SqlWriter writer) {
                writer.sep(",", false);
                writer.newlineAndIndent();
                writer.print("  ");
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/constraint/SqlTableConstraint.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/constraint/SqlTableConstraint.java
index 0195907..e35fd95 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/constraint/SqlTableConstraint.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/constraint/SqlTableConstraint.java
@@ -123,6 +123,10 @@ public class SqlTableConstraint extends SqlCall {
                return Optional.ofNullable(ret);
        }
 
+       public Optional<SqlIdentifier> getConstraintNameIdentifier() {
+               return Optional.ofNullable(constraintName);
+       }
+
        public SqlNodeList getColumns() {
                return columns;
        }
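
The new getConstraintNameIdentifier() accessor exposes the declared constraint name as a SqlIdentifier rather than a plain string. A hypothetical consumer (illustrative only; the helper and its fallback naming scheme are assumptions, not code from this patch) could use it when mapping a parsed PRIMARY KEY to a Hive constraint:

// Hypothetical helper: prefer the declared constraint name, otherwise generate one.
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;

import java.util.UUID;

public class ConstraintNames {
	static String hiveConstraintName(SqlTableConstraint constraint) {
		return constraint.getConstraintNameIdentifier()
				.map(SqlIdentifier::getSimple)
				.orElse("pk_" + UUID.randomUUID().toString().substring(0, 8));
	}
}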
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeTable.java
index 72e150a..5fdfb09 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeTable.java
@@ -37,7 +37,7 @@ import java.util.List;
 public class SqlRichDescribeTable extends SqlCall {
 
 	public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DESCRIBE TABLE", SqlKind.DESCRIBE_TABLE);
-       private final SqlIdentifier tableNameIdentifier;
+       protected final SqlIdentifier tableNameIdentifier;
        private boolean isExtended = false;
 
 	public SqlRichDescribeTable(SqlParserPos pos, SqlIdentifier tableNameIdentifier, boolean isExtended) {
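
Widening tableNameIdentifier from private to protected lets dialect-specific DESCRIBE nodes reuse it in their own unparse logic. Below is a simplified, hypothetical subclass sketch; the real SqlDescribeHiveTable added in this patch may differ, and the class and flag names here are illustrative.

// Sketch of a dialect-specific DESCRIBE node reusing the protected identifier.
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;

public class DescribeHiveTableSketch extends SqlRichDescribeTable {
	private final boolean isFormatted;

	public DescribeHiveTableSketch(SqlParserPos pos, SqlIdentifier tableName,
			boolean isExtended, boolean isFormatted) {
		super(pos, tableName, isExtended);
		this.isFormatted = isFormatted;
	}

	@Override
	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
		writer.keyword("DESCRIBE");
		if (isFormatted) {
			writer.keyword("FORMATTED");
		}
		// Only the access to the protected field is shown here; the real class
		// also handles the EXTENDED keyword.
		tableNameIdentifier.unparse(writer, leftPrec, rightPrec);
	}
}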
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlCollectionTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlCollectionTypeNameSpec.java
index a690392..cfd38e0 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlCollectionTypeNameSpec.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlCollectionTypeNameSpec.java
@@ -62,6 +62,18 @@ public class ExtendedSqlCollectionTypeNameSpec extends SqlCollectionTypeNameSpec
                this.unparseAsStandard = unparseAsStandard;
        }
 
+       public boolean elementNullable() {
+               return elementNullable;
+       }
+
+       public SqlTypeName getCollectionTypeName() {
+               return collectionTypeName;
+       }
+
+       public boolean unparseAsStandard() {
+               return unparseAsStandard;
+       }
+
        @Override
        public RelDataType deriveType(SqlValidator validator) {
 		RelDataType elementType = getElementTypeName().deriveType(validator);
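
The new getters on ExtendedSqlCollectionTypeNameSpec expose how a collection type was declared, so dialect-aware code can inspect it without re-parsing. A small illustrative helper (an assumption, not part of this patch):

// Hypothetical consumer of the new accessors.
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec;

public class CollectionSpecInspector {
	static boolean isHiveStyleArray(ExtendedSqlCollectionTypeNameSpec spec) {
		// Hive declares ARRAY<elem>; the standard SQL form would be "elem ARRAY".
		return spec.getCollectionTypeName() == SqlTypeName.ARRAY && !spec.unparseAsStandard();
	}
}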
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
index 37e841b..c589326 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
@@ -86,6 +86,10 @@ public class ExtendedSqlRowTypeNameSpec extends SqlTypeNameSpec {
                return comments;
        }
 
+       public boolean unparseAsStandard() {
+               return unparseAsStandard;
+       }
+
        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
                writer.print("ROW");
