This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 78b516c  [FLINK-17431][sql-parser-hive][hive] Implement table DDLs for Hive di… (#11935)
78b516c is described below

commit 78b516ca366e303a0e541aa69bebe1a15ad7a3c8
Author: Rui Li <li...@apache.org>
AuthorDate: Wed May 13 11:00:13 2020 +0800

    [FLINK-17431][sql-parser-hive][hive] Implement table DDLs for Hive di… (#11935)

    * [FLINK-17431][sql-parser-hive][hive] Implement table DDLs for Hive dialect part 1

    * address comments and fix collection delim

    * renaming

    * rebase

    * remove unused property

    * don't support bytes

    * add enums for hive constraint traits

    * store NN col names and add test

    * disallow varchar w/o precision

    * fix unparse for describe table

    * fix unparse row format

    * fix col type unparse

    * address comments
---
 .../flink/table/catalog/hive/HiveCatalog.java      |  82 +--
 .../table/catalog/hive/util/HiveTableUtil.java     |  98 ++++
 .../flink/connectors/hive/HiveDialectTest.java     |  74 ++-
 .../flink/table/catalog/hive/HiveCatalogTest.java  |   6 +-
 .../src/main/codegen/data/Parser.tdd               |  21 +
 .../src/main/codegen/includes/parserImpls.ftl      | 548 ++++++++++++++++++++-
 .../flink/sql/parser/hive/ddl/HiveDDLUtils.java    | 221 +++++++++
 .../sql/parser/hive/ddl/SqlAlterHiveDatabase.java  |   2 +-
 .../parser/hive/ddl/SqlAlterHiveDatabaseOwner.java |   4 +-
 .../sql/parser/hive/ddl/SqlCreateHiveDatabase.java |   2 +-
 .../sql/parser/hive/ddl/SqlCreateHiveTable.java    | 484 ++++++++++++++++++
 ...HiveDatabase.java => SqlDescribeHiveTable.java} |  41 +-
 .../parser/hive/ddl/SqlHiveConstraintEnable.java   |  39 ++
 .../sql/parser/hive/ddl/SqlHiveConstraintRely.java |  39 ++
 .../parser/hive/ddl/SqlHiveConstraintTrait.java    |  59 +++
 .../parser/hive/ddl/SqlHiveConstraintValidate.java |  39 ++
 .../hive/type/ExtendedHiveStructTypeNameSpec.java  |  68 +++
 .../parser/hive/FlinkHiveSqlParserImplTest.java    | 106 ++++
 .../flink/sql/parser/ddl/SqlCreateTable.java       |   2 +-
 .../parser/ddl/constraint/SqlTableConstraint.java  |   4 +
 .../flink/sql/parser/dql/SqlRichDescribeTable.java |   2 +-
 .../type/ExtendedSqlCollectionTypeNameSpec.java    |  12 +
 .../parser/type/ExtendedSqlRowTypeNameSpec.java    |   4 +
 23 files changed, 1885 insertions(+), 72 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 128aa21..815fe41 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connectors.hive.HiveTableFactory;
+import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
 import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
@@ -70,6 +71,7 @@ import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -92,9 +94,6 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.RCFileStorageFormatDescriptor; +import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor; +import org.apache.hadoop.hive.ql.io.StorageFormatFactory; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import java.util.ArrayList; @@ -44,6 +52,16 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_INFO_PROP_PREFIX; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_LIB_CLASS_NAME; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_INPUT_FORMAT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_OUTPUT_FORMAT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_IS_EXTERNAL; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_LOCATION_URI; +import static org.apache.flink.util.Preconditions.checkArgument; /** * Utils to for Hive-backed table. @@ -54,6 +72,8 @@ public class HiveTableUtil { private static final byte HIVE_CONSTRAINT_VALIDATE = 1 << 1; private static final byte HIVE_CONSTRAINT_RELY = 1; + private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory(); + private HiveTableUtil() { } @@ -175,6 +195,84 @@ public class HiveTableUtil { return Optional.of(String.join(" and ", filters)); } + /** + * Extract DDL semantics from properties and use it to initiate the table. The related properties will be removed + * from the map after they're used. + */ + public static void initiateTableFromProperties(Table hiveTable, Map<String, String> properties, HiveConf hiveConf) { + extractExternal(hiveTable, properties); + extractRowFormat(hiveTable.getSd(), properties); + extractStoredAs(hiveTable.getSd(), properties, hiveConf); + extractLocation(hiveTable.getSd(), properties); + } + + private static void extractExternal(Table hiveTable, Map<String, String> properties) { + boolean external = Boolean.parseBoolean(properties.remove(TABLE_IS_EXTERNAL)); + if (external) { + hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString()); + // follow Hive to set this property + hiveTable.getParameters().put("EXTERNAL", "TRUE"); + } + } + + public static void extractLocation(StorageDescriptor sd, Map<String, String> properties) { + String location = properties.remove(TABLE_LOCATION_URI); + if (location != null) { + sd.setLocation(location); + } + } + + public static void extractRowFormat(StorageDescriptor sd, Map<String, String> properties) { + String serdeLib = properties.remove(SERDE_LIB_CLASS_NAME); + if (serdeLib != null) { + sd.getSerdeInfo().setSerializationLib(serdeLib); + } + List<String> serdeProps = properties.keySet().stream() + .filter(p -> p.startsWith(SERDE_INFO_PROP_PREFIX)) + .collect(Collectors.toList()); + for (String prop : serdeProps) { + String value = properties.remove(prop); + // there was a typo of this property in hive, and was fixed in 3.0.0 -- https://issues.apache.org/jira/browse/HIVE-16922 + String key = prop.equals(HiveTableRowFormat.COLLECTION_DELIM) ? 
+ serdeConstants.COLLECTION_DELIM : prop.substring(SERDE_INFO_PROP_PREFIX.length()); + sd.getSerdeInfo().getParameters().put(key, value); + } + } + + private static void extractStoredAs(StorageDescriptor sd, Map<String, String> properties, HiveConf hiveConf) { + String storageFormat = properties.remove(STORED_AS_FILE_FORMAT); + String inputFormat = properties.remove(STORED_AS_INPUT_FORMAT); + String outputFormat = properties.remove(STORED_AS_OUTPUT_FORMAT); + if (storageFormat == null && inputFormat == null) { + return; + } + if (storageFormat != null) { + setStorageFormat(sd, storageFormat, hiveConf); + } else { + sd.setInputFormat(inputFormat); + sd.setOutputFormat(outputFormat); + } + } + + public static void setStorageFormat(StorageDescriptor sd, String format, HiveConf hiveConf) { + StorageFormatDescriptor storageFormatDescriptor = storageFormatFactory.get(format); + checkArgument(storageFormatDescriptor != null, "Unknown storage format " + format); + sd.setInputFormat(storageFormatDescriptor.getInputFormat()); + sd.setOutputFormat(storageFormatDescriptor.getOutputFormat()); + String serdeLib = storageFormatDescriptor.getSerde(); + if (serdeLib == null && storageFormatDescriptor instanceof RCFileStorageFormatDescriptor) { + serdeLib = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE); + } + if (serdeLib != null) { + sd.getSerdeInfo().setSerializationLib(serdeLib); + } + } + + public static void setDefaultStorageFormat(StorageDescriptor sd, HiveConf hiveConf) { + sd.getSerdeInfo().setSerializationLib(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTSERDE)); + setStorageFormat(sd, hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT), hiveConf); + } + private static class ExpressionExtractor implements ExpressionVisitor<String> { // maps a supported function to its name diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java index 9f65c46..b874154 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectTest.java @@ -18,25 +18,40 @@ package org.apache.flink.connectors.hive; +import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; +import org.apache.flink.table.HiveVersionTestUtil; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.config.CatalogConfig; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; import org.apache.flink.util.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; import org.junit.After; +import org.junit.Assume; import 
org.junit.Before; import org.junit.Test; import java.io.File; import java.net.URI; +import java.net.URISyntaxException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * Test Hive syntax when Hive dialect is used. @@ -109,7 +124,64 @@ public class HiveDialectTest { String newLocation = warehouse + "/db1_new_location"; tableEnv.executeSql(String.format("alter database db1 set location '%s'", newLocation)); db = hiveCatalog.getHiveDatabase("db1"); - assertEquals(newLocation, new URI(db.getLocationUri()).getPath()); + assertEquals(newLocation, locationPath(db.getLocationUri())); } } + + @Test + public void testCreateTable() throws Exception { + String location = warehouse + "/external_location"; + tableEnv.sqlUpdate(String.format( + "create external table tbl1 (d decimal(10,0),ts timestamp) partitioned by (p string) location '%s' tblproperties('k1'='v1')", location)); + Table hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl1")); + assertEquals(TableType.EXTERNAL_TABLE.toString(), hiveTable.getTableType()); + assertEquals(1, hiveTable.getPartitionKeysSize()); + assertEquals(location, locationPath(hiveTable.getSd().getLocation())); + assertEquals("v1", hiveTable.getParameters().get("k1")); + assertFalse(hiveTable.getParameters().containsKey(SqlCreateHiveTable.TABLE_LOCATION_URI)); + + tableEnv.sqlUpdate("create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc"); + hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl2")); + assertEquals(TableType.MANAGED_TABLE.toString(), hiveTable.getTableType()); + assertEquals(OrcSerde.class.getName(), hiveTable.getSd().getSerdeInfo().getSerializationLib()); + assertEquals(OrcInputFormat.class.getName(), hiveTable.getSd().getInputFormat()); + assertEquals(OrcOutputFormat.class.getName(), hiveTable.getSd().getOutputFormat()); + + tableEnv.sqlUpdate("create table tbl3 (m map<timestamp,binary>) partitioned by (p1 bigint,p2 tinyint) " + + "row format serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe'"); + hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl3")); + assertEquals(2, hiveTable.getPartitionKeysSize()); + assertEquals(LazyBinarySerDe.class.getName(), hiveTable.getSd().getSerdeInfo().getSerializationLib()); + + tableEnv.sqlUpdate("create table tbl4 (x int,y smallint) row format delimited fields terminated by '|' lines terminated by '\n'"); + hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl4")); + assertEquals("|", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.FIELD_DELIM)); + assertEquals("\n", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.LINE_DELIM)); + + tableEnv.sqlUpdate("create table tbl5 (m map<bigint,string>) row format delimited collection items terminated by ';' " + + "map keys terminated by ':'"); + hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl5")); + assertEquals(";", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.COLLECTION_DELIM)); + assertEquals(":", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.MAPKEY_DELIM)); + } + + @Test + public void testCreateTableWithConstraints() throws Exception { + Assume.assumeTrue(HiveVersionTestUtil.HIVE_310_OR_LATER); + tableEnv.sqlUpdate("create table tbl (x int,y int not null disable novalidate rely,z int not null disable novalidate norely," + + "constraint pk_name primary key (x) rely)"); + CatalogTable catalogTable = 
(CatalogTable) hiveCatalog.getTable(new ObjectPath("default", "tbl")); + TableSchema tableSchema = catalogTable.getSchema(); + assertTrue("PK not present", tableSchema.getPrimaryKey().isPresent()); + assertEquals("pk_name", tableSchema.getPrimaryKey().get().getName()); + assertFalse("PK cannot be null", tableSchema.getFieldDataTypes()[0].getLogicalType().isNullable()); + assertFalse("RELY NOT NULL should be reflected in schema", + tableSchema.getFieldDataTypes()[1].getLogicalType().isNullable()); + assertTrue("NORELY NOT NULL shouldn't be reflected in schema", + tableSchema.getFieldDataTypes()[2].getLogicalType().isNullable()); + } + + private static String locationPath(String locationURI) throws URISyntaxException { + return new URI(locationURI).getPath(); + } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java index 23ccb45..44a5e5e 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java @@ -52,7 +52,8 @@ public class HiveCatalogTest { schema, new FileSystem().path("/test_path").toProperties(), null - )); + ), + HiveTestUtils.createHiveConf()); Map<String, String> prop = hiveTable.getParameters(); assertEquals(prop.remove(CatalogConfig.IS_GENERIC), String.valueOf("true")); @@ -71,7 +72,8 @@ public class HiveCatalogTest { schema, map, null - )); + ), + HiveTestUtils.createHiveConf()); Map<String, String> prop = hiveTable.getParameters(); assertEquals(prop.remove(CatalogConfig.IS_GENERIC), String.valueOf(false)); diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd index 2ff7ebe..73af9f7 100644 --- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd @@ -28,6 +28,19 @@ "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner" "org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseProps" "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase" + "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable" + "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableCreationContext" + "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat" + "org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs" + "org.apache.flink.sql.parser.hive.ddl.SqlDescribeHiveTable" + "org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintEnable" + "org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintRely" + "org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintTrait" + "org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintValidate" + "org.apache.flink.sql.parser.hive.type.ExtendedHiveStructTypeNameSpec" + "org.apache.flink.sql.parser.ddl.constraint.SqlConstraintEnforcement" + "org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint" + "org.apache.flink.sql.parser.ddl.constraint.SqlUniqueSpec" "org.apache.flink.sql.parser.ddl.SqlAlterDatabase" "org.apache.flink.sql.parser.ddl.SqlAlterTable" "org.apache.flink.sql.parser.ddl.SqlAlterTableProperties" @@ -481,6 +494,8 @@ "SqlUseDatabase()" "SqlAlterDatabase()" "SqlDescribeDatabase()" + "SqlShowTables()" + "SqlRichDescribeTable()" ] # List of methods for parsing 
custom literals. @@ -493,6 +508,10 @@ # Return type of method implementation should be "SqlTypeNameSpec". # Example: SqlParseTimeStampZ(). dataTypeParserMethods: [ + "ExtendedSqlBasicTypeName()" + "CustomizedCollectionsTypeName()" + "SqlMapTypeName()" + "ExtendedSqlRowTypeName()" ] # List of methods for parsing builtin function calls. @@ -511,11 +530,13 @@ # Each must accept arguments "(SqlParserPos pos, boolean replace)". createStatementParserMethods: [ "SqlCreateDatabase" + "SqlCreateTemporary" ] # List of methods for parsing extensions to "DROP" calls. # Each must accept arguments "(Span s)". dropStatementParserMethods: [ + "SqlDropTable" "SqlDropDatabase" ] diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl index 70e6dbb..da3175d 100644 --- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl @@ -218,4 +218,550 @@ SqlNode TableOption() : { return new SqlTableOption(key, value, getPos()); } -} \ No newline at end of file +} + + +SqlCreate SqlCreateTemporary(Span s, boolean replace) : +{ + boolean isTemporary = false; + SqlCreate create; +} +{ + [ <TEMPORARY> {isTemporary = true;} ] + + create = SqlCreateTable(s, isTemporary) + { + return create; + } +} + +/** +* Parse a "Show Tables" metadata query command. +*/ +SqlShowTables SqlShowTables() : +{ +} +{ + <SHOW> <TABLES> + { + return new SqlShowTables(getPos()); + } +} + +/** + * Here we add Rich in className to distinguish from calcite's original SqlDescribeTable. + */ +SqlRichDescribeTable SqlRichDescribeTable() : +{ + SqlIdentifier tableName; + SqlParserPos pos; + boolean extended = false; + boolean formatted = false; +} +{ + <DESCRIBE> { pos = getPos();} + [ LOOKAHEAD(2) + ( <EXTENDED> { extended = true; } + | + <FORMATTED> { formatted = true; } + ) + ] + tableName = CompoundIdentifier() + { + return new SqlDescribeHiveTable(pos, tableName, extended, formatted); + } +} + +SqlCreate SqlCreateTable(Span s, boolean isTemporary) : +{ + final SqlParserPos startPos = s.pos(); + SqlIdentifier tableName; + SqlNodeList primaryKeyList = SqlNodeList.EMPTY; + List<SqlNodeList> uniqueKeysList = new ArrayList<SqlNodeList>(); + SqlNodeList columnList = SqlNodeList.EMPTY; + SqlCharStringLiteral comment = null; + + SqlNodeList propertyList; + SqlNodeList partitionColumns = SqlNodeList.EMPTY; + SqlParserPos pos = startPos; + boolean isExternal = false; + HiveTableRowFormat rowFormat = null; + HiveTableStoredAs storedAs = null; + SqlCharStringLiteral location = null; + HiveTableCreationContext ctx = new HiveTableCreationContext(); +} +{ + [ <EXTERNAL> { isExternal = true; } ] + <TABLE> { propertyList = new SqlNodeList(getPos()); } + + tableName = CompoundIdentifier() + [ + <LPAREN> { pos = getPos(); } + TableColumn(ctx) + ( + <COMMA> TableColumn(ctx) + )* + { + pos = pos.plus(getPos()); + columnList = new SqlNodeList(ctx.columnList, pos); + } + <RPAREN> + ] + [ <COMMENT> <QUOTED_STRING> { + comment = createStringLiteral(token.image, getPos()); + }] + [ + <PARTITIONED> <BY> + <LPAREN> + { + List<SqlNode> partCols = new ArrayList(); + if ( columnList == SqlNodeList.EMPTY ) { + columnList = new SqlNodeList(pos.plus(getPos())); + } + } + PartColumnDef(partCols) + ( + <COMMA> PartColumnDef(partCols) + )* + { + partitionColumns = new SqlNodeList(partCols, pos.plus(getPos())); + } + <RPAREN> + ] + [ + <ROW> <FORMAT> + rowFormat = 
TableRowFormat(getPos()) + ] + [ + <STORED> <AS> + storedAs = TableStoredAs(getPos()) + ] + [ + <LOCATION> <QUOTED_STRING> + { location = createStringLiteral(token.image, getPos()); } + ] + [ + <TBLPROPERTIES> + { + SqlNodeList props = TableProperties(); + for (SqlNode node : props) { + propertyList.add(node); + } + } + ] + { + return new SqlCreateHiveTable(startPos.plus(getPos()), + tableName, + columnList, + ctx, + propertyList, + partitionColumns, + comment, + isTemporary, + isExternal, + rowFormat, + storedAs, + location); + } +} + +SqlDrop SqlDropTable(Span s, boolean replace) : +{ + SqlIdentifier tableName = null; + boolean ifExists = false; +} +{ + <TABLE> + + ( + <IF> <EXISTS> { ifExists = true; } + | + { ifExists = false; } + ) + + tableName = CompoundIdentifier() + + { + return new SqlDropTable(s.pos(), tableName, ifExists, false); + } +} + +void TableColumn2(List<SqlNode> list) : +{ + SqlParserPos pos; + SqlIdentifier name; + SqlDataTypeSpec type; + SqlCharStringLiteral comment = null; +} +{ + name = SimpleIdentifier() + type = ExtendedDataType() + [ <COMMENT> <QUOTED_STRING> { + comment = createStringLiteral(token.image, getPos()); + }] + { + SqlTableColumn tableColumn = new SqlTableColumn(name, type, null, comment, getPos()); + list.add(tableColumn); + } +} + +void PartColumnDef(List<SqlNode> list) : +{ + SqlParserPos pos; + SqlIdentifier name; + SqlDataTypeSpec type; + SqlCharStringLiteral comment = null; +} +{ + name = SimpleIdentifier() + type = DataType() + [ <COMMENT> <QUOTED_STRING> { + comment = createStringLiteral(token.image, getPos()); + }] + { + type = type.withNullable(true); + SqlTableColumn tableColumn = new SqlTableColumn(name, type, null, comment, getPos()); + list.add(tableColumn); + } +} + +void TableColumn(HiveTableCreationContext context) : +{ +} +{ + (LOOKAHEAD(2) + TableColumnWithConstraint(context) + | + TableConstraint(context) + ) +} + +/** Parses a table constraint for CREATE TABLE. 
*/ +void TableConstraint(HiveTableCreationContext context) : +{ + SqlIdentifier constraintName = null; + final SqlLiteral spec; + final SqlNodeList columns; +} +{ + [ constraintName = ConstraintName() ] + spec = TableConstraintSpec() + columns = ParenthesizedSimpleIdentifierList() + context.pkTrait = ConstraintTrait() + { + SqlTableConstraint tableConstraint = new SqlTableConstraint( + constraintName, + spec, + columns, + SqlConstraintEnforcement.NOT_ENFORCED.symbol(getPos()), + true, + getPos()); + context.constraints.add(tableConstraint); + } +} + +SqlLiteral TableConstraintSpec() : +{ + SqlLiteral spec; +} +{ + <PRIMARY> <KEY> + { + spec = SqlUniqueSpec.PRIMARY_KEY.symbol(getPos()); + return spec; + } +} + +SqlIdentifier ConstraintName() : +{ + SqlIdentifier constraintName; +} +{ + <CONSTRAINT> constraintName = SimpleIdentifier() { + return constraintName; + } +} + +void TableColumnWithConstraint(HiveTableCreationContext context) : +{ + SqlParserPos pos; + SqlIdentifier name; + SqlDataTypeSpec type; + SqlCharStringLiteral comment = null; + SqlHiveConstraintTrait constraintTrait; +} +{ + name = SimpleIdentifier() + type = ExtendedDataType() + constraintTrait = ConstraintTrait() + { + // we have NOT NULL column constraint here + if (!type.getNullable()) { + if (context.notNullTraits == null) { + context.notNullTraits = new ArrayList(); + context.notNullCols = new ArrayList(); + } + context.notNullTraits.add(constraintTrait); + context.notNullCols.add(name); + } + SqlTableColumn tableColumn = new SqlTableColumn(name, type, null, comment, getPos()); + context.columnList.add(tableColumn); + } + [ <COMMENT> <QUOTED_STRING> { + comment = createStringLiteral(token.image, getPos()); + }] +} + +SqlHiveConstraintTrait ConstraintTrait() : +{ + // a constraint is by default ENABLE NOVALIDATE RELY + SqlLiteral enable = SqlHiveConstraintEnable.ENABLE.symbol(getPos()); + SqlLiteral validate = SqlHiveConstraintValidate.NOVALIDATE.symbol(getPos()); + SqlLiteral rely = SqlHiveConstraintRely.RELY.symbol(getPos()); +} +{ + [ + <ENABLE> { enable = SqlHiveConstraintEnable.ENABLE.symbol(getPos()); } + | + <DISABLE> { enable = SqlHiveConstraintEnable.DISABLE.symbol(getPos()); } + ] + [ + <NOVALIDATE> { validate = SqlHiveConstraintValidate.NOVALIDATE.symbol(getPos()); } + | + <VALIDATE> { validate = SqlHiveConstraintValidate.VALIDATE.symbol(getPos()); } + ] + [ + <RELY> { rely = SqlHiveConstraintRely.RELY.symbol(getPos()); } + | + <NORELY> { rely = SqlHiveConstraintRely.NORELY.symbol(getPos()); } + ] + { return new SqlHiveConstraintTrait(enable, validate, rely); } +} + +/** +* Different with {@link #DataType()}, we support a [ NULL | NOT NULL ] suffix syntax for both the +* collection element data type and the data type itself. +* +* <p>See {@link #SqlDataTypeSpec} for the syntax details of {@link #DataType()}. +*/ +SqlDataTypeSpec ExtendedDataType() : +{ + SqlTypeNameSpec typeName; + final Span s; + boolean elementNullable = true; + boolean nullable = true; +} +{ + <#-- #DataType does not take care of the nullable attribute. 
--> + typeName = TypeName() { + s = span(); + } + ( + LOOKAHEAD(3) + elementNullable = NullableOptDefaultTrue() + typeName = ExtendedCollectionsTypeName(typeName, elementNullable) + )* + nullable = NullableOptDefaultTrue() + { + return new SqlDataTypeSpec(typeName, s.end(this)).withNullable(nullable); + } +} + +HiveTableStoredAs TableStoredAs(SqlParserPos pos) : +{ + SqlIdentifier fileFormat = null; + SqlCharStringLiteral inputFormat = null; + SqlCharStringLiteral outputFormat = null; +} +{ + ( + LOOKAHEAD(2) + <INPUTFORMAT> <QUOTED_STRING> { inputFormat = createStringLiteral(token.image, getPos()); } + <OUTPUTFORMAT> <QUOTED_STRING> { outputFormat = createStringLiteral(token.image, getPos()); } + { return HiveTableStoredAs.ofInputOutputFormat(pos, inputFormat, outputFormat); } + | + fileFormat = SimpleIdentifier() + { return HiveTableStoredAs.ofFileFormat(pos, fileFormat); } + ) +} + +HiveTableRowFormat TableRowFormat(SqlParserPos pos) : +{ + SqlCharStringLiteral fieldsTerminator = null; + SqlCharStringLiteral escape = null; + SqlCharStringLiteral collectionTerminator = null; + SqlCharStringLiteral mapKeyTerminator = null; + SqlCharStringLiteral linesTerminator = null; + SqlCharStringLiteral nullAs = null; + SqlCharStringLiteral serdeClass = null; + SqlNodeList serdeProps = null; +} +{ + ( + <DELIMITED> + [ <FIELDS> <TERMINATED> <BY> <QUOTED_STRING> + { fieldsTerminator = createStringLiteral(token.image, getPos()); } + [ <ESCAPED> <BY> <QUOTED_STRING> { escape = createStringLiteral(token.image, getPos()); } ] + ] + [ <COLLECTION> <ITEMS> <TERMINATED> <BY> <QUOTED_STRING> { collectionTerminator = createStringLiteral(token.image, getPos()); } ] + [ <MAP> <KEYS> <TERMINATED> <BY> <QUOTED_STRING> { mapKeyTerminator = createStringLiteral(token.image, getPos()); } ] + [ <LINES> <TERMINATED> <BY> <QUOTED_STRING> { linesTerminator = createStringLiteral(token.image, getPos()); } ] + [ <NULL> <DEFINED> <AS> <QUOTED_STRING> { nullAs = createStringLiteral(token.image, getPos()); } ] + { return HiveTableRowFormat.withDelimited(pos, fieldsTerminator, escape, collectionTerminator, mapKeyTerminator, linesTerminator, nullAs); } + | + <SERDE> <QUOTED_STRING> + { + serdeClass = createStringLiteral(token.image, getPos()); + } + [ <WITH> <SERDEPROPERTIES> serdeProps = TableProperties() ] + { return HiveTableRowFormat.withSerDe(pos, serdeClass, serdeProps); } + ) +} + +/** +* A sql type name extended basic data type, it has a counterpart basic +* sql type name but always represents as a special alias compared with the standard name. +* +* <p>For example, STRING is synonym of VARCHAR(INT_MAX). 
+*/ +SqlTypeNameSpec ExtendedSqlBasicTypeName() : +{ + final SqlTypeName typeName; + final String typeAlias; + int precision = -1; +} +{ + <STRING> + { + typeName = SqlTypeName.VARCHAR; + typeAlias = token.image; + precision = Integer.MAX_VALUE; + return new SqlAlienSystemTypeNameSpec(typeAlias, typeName, precision, getPos()); + } +} + +SqlTypeNameSpec CustomizedCollectionsTypeName() : +{ + final SqlTypeName collectionTypeName; + final SqlTypeNameSpec elementTypeName; + boolean elementNullable = true; +} +{ + ( + <ARRAY> { + collectionTypeName = SqlTypeName.ARRAY; + } + | + <MULTISET> { + collectionTypeName = SqlTypeName.MULTISET; + } + ) + <LT> + elementTypeName = TypeName() + elementNullable = NullableOptDefaultTrue() + <GT> + { + return new ExtendedSqlCollectionTypeNameSpec( + elementTypeName, + elementNullable, + collectionTypeName, + false, + getPos()); + } +} + +SqlTypeNameSpec ExtendedCollectionsTypeName( + SqlTypeNameSpec elementTypeName, + boolean elementNullable) : +{ + final SqlTypeName collectionTypeName; +} +{ + ( + <MULTISET> { collectionTypeName = SqlTypeName.MULTISET; } + | + <ARRAY> { collectionTypeName = SqlTypeName.ARRAY; } + ) + { + return new ExtendedSqlCollectionTypeNameSpec( + elementTypeName, + elementNullable, + collectionTypeName, + true, + getPos()); + } +} + +SqlTypeNameSpec SqlMapTypeName() : +{ + SqlDataTypeSpec keyType; + SqlDataTypeSpec valType; + boolean nullable = true; +} +{ + <MAP> + <LT> + keyType = ExtendedDataType() + <COMMA> + valType = ExtendedDataType() + <GT> + { + return new SqlMapTypeNameSpec(keyType, valType, getPos()); + } +} + +void ExtendedFieldNameTypeCommaList( + List<SqlIdentifier> fieldNames, + List<SqlDataTypeSpec> fieldTypes, + List<SqlCharStringLiteral> comments) : +{ + SqlIdentifier fName; + SqlDataTypeSpec fType; + boolean nullable; +} +{ + fName = SimpleIdentifier() <COLON> fType = ExtendedDataType() + { + fieldNames.add(fName); + fieldTypes.add(fType); + } + ( + <QUOTED_STRING> { + comments.add(createStringLiteral(token.image, getPos())); + } + | + { comments.add(null); } + ) + ( + <COMMA> + fName = SimpleIdentifier() <COLON> fType = ExtendedDataType() + { + fieldNames.add(fName); + fieldTypes.add(fType); + } + ( + <QUOTED_STRING> { + comments.add(createStringLiteral(token.image, getPos())); + } + | + { comments.add(null); } + ) + )* +} + +SqlTypeNameSpec ExtendedSqlRowTypeName() : +{ + List<SqlIdentifier> fieldNames = new ArrayList<SqlIdentifier>(); + List<SqlDataTypeSpec> fieldTypes = new ArrayList<SqlDataTypeSpec>(); + List<SqlCharStringLiteral> comments = new ArrayList<SqlCharStringLiteral>(); +} +{ + <STRUCT> <LT> ExtendedFieldNameTypeCommaList(fieldNames, fieldTypes, comments) <GT> + { + return new ExtendedHiveStructTypeNameSpec( + getPos(), + fieldNames, + fieldTypes, + comments); + } +} diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java index a367e1e..bfbd33a 100644 --- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java @@ -18,31 +18,65 @@ package org.apache.flink.sql.parser.hive.ddl; +import org.apache.flink.sql.parser.ddl.SqlTableColumn; import org.apache.flink.sql.parser.ddl.SqlTableOption; import org.apache.flink.sql.parser.hive.impl.ParseException; +import 
org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec; +import org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec; +import org.apache.flink.sql.parser.type.SqlMapTypeNameSpec; import org.apache.flink.table.catalog.config.CatalogConfig; +import org.apache.calcite.sql.SqlBasicTypeNameSpec; +import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlTypeNameSpec; import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeName; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP; import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase.DATABASE_LOCATION_URI; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_INFO_PROP_PREFIX; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_LIB_CLASS_NAME; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_INPUT_FORMAT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_OUTPUT_FORMAT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_CONSTRAINT_TRAITS; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.PK_CONSTRAINT_TRAIT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_IS_EXTERNAL; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_LOCATION_URI; /** * Util methods for Hive DDL Sql nodes. 
*/ public class HiveDDLUtils { + // assume ';' cannot be used in column identifiers or type names, otherwise we need to implement escaping + public static final String COL_DELIMITER = ";"; + + private static final byte HIVE_CONSTRAINT_ENABLE = 1 << 2; + private static final byte HIVE_CONSTRAINT_VALIDATE = 1 << 1; + private static final byte HIVE_CONSTRAINT_RELY = 1; + private static final Set<String> RESERVED_DB_PROPERTIES = new HashSet<>(); + private static final Set<String> RESERVED_TABLE_PROPERTIES = new HashSet<>(); + private static final List<String> RESERVED_TABLE_PROP_PREFIX = new ArrayList<>(); static { RESERVED_DB_PROPERTIES.addAll(Arrays.asList(ALTER_DATABASE_OP, DATABASE_LOCATION_URI)); + + RESERVED_TABLE_PROPERTIES.addAll(Arrays.asList(TABLE_LOCATION_URI, + TABLE_IS_EXTERNAL, PK_CONSTRAINT_TRAIT, NOT_NULL_CONSTRAINT_TRAITS, + STORED_AS_FILE_FORMAT, STORED_AS_INPUT_FORMAT, STORED_AS_OUTPUT_FORMAT, SERDE_LIB_CLASS_NAME)); + + RESERVED_TABLE_PROP_PREFIX.add(SERDE_INFO_PROP_PREFIX); } private HiveDDLUtils() { @@ -52,6 +86,12 @@ public class HiveDDLUtils { return checkReservedProperties(RESERVED_DB_PROPERTIES, props, "Databases"); } + public static SqlNodeList checkReservedTableProperties(SqlNodeList props) throws ParseException { + props = checkReservedProperties(RESERVED_TABLE_PROPERTIES, props, "Tables"); + props = checkReservedPrefix(RESERVED_TABLE_PROP_PREFIX, props, "Tables"); + return props; + } + public static SqlNodeList ensureNonGeneric(SqlNodeList props) throws ParseException { for (SqlNode node : props) { if (node instanceof SqlTableOption && ((SqlTableOption) node).getKeyString().equalsIgnoreCase(CatalogConfig.IS_GENERIC)) { @@ -63,6 +103,28 @@ public class HiveDDLUtils { return props; } + private static SqlNodeList checkReservedPrefix(List<String> reserved, SqlNodeList properties, String metaType) throws ParseException { + if (properties == null) { + return null; + } + Set<String> match = new HashSet<>(); + for (SqlNode node : properties) { + if (node instanceof SqlTableOption) { + String key = ((SqlTableOption) node).getKeyString(); + for (String prefix : reserved) { + if (key.startsWith(prefix)) { + match.add(key); + } + } + } + } + if (!match.isEmpty()) { + throw new ParseException(String.format( + "Properties %s have reserved prefix and shouldn't be used for Hive %s", match, metaType)); + } + return properties; + } + private static SqlNodeList checkReservedProperties(Set<String> reservedProperties, SqlNodeList properties, String metaType) throws ParseException { if (properties == null) { @@ -91,4 +153,163 @@ public class HiveDDLUtils { public static SqlTableOption toTableOption(String key, String value, SqlParserPos pos) { return new SqlTableOption(SqlLiteral.createCharString(key, pos), SqlLiteral.createCharString(value, pos), pos); } + + public static void convertDataTypes(SqlNodeList columns) throws ParseException { + if (columns != null) { + for (SqlNode node : columns) { + convertDataTypes((SqlTableColumn) node); + } + } + } + + // Check and convert data types to comply with HiveQL, e.g. 
TIMESTAMP and BINARY + public static void convertDataTypes(SqlTableColumn column) throws ParseException { + column.setType(convertDataTypes(column.getType())); + } + + private static SqlDataTypeSpec convertDataTypes(SqlDataTypeSpec typeSpec) throws ParseException { + SqlTypeNameSpec nameSpec = typeSpec.getTypeNameSpec(); + SqlTypeNameSpec convertedNameSpec = convertDataTypes(nameSpec); + if (nameSpec != convertedNameSpec) { + typeSpec = new SqlDataTypeSpec(convertedNameSpec, typeSpec.getTimeZone(), typeSpec.getNullable(), + typeSpec.getParserPosition()); + } + return typeSpec; + } + + private static SqlTypeNameSpec convertDataTypes(SqlTypeNameSpec nameSpec) throws ParseException { + if (nameSpec instanceof SqlBasicTypeNameSpec) { + SqlBasicTypeNameSpec basicNameSpec = (SqlBasicTypeNameSpec) nameSpec; + if (basicNameSpec.getTypeName().getSimple().equalsIgnoreCase(SqlTypeName.TIMESTAMP.name())) { + if (basicNameSpec.getPrecision() < 0) { + nameSpec = new SqlBasicTypeNameSpec(SqlTypeName.TIMESTAMP, 9, basicNameSpec.getScale(), + basicNameSpec.getCharSetName(), basicNameSpec.getParserPos()); + } + } else if (basicNameSpec.getTypeName().getSimple().equalsIgnoreCase(SqlTypeName.BINARY.name())) { + if (basicNameSpec.getPrecision() < 0) { + nameSpec = new SqlBasicTypeNameSpec(SqlTypeName.VARBINARY, Integer.MAX_VALUE, basicNameSpec.getScale(), + basicNameSpec.getCharSetName(), basicNameSpec.getParserPos()); + } + } else if (basicNameSpec.getTypeName().getSimple().equalsIgnoreCase(SqlTypeName.VARCHAR.name())) { + if (basicNameSpec.getPrecision() < 0) { + throw new ParseException("VARCHAR precision is mandatory"); + } + } + } else if (nameSpec instanceof ExtendedSqlCollectionTypeNameSpec) { + ExtendedSqlCollectionTypeNameSpec collectionNameSpec = (ExtendedSqlCollectionTypeNameSpec) nameSpec; + SqlTypeNameSpec elementNameSpec = collectionNameSpec.getElementTypeName(); + SqlTypeNameSpec convertedElementNameSpec = convertDataTypes(elementNameSpec); + if (convertedElementNameSpec != elementNameSpec) { + nameSpec = new ExtendedSqlCollectionTypeNameSpec(convertedElementNameSpec, + collectionNameSpec.elementNullable(), collectionNameSpec.getCollectionTypeName(), + collectionNameSpec.unparseAsStandard(), collectionNameSpec.getParserPos()); + } + } else if (nameSpec instanceof SqlMapTypeNameSpec) { + SqlMapTypeNameSpec mapNameSpec = (SqlMapTypeNameSpec) nameSpec; + SqlDataTypeSpec keyTypeSpec = mapNameSpec.getKeyType(); + SqlDataTypeSpec valTypeSpec = mapNameSpec.getValType(); + SqlDataTypeSpec convertedKeyTypeSpec = convertDataTypes(keyTypeSpec); + SqlDataTypeSpec convertedValTypeSpec = convertDataTypes(valTypeSpec); + if (keyTypeSpec != convertedKeyTypeSpec || valTypeSpec != convertedValTypeSpec) { + nameSpec = new SqlMapTypeNameSpec(convertedKeyTypeSpec, convertedValTypeSpec, nameSpec.getParserPos()); + } + } else if (nameSpec instanceof ExtendedSqlRowTypeNameSpec) { + ExtendedSqlRowTypeNameSpec rowNameSpec = (ExtendedSqlRowTypeNameSpec) nameSpec; + List<SqlDataTypeSpec> fieldTypeSpecs = rowNameSpec.getFieldTypes(); + List<SqlDataTypeSpec> convertedFieldTypeSpecs = new ArrayList<>(fieldTypeSpecs.size()); + boolean updated = false; + for (SqlDataTypeSpec fieldTypeSpec : fieldTypeSpecs) { + SqlDataTypeSpec convertedFieldTypeSpec = convertDataTypes(fieldTypeSpec); + if (fieldTypeSpec != convertedFieldTypeSpec) { + updated = true; + } + convertedFieldTypeSpecs.add(convertedFieldTypeSpec); + } + if (updated) { + nameSpec = new ExtendedSqlRowTypeNameSpec(nameSpec.getParserPos(), rowNameSpec.getFieldNames(), + 
convertedFieldTypeSpecs, rowNameSpec.getComments(), rowNameSpec.unparseAsStandard()); + } + } + return nameSpec; + } + + // a constraint is by default ENABLE NOVALIDATE RELY + public static byte defaultTrait() { + byte res = enableConstraint((byte) 0); + res = relyConstraint(res); + return res; + } + + // returns a constraint trait that requires ENABLE + public static byte enableConstraint(byte trait) { + return (byte) (trait | HIVE_CONSTRAINT_ENABLE); + } + + // returns a constraint trait that doesn't require ENABLE + public static byte disableConstraint(byte trait) { + return (byte) (trait & (~HIVE_CONSTRAINT_ENABLE)); + } + + // returns a constraint trait that requires VALIDATE + public static byte validateConstraint(byte trait) { + return (byte) (trait | HIVE_CONSTRAINT_VALIDATE); + } + + // returns a constraint trait that doesn't require VALIDATE + public static byte noValidateConstraint(byte trait) { + return (byte) (trait & (~HIVE_CONSTRAINT_VALIDATE)); + } + + // returns a constraint trait that requires RELY + public static byte relyConstraint(byte trait) { + return (byte) (trait | HIVE_CONSTRAINT_RELY); + } + + // returns a constraint trait that doesn't require RELY + public static byte noRelyConstraint(byte trait) { + return (byte) (trait & (~HIVE_CONSTRAINT_RELY)); + } + + // returns whether a trait requires ENABLE constraint + public static boolean requireEnableConstraint(byte trait) { + return (trait & HIVE_CONSTRAINT_ENABLE) != 0; + } + + // returns whether a trait requires VALIDATE constraint + public static boolean requireValidateConstraint(byte trait) { + return (trait & HIVE_CONSTRAINT_VALIDATE) != 0; + } + + // returns whether a trait requires RELY constraint + public static boolean requireRelyConstraint(byte trait) { + return (trait & HIVE_CONSTRAINT_RELY) != 0; + } + + public static byte encodeConstraintTrait(SqlHiveConstraintTrait trait) { + byte res = 0; + if (trait.isEnable()) { + res = enableConstraint(res); + } + if (trait.isValidate()) { + res = validateConstraint(res); + } + if (trait.isRely()) { + res = relyConstraint(res); + } + return res; + } + + public static SqlNodeList deepCopyColList(SqlNodeList colList) { + SqlNodeList res = new SqlNodeList(colList.getParserPosition()); + for (SqlNode node : colList) { + SqlTableColumn col = (SqlTableColumn) node; + res.add(new SqlTableColumn( + col.getName(), + col.getType(), + col.getConstraint().orElse(null), + col.getComment().orElse(null), + col.getParserPosition())); + } + return res; + } } diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java index 01a47fe..d3aee55 100644 --- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java @@ -30,7 +30,7 @@ import org.apache.calcite.sql.parser.SqlParserPos; */ public abstract class SqlAlterHiveDatabase extends SqlAlterDatabase { - public static final String ALTER_DATABASE_OP = "alter.database.op"; + public static final String ALTER_DATABASE_OP = "hive.alter.database.op"; protected final SqlNodeList originPropList; diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabaseOwner.java 
b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabaseOwner.java index 67ac40e..27aba2f 100644 --- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabaseOwner.java +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabaseOwner.java @@ -31,8 +31,8 @@ import org.apache.calcite.sql.parser.SqlParserPos; */ public class SqlAlterHiveDatabaseOwner extends SqlAlterHiveDatabase { - public static final String DATABASE_OWNER_NAME = "database.owner.name"; - public static final String DATABASE_OWNER_TYPE = "database.owner.type"; + public static final String DATABASE_OWNER_NAME = "hive.database.owner.name"; + public static final String DATABASE_OWNER_TYPE = "hive.database.owner.type"; public static final String USER_OWNER = "user"; public static final String ROLE_OWNER = "role"; diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java index 628b397..d7e7bd5 100644 --- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java @@ -36,7 +36,7 @@ import org.apache.calcite.sql.parser.SqlParserPos; */ public class SqlCreateHiveDatabase extends SqlCreateDatabase { - public static final String DATABASE_LOCATION_URI = "database.location_uri"; + public static final String DATABASE_LOCATION_URI = "hive.database.location-uri"; private SqlNodeList originPropList; private final SqlCharStringLiteral location; diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java new file mode 100644 index 0000000..3fcec91 --- /dev/null +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.sql.parser.hive.ddl; + +import org.apache.flink.sql.parser.ddl.SqlCreateTable; +import org.apache.flink.sql.parser.ddl.SqlTableColumn; +import org.apache.flink.sql.parser.ddl.SqlTableOption; +import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; +import org.apache.flink.sql.parser.hive.impl.ParseException; +import org.apache.flink.table.catalog.config.CatalogConfig; + +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import javax.annotation.Nullable; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * CREATE TABLE DDL for Hive dialect. + */ +public class SqlCreateHiveTable extends SqlCreateTable { + + public static final String TABLE_LOCATION_URI = "hive.location-uri"; + public static final String TABLE_IS_EXTERNAL = "hive.is-external"; + public static final String PK_CONSTRAINT_TRAIT = "hive.pk.constraint.trait"; + public static final String NOT_NULL_CONSTRAINT_TRAITS = "hive.not.null.constraint.traits"; + public static final String NOT_NULL_COLS = "hive.not.null.cols"; + + private final HiveTableCreationContext creationContext; + private final SqlNodeList originPropList; + private final boolean isExternal; + private final HiveTableRowFormat rowFormat; + private final HiveTableStoredAs storedAs; + private final SqlCharStringLiteral location; + private final SqlNodeList origColList; + private final SqlNodeList origPartColList; + + public SqlCreateHiveTable(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList columnList, + HiveTableCreationContext creationContext, SqlNodeList propertyList, + SqlNodeList partColList, @Nullable SqlCharStringLiteral comment, boolean isTemporary, boolean isExternal, + HiveTableRowFormat rowFormat, HiveTableStoredAs storedAs, SqlCharStringLiteral location) throws ParseException { + + super(pos, tableName, columnList, creationContext.constraints, + HiveDDLUtils.checkReservedTableProperties(propertyList), extractPartColIdentifiers(partColList), null, + comment, null, isTemporary); + + this.origColList = HiveDDLUtils.deepCopyColList(columnList); + this.origPartColList = partColList != null ?
+ HiveDDLUtils.deepCopyColList(partColList) : + SqlNodeList.EMPTY; + + HiveDDLUtils.convertDataTypes(columnList); + HiveDDLUtils.convertDataTypes(partColList); + originPropList = new SqlNodeList(propertyList.getList(), propertyList.getParserPosition()); + // mark it as a Hive table + HiveDDLUtils.ensureNonGeneric(propertyList); + propertyList.add(HiveDDLUtils.toTableOption(CatalogConfig.IS_GENERIC, "false", pos)); + // set external + this.isExternal = isExternal; + if (isExternal) { + propertyList.add(HiveDDLUtils.toTableOption(TABLE_IS_EXTERNAL, "true", pos)); + } + // add partition cols to col list + if (partColList != null) { + for (SqlNode partCol : partColList) { + columnList.add(partCol); + } + } + // set PRIMARY KEY + this.creationContext = creationContext; + for (SqlTableConstraint tableConstraint : creationContext.constraints) { + if (!tableConstraint.isPrimaryKey()) { + throw new ParseException("Only the PRIMARY KEY table constraint is supported at the moment"); + } else { + // the PK col list is taken care of by the super class; we only need to set the trait here + propertyList.add(HiveDDLUtils.toTableOption( + PK_CONSTRAINT_TRAIT, + String.valueOf(HiveDDLUtils.encodeConstraintTrait(creationContext.pkTrait)), + propertyList.getParserPosition())); + } + } + // set NOT NULL + if (creationContext.notNullTraits != null) { + // set traits + String notNullTraits = creationContext.notNullTraits.stream() + .map(HiveDDLUtils::encodeConstraintTrait) + .map(Object::toString) + .collect(Collectors.joining(HiveDDLUtils.COL_DELIMITER)); + propertyList.add(HiveDDLUtils.toTableOption( + NOT_NULL_CONSTRAINT_TRAITS, notNullTraits, propertyList.getParserPosition())); + // set col names + String notNullCols = creationContext.notNullCols.stream() + .map(SqlIdentifier::getSimple) + .collect(Collectors.joining(HiveDDLUtils.COL_DELIMITER)); + propertyList.add(HiveDDLUtils.toTableOption(NOT_NULL_COLS, notNullCols, propertyList.getParserPosition())); + } + // set row format + this.rowFormat = rowFormat; + if (rowFormat != null) { + for (SqlNode node : rowFormat.toPropList()) { + propertyList.add(node); + } + } + // set stored as + this.storedAs = storedAs; + if (storedAs != null) { + for (SqlNode node : storedAs.toPropList()) { + propertyList.add(node); + } + } + // set location + this.location = location; + if (location != null) { + propertyList.add(HiveDDLUtils.toTableOption(TABLE_LOCATION_URI, location, location.getParserPosition())); + } + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("CREATE"); + if (isTemporary()) { + writer.keyword("TEMPORARY"); + } + if (isExternal) { + writer.keyword("EXTERNAL"); + } + writer.keyword("TABLE"); + if (ifNotExists) { + writer.keyword("IF NOT EXISTS"); + } + getTableName().unparse(writer, leftPrec, rightPrec); + // columns + SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")"); + unparseColumns(creationContext, + origColList, + writer, leftPrec, rightPrec); + for (SqlTableConstraint tableConstraint : creationContext.constraints) { + printIndent(writer); + tableConstraint.getConstraintNameIdentifier().ifPresent(name -> { + writer.keyword("CONSTRAINT"); + name.unparse(writer, leftPrec, rightPrec); + }); + writer.keyword("PRIMARY KEY"); + SqlWriter.Frame pkFrame = writer.startList("(", ")"); + tableConstraint.getColumns().unparse(writer, leftPrec, rightPrec); + writer.endList(pkFrame); + creationContext.pkTrait.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); +
writer.endList(frame); + // table comment + getComment().ifPresent(c -> { + writer.keyword("COMMENT"); + c.unparse(writer, leftPrec, rightPrec); + }); + // partitions + if (origPartColList.size() > 0) { + writer.newlineAndIndent(); + writer.keyword("PARTITIONED BY"); + SqlWriter.Frame partitionedByFrame = writer.startList("(", ")"); + unparseColumns(creationContext, + origPartColList, + writer, leftPrec, rightPrec); + writer.newlineAndIndent(); + writer.endList(partitionedByFrame); + } + // row format + unparseRowFormat(writer, leftPrec, rightPrec); + // stored as + unparseStoredAs(writer, leftPrec, rightPrec); + // location + if (location != null) { + writer.newlineAndIndent(); + writer.keyword("LOCATION"); + location.unparse(writer, leftPrec, rightPrec); + } + // properties + if (originPropList.size() > 0) { + writer.newlineAndIndent(); + writer.keyword("TBLPROPERTIES"); + unparsePropList(originPropList, writer, leftPrec, rightPrec); + } + } + + private void unparseStoredAs(SqlWriter writer, int leftPrec, int rightPrec) { + if (storedAs == null) { + return; + } + writer.newlineAndIndent(); + writer.keyword("STORED AS"); + if (storedAs.fileFormat != null) { + storedAs.fileFormat.unparse(writer, leftPrec, rightPrec); + } else { + writer.keyword("INPUTFORMAT"); + storedAs.inputFormat.unparse(writer, leftPrec, rightPrec); + writer.keyword("OUTPUTFORMAT"); + storedAs.outputFormat.unparse(writer, leftPrec, rightPrec); + } + } + + private void unparseRowFormat(SqlWriter writer, int leftPrec, int rightPrec) { + if (rowFormat == null) { + return; + } + writer.newlineAndIndent(); + writer.keyword("ROW FORMAT"); + if (rowFormat.serdeClass != null) { + writer.keyword("SERDE"); + rowFormat.serdeClass.unparse(writer, leftPrec, rightPrec); + if (rowFormat.serdeProps != null) { + writer.keyword("WITH SERDEPROPERTIES"); + unparsePropList(rowFormat.serdeProps, writer, leftPrec, rightPrec); + } + } else { + writer.keyword("DELIMITED"); + SqlCharStringLiteral fieldDelim = rowFormat.delimitPropToValue.get(HiveTableRowFormat.FIELD_DELIM); + SqlCharStringLiteral escape = rowFormat.delimitPropToValue.get(HiveTableRowFormat.ESCAPE_CHAR); + if (fieldDelim != null) { + writer.newlineAndIndent(); + writer.print(" "); + writer.keyword("FIELDS TERMINATED BY"); + fieldDelim.unparse(writer, leftPrec, rightPrec); + if (escape != null) { + writer.keyword("ESCAPED BY"); + escape.unparse(writer, leftPrec, rightPrec); + } + } + SqlCharStringLiteral collectionDelim = rowFormat.delimitPropToValue.get(HiveTableRowFormat.COLLECTION_DELIM); + if (collectionDelim != null) { + writer.newlineAndIndent(); + writer.print(" "); + writer.keyword("COLLECTION ITEMS TERMINATED BY"); + collectionDelim.unparse(writer, leftPrec, rightPrec); + } + SqlCharStringLiteral mapKeyDelim = rowFormat.delimitPropToValue.get(HiveTableRowFormat.MAPKEY_DELIM); + if (mapKeyDelim != null) { + writer.newlineAndIndent(); + writer.print(" "); + writer.keyword("MAP KEYS TERMINATED BY"); + mapKeyDelim.unparse(writer, leftPrec, rightPrec); + } + SqlCharStringLiteral lineDelim = rowFormat.delimitPropToValue.get(HiveTableRowFormat.LINE_DELIM); + if (lineDelim != null) { + writer.newlineAndIndent(); + writer.print(" "); + writer.keyword("LINES TERMINATED BY"); + lineDelim.unparse(writer, leftPrec, rightPrec); + } + SqlCharStringLiteral nullAs = rowFormat.delimitPropToValue.get(HiveTableRowFormat.SERIALIZATION_NULL_FORMAT); + if (nullAs != null) { + writer.newlineAndIndent(); + writer.print(" "); + writer.keyword("NULL DEFINED AS"); + nullAs.unparse(writer,
leftPrec, rightPrec); + } + } + } + + private void unparsePropList(SqlNodeList propList, SqlWriter writer, int leftPrec, int rightPrec) { + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode property : propList) { + printIndent(writer); + property.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } + + private void unparseColumns(HiveTableCreationContext context, SqlNodeList columns, + SqlWriter writer, int leftPrec, int rightPrec) { + List<SqlHiveConstraintTrait> notNullTraits = context.notNullTraits; + int traitIndex = 0; + for (SqlNode node : columns) { + printIndent(writer); + SqlTableColumn column = (SqlTableColumn) node; + column.getName().unparse(writer, leftPrec, rightPrec); + writer.print(" "); + column.getType().unparse(writer, leftPrec, rightPrec); + if (column.getType().getNullable() != null && !column.getType().getNullable()) { + writer.keyword("NOT NULL"); + notNullTraits.get(traitIndex++).unparse(writer, leftPrec, rightPrec); + } + column.getComment().ifPresent(c -> { + writer.keyword("COMMENT"); + c.unparse(writer, leftPrec, rightPrec); + }); + } + } + + // Extract the identifiers from partition col list -- that's what SqlCreateTable expects for partition keys + private static SqlNodeList extractPartColIdentifiers(SqlNodeList partCols) { + if (partCols == null) { + return null; + } + SqlNodeList res = new SqlNodeList(partCols.getParserPosition()); + for (SqlNode node : partCols) { + SqlTableColumn partCol = (SqlTableColumn) node; + res.add(partCol.getName()); + } + return res; + } + + /** + * Creation context for a Hive table. + */ + public static class HiveTableCreationContext extends TableCreationContext { + public SqlHiveConstraintTrait pkTrait = null; + public List<SqlHiveConstraintTrait> notNullTraits = null; + // PK cols are also considered not null, so we need to remember explicit NN cols + public List<SqlIdentifier> notNullCols = null; + } + + /** + * To represent STORED AS in CREATE TABLE DDL. 
+ */ + public static class HiveTableStoredAs { + + public static final String STORED_AS_FILE_FORMAT = "hive.storage.file-format"; + public static final String STORED_AS_INPUT_FORMAT = "hive.stored.as.input.format"; + public static final String STORED_AS_OUTPUT_FORMAT = "hive.stored.as.output.format"; + + private final SqlParserPos pos; + private final SqlIdentifier fileFormat; + private final SqlCharStringLiteral inputFormat; + private final SqlCharStringLiteral outputFormat; + + private HiveTableStoredAs(SqlParserPos pos, SqlIdentifier fileFormat, SqlCharStringLiteral inputFormat, + SqlCharStringLiteral outputFormat) throws ParseException { + this.pos = pos; + this.fileFormat = fileFormat; + this.inputFormat = inputFormat; + this.outputFormat = outputFormat; + validate(); + } + + private void validate() throws ParseException { + if (fileFormat != null) { + if (inputFormat != null || outputFormat != null) { + throw new ParseException("Both file format and input/output format are specified"); + } + } else { + if (inputFormat == null || outputFormat == null) { + throw new ParseException("Neither file format nor input/output format is specified"); + } + } + } + + public SqlNodeList toPropList() { + SqlNodeList res = new SqlNodeList(pos); + if (fileFormat != null) { + res.add(HiveDDLUtils.toTableOption(STORED_AS_FILE_FORMAT, fileFormat.getSimple(), fileFormat.getParserPosition())); + } else { + res.add(HiveDDLUtils.toTableOption(STORED_AS_INPUT_FORMAT, inputFormat, inputFormat.getParserPosition())); + res.add(HiveDDLUtils.toTableOption(STORED_AS_OUTPUT_FORMAT, outputFormat, outputFormat.getParserPosition())); + } + return res; + } + + public static HiveTableStoredAs ofFileFormat(SqlParserPos pos, SqlIdentifier fileFormat) throws ParseException { + return new HiveTableStoredAs(pos, fileFormat, null, null); + } + + public static HiveTableStoredAs ofInputOutputFormat(SqlParserPos pos, SqlCharStringLiteral inputFormat, + SqlCharStringLiteral outputFormat) throws ParseException { + return new HiveTableStoredAs(pos, null, inputFormat, outputFormat); + } + } + + /** + * To represent ROW FORMAT in CREATE TABLE DDL.
+ */ + public static class HiveTableRowFormat { + + public static final String SERDE_LIB_CLASS_NAME = "hive.serde.lib.class.name"; + public static final String SERDE_INFO_PROP_PREFIX = "hive.serde.info.prop."; + private static final String FIELD_DELIM = SERDE_INFO_PROP_PREFIX + "field.delim"; + public static final String COLLECTION_DELIM = SERDE_INFO_PROP_PREFIX + "collection.delim"; + private static final String ESCAPE_CHAR = SERDE_INFO_PROP_PREFIX + "escape.delim"; + private static final String MAPKEY_DELIM = SERDE_INFO_PROP_PREFIX + "mapkey.delim"; + private static final String LINE_DELIM = SERDE_INFO_PROP_PREFIX + "line.delim"; + private static final String SERIALIZATION_NULL_FORMAT = SERDE_INFO_PROP_PREFIX + "serialization.null.format"; + + private final SqlParserPos pos; + private final Map<String, SqlCharStringLiteral> delimitPropToValue = new LinkedHashMap<>(); + private final SqlCharStringLiteral serdeClass; + private final SqlNodeList serdeProps; + + private HiveTableRowFormat(SqlParserPos pos, SqlCharStringLiteral fieldsTerminator, SqlCharStringLiteral escape, + SqlCharStringLiteral collectionTerminator, SqlCharStringLiteral mapKeyTerminator, + SqlCharStringLiteral linesTerminator, SqlCharStringLiteral nullAs, SqlCharStringLiteral serdeClass, + SqlNodeList serdeProps) throws ParseException { + this.pos = pos; + if (fieldsTerminator != null) { + delimitPropToValue.put(FIELD_DELIM, fieldsTerminator); + } + if (escape != null) { + delimitPropToValue.put(ESCAPE_CHAR, escape); + } + if (collectionTerminator != null) { + delimitPropToValue.put(COLLECTION_DELIM, collectionTerminator); + } + if (mapKeyTerminator != null) { + delimitPropToValue.put(MAPKEY_DELIM, mapKeyTerminator); + } + if (linesTerminator != null) { + delimitPropToValue.put(LINE_DELIM, linesTerminator); + } + if (nullAs != null) { + delimitPropToValue.put(SERIALIZATION_NULL_FORMAT, nullAs); + } + this.serdeClass = serdeClass; + this.serdeProps = serdeProps; + validate(); + } + + private void validate() throws ParseException { + if (!delimitPropToValue.isEmpty()) { + if (serdeClass != null || serdeProps != null) { + throw new ParseException("Both DELIMITED and SERDE specified"); + } + } else { + if (serdeClass == null) { + throw new ParseException("Neither DELIMITED nor SERDE specified"); + } + } + } + + public SqlNodeList toPropList() { + SqlNodeList list = new SqlNodeList(pos); + if (serdeClass != null) { + list.add(HiveDDLUtils.toTableOption(SERDE_LIB_CLASS_NAME, serdeClass, pos)); + if (serdeProps != null) { + for (SqlNode sqlNode : serdeProps) { + SqlTableOption option = (SqlTableOption) sqlNode; + list.add(HiveDDLUtils.toTableOption(SERDE_INFO_PROP_PREFIX + option.getKeyString(), + option.getValue(), pos)); + } + } + } else { + for (String prop : delimitPropToValue.keySet()) { + list.add(HiveDDLUtils.toTableOption(prop, delimitPropToValue.get(prop), pos)); + } + } + return list; + } + + public static HiveTableRowFormat withDelimited(SqlParserPos pos, SqlCharStringLiteral fieldsTerminator, + SqlCharStringLiteral escape, SqlCharStringLiteral collectionTerminator, + SqlCharStringLiteral mapKeyTerminator, SqlCharStringLiteral linesTerminator, SqlCharStringLiteral nullAs) + throws ParseException { + return new HiveTableRowFormat(pos, fieldsTerminator, escape, collectionTerminator, mapKeyTerminator, + linesTerminator, nullAs, null, null); + } + + public static HiveTableRowFormat withSerDe(SqlParserPos pos, SqlCharStringLiteral serdeClass, SqlNodeList serdeProps) + throws ParseException { + return new 
HiveTableRowFormat(pos, null, null, null, null, null, null, serdeClass, serdeProps); + } + } +} diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlDescribeHiveTable.java similarity index 52% copy from flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java copy to flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlDescribeHiveTable.java index 01a47fe..ee5e7c8 100644 --- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabase.java +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlDescribeHiveTable.java @@ -18,43 +18,34 @@ package org.apache.flink.sql.parser.hive.ddl; -import org.apache.flink.sql.parser.ddl.SqlAlterDatabase; +import org.apache.flink.sql.parser.dql.SqlRichDescribeTable; import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; /** - * Abstract class for ALTER DDL of a Hive database. + * DESCRIBE DDL to describe a Hive table. */ -public abstract class SqlAlterHiveDatabase extends SqlAlterDatabase { +public class SqlDescribeHiveTable extends SqlRichDescribeTable { - public static final String ALTER_DATABASE_OP = "alter.database.op"; + private final boolean extended; + private final boolean formatted; - protected final SqlNodeList originPropList; - - public SqlAlterHiveDatabase(SqlParserPos pos, SqlIdentifier databaseName, SqlNodeList propertyList) { - super(pos, databaseName, propertyList); - originPropList = new SqlNodeList(propertyList.getList(), propertyList.getParserPosition()); - propertyList.add(HiveDDLUtils.toTableOption(ALTER_DATABASE_OP, getAlterOp().name(), pos)); + public SqlDescribeHiveTable(SqlParserPos pos, SqlIdentifier tableNameIdentifier, boolean extended, boolean formatted) { + super(pos, tableNameIdentifier, extended || formatted); + this.extended = extended; + this.formatted = formatted; } - protected abstract AlterHiveDatabaseOp getAlterOp(); - @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { - writer.keyword("ALTER DATABASE"); - getDatabaseName().unparse(writer, leftPrec, rightPrec); - writer.keyword("SET"); - } - - /** - * Type of ALTER DATABASE operation. - */ - public enum AlterHiveDatabaseOp { - CHANGE_PROPS, - CHANGE_LOCATION, - CHANGE_OWNER + writer.keyword("DESCRIBE"); + if (extended) { + writer.keyword("EXTENDED"); + } else if (formatted) { + writer.keyword("FORMATTED"); + } + tableNameIdentifier.unparse(writer, leftPrec, rightPrec); } } diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintEnable.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintEnable.java new file mode 100644 index 0000000..297076f --- /dev/null +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintEnable.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.hive.ddl; + +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.parser.SqlParserPos; + +/** + * Enumeration of Hive constraint ENABLE. + */ +public enum SqlHiveConstraintEnable { + + ENABLE, + DISABLE; + + /** + * Creates a parse-tree node representing an occurrence of this keyword + * at a particular position in the parsed text. + */ + public SqlLiteral symbol(SqlParserPos pos) { + return SqlLiteral.createSymbol(this, pos); + } +} diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintRely.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintRely.java new file mode 100644 index 0000000..e63c8d9 --- /dev/null +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintRely.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.hive.ddl; + +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.parser.SqlParserPos; + +/** + * Enumeration of Hive constraint RELY. + */ +public enum SqlHiveConstraintRely { + + RELY, + NORELY; + + /** + * Creates a parse-tree node representing an occurrence of this keyword + * at a particular position in the parsed text. + */ + public SqlLiteral symbol(SqlParserPos pos) { + return SqlLiteral.createSymbol(this, pos); + } +} diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintTrait.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintTrait.java new file mode 100644 index 0000000..913831d --- /dev/null +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintTrait.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.hive.ddl; + +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlWriter; + +/** + * To describe a Hive constraint, i.e. ENABLE/DISABLE, VALIDATE/NOVALIDATE, RELY/NORELY. + */ +public class SqlHiveConstraintTrait { + + private final SqlLiteral enable; + private final SqlLiteral validate; + private final SqlLiteral rely; + + public SqlHiveConstraintTrait( + SqlLiteral enable, + SqlLiteral validate, + SqlLiteral rely) { + this.enable = enable; + this.validate = validate; + this.rely = rely; + } + + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + enable.unparse(writer, leftPrec, rightPrec); + validate.unparse(writer, leftPrec, rightPrec); + rely.unparse(writer, leftPrec, rightPrec); + } + + public boolean isEnable() { + return enable.getValueAs(SqlHiveConstraintEnable.class) == SqlHiveConstraintEnable.ENABLE; + } + + public boolean isValidate() { + return validate.getValueAs(SqlHiveConstraintValidate.class) == SqlHiveConstraintValidate.VALIDATE; + } + + public boolean isRely() { + return rely.getValueAs(SqlHiveConstraintRely.class) == SqlHiveConstraintRely.RELY; + } +} diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintValidate.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintValidate.java new file mode 100644 index 0000000..841cdbe --- /dev/null +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlHiveConstraintValidate.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.hive.ddl; + +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.parser.SqlParserPos; + +/** + * Enumeration of Hive constraint VALIDATE. + */ +public enum SqlHiveConstraintValidate { + + VALIDATE, + NOVALIDATE; + + /** + * Creates a parse-tree node representing an occurrence of this keyword + * at a particular position in the parsed text. 
+ */ + public SqlLiteral symbol(SqlParserPos pos) { + return SqlLiteral.createSymbol(this, pos); + } +} diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/type/ExtendedHiveStructTypeNameSpec.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/type/ExtendedHiveStructTypeNameSpec.java new file mode 100644 index 0000000..2af2ce4 --- /dev/null +++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/type/ExtendedHiveStructTypeNameSpec.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.hive.type; + +import org.apache.flink.sql.parser.hive.impl.ParseException; +import org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec; + +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.Pair; + +import java.util.List; + +/** + * To represent STRUCT type in Hive. 
+ */ +public class ExtendedHiveStructTypeNameSpec extends ExtendedSqlRowTypeNameSpec { + + public ExtendedHiveStructTypeNameSpec( + SqlParserPos pos, + List<SqlIdentifier> fieldNames, + List<SqlDataTypeSpec> fieldTypes, + List<SqlCharStringLiteral> comments) throws ParseException { + super(pos, fieldNames, fieldTypes, comments, false); + if (fieldNames.isEmpty()) { + throw new ParseException("STRUCT with no fields is not allowed"); + } + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.print("STRUCT"); + SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">"); + int i = 0; + for (Pair<SqlIdentifier, SqlDataTypeSpec> p : Pair.zip(getFieldNames(), getFieldTypes())) { + writer.sep(",", false); + p.left.unparse(writer, 0, 0); + p.right.unparse(writer, leftPrec, rightPrec); + if (p.right.getNullable() != null && !p.right.getNullable()) { + writer.keyword("NOT NULL"); + } + if (getComments().get(i) != null) { + getComments().get(i).unparse(writer, leftPrec, rightPrec); + } + i += 1; + } + writer.endList(frame); + } +} diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java index 4200030..eb8a18b 100644 --- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java +++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java @@ -98,4 +98,110 @@ public class FlinkHiveSqlParserImplTest extends SqlParserTest { sql("describe schema db1").ok("DESCRIBE DATABASE `DB1`"); sql("describe database extended db1").ok("DESCRIBE DATABASE EXTENDED `DB1`"); } + + @Test + public void testShowTables() { + // TODO: support SHOW TABLES IN 'db_name' 'regex_pattern' + sql("show tables").ok("SHOW TABLES"); + } + + @Test + public void testDescribeTable() { + // TODO: support describe partition and columns + sql("describe tbl").ok("DESCRIBE `TBL`"); + sql("describe extended tbl").ok("DESCRIBE EXTENDED `TBL`"); + sql("describe formatted tbl").ok("DESCRIBE FORMATTED `TBL`"); + } + + @Test + public void testCreateTable() { + sql("create table tbl (x int) row format delimited fields terminated by ',' escaped by '\\' " + + "collection items terminated by ',' map keys terminated by ':' lines terminated by '\n' " + + "null defined as 'null' location '/path/to/table'") + .ok("CREATE TABLE `TBL` (\n" + + " `X` INTEGER\n" + + ")\n" + + "ROW FORMAT DELIMITED\n" + + " FIELDS TERMINATED BY ',' ESCAPED BY '\\'\n" + + " COLLECTION ITEMS TERMINATED BY ','\n" + + " MAP KEYS TERMINATED BY ':'\n" + + " LINES TERMINATED BY '\n'\n" + + " NULL DEFINED AS 'null'\n" + + "LOCATION '/path/to/table'"); + sql("create table tbl (x double) stored as orc tblproperties ('k1'='v1')") + .ok("CREATE TABLE `TBL` (\n" + + " `X` DOUBLE\n" + + ")\n" + + "STORED AS `ORC`\n" + + "TBLPROPERTIES (\n" + + " 'k1' = 'v1'\n" + + ")"); + sql("create table tbl (x decimal(5,2)) row format serde 'serde.class.name' with serdeproperties ('serde.k1'='v1')") + .ok("CREATE TABLE `TBL` (\n" + + " `X` DECIMAL(5, 2)\n" + + ")\n" + + "ROW FORMAT SERDE 'serde.class.name' WITH SERDEPROPERTIES (\n" + + " 'serde.k1' = 'v1'\n" + + ")"); + sql("create table tbl (x date) row format delimited fields terminated by '\u0001' " + + "stored as inputformat 'input.format.class' outputformat 'output.format.class'") + .ok("CREATE TABLE `TBL` (\n" + + " `X` 
DATE\n" + + ")\n" + + "ROW FORMAT DELIMITED\n" + + " FIELDS TERMINATED BY u&'\\0001'\n" + + "STORED AS INPUTFORMAT 'input.format.class' OUTPUTFORMAT 'output.format.class'"); + sql("create table tbl (x struct<f1:timestamp,f2:int>) partitioned by (p1 string,p2 bigint) stored as rcfile") + .ok("CREATE TABLE `TBL` (\n" + + " `X` STRUCT< `F1` TIMESTAMP, `F2` INTEGER >\n" + + ")\n" + + "PARTITIONED BY (\n" + + " `P1` STRING,\n" + + " `P2` BIGINT\n" + + ")\n" + + "STORED AS `RCFILE`"); + sql("create external table tbl (x map<timestamp,array<timestamp>>) location '/table/path'") + .ok("CREATE EXTERNAL TABLE `TBL` (\n" + + " `X` MAP< TIMESTAMP, ARRAY< TIMESTAMP > >\n" + + ")\n" + + "LOCATION '/table/path'"); + sql("create temporary table tbl (x varchar(50)) partitioned by (p timestamp)") + .ok("CREATE TEMPORARY TABLE `TBL` (\n" + + " `X` VARCHAR(50)\n" + + ")\n" + + "PARTITIONED BY (\n" + + " `P` TIMESTAMP\n" + + ")"); + sql("create table tbl (v varchar)").fails("VARCHAR precision is mandatory"); + // TODO: support CLUSTERED BY, SKEWED BY, STORED BY, col constraints + } + + @Test + public void testConstraints() { + sql("create table tbl (x int not null enable rely, y string not null disable novalidate norely)") + .ok("CREATE TABLE `TBL` (\n" + + " `X` INTEGER NOT NULL ENABLE NOVALIDATE RELY,\n" + + " `Y` STRING NOT NULL DISABLE NOVALIDATE NORELY\n" + + ")"); + sql("create table tbl (x int,y timestamp not null,z string,primary key (x,z) disable novalidate rely)") + .ok("CREATE TABLE `TBL` (\n" + + " `X` INTEGER,\n" + + " `Y` TIMESTAMP NOT NULL ENABLE NOVALIDATE RELY,\n" + + " `Z` STRING,\n" + + " PRIMARY KEY (`X`, `Z`) DISABLE NOVALIDATE RELY\n" + + ")"); + sql("create table tbl (x binary,y date,z string,constraint pk_cons primary key(x))") + .ok("CREATE TABLE `TBL` (\n" + + " `X` BINARY,\n" + + " `Y` DATE,\n" + + " `Z` STRING,\n" + + " CONSTRAINT `PK_CONS` PRIMARY KEY (`X`) ENABLE NOVALIDATE RELY\n" + + ")"); + } + + @Test + public void testDropTable() { + sql("drop table tbl").ok("DROP TABLE `TBL`"); + sql("drop table if exists cat.tbl").ok("DROP TABLE IF EXISTS `CAT`.`TBL`"); + } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java index a5e487b..2e84982 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java @@ -317,7 +317,7 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode { } } - private void printIndent(SqlWriter writer) { + protected void printIndent(SqlWriter writer) { writer.sep(",", false); writer.newlineAndIndent(); writer.print(" "); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/constraint/SqlTableConstraint.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/constraint/SqlTableConstraint.java index 0195907..e35fd95 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/constraint/SqlTableConstraint.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/constraint/SqlTableConstraint.java @@ -123,6 +123,10 @@ public class SqlTableConstraint extends SqlCall { return Optional.ofNullable(ret); } + public Optional<SqlIdentifier> getConstraintNameIdentifier() { + return Optional.ofNullable(constraintName); + } + public SqlNodeList 
getColumns() { return columns; } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeTable.java index 72e150a..5fdfb09 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeTable.java @@ -37,7 +37,7 @@ import java.util.List; public class SqlRichDescribeTable extends SqlCall { public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DESCRIBE TABLE", SqlKind.DESCRIBE_TABLE); - private final SqlIdentifier tableNameIdentifier; + protected final SqlIdentifier tableNameIdentifier; private boolean isExtended = false; public SqlRichDescribeTable(SqlParserPos pos, SqlIdentifier tableNameIdentifier, boolean isExtended) { diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlCollectionTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlCollectionTypeNameSpec.java index a690392..cfd38e0 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlCollectionTypeNameSpec.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlCollectionTypeNameSpec.java @@ -62,6 +62,18 @@ public class ExtendedSqlCollectionTypeNameSpec extends SqlCollectionTypeNameSpec this.unparseAsStandard = unparseAsStandard; } + public boolean elementNullable() { + return elementNullable; + } + + public SqlTypeName getCollectionTypeName() { + return collectionTypeName; + } + + public boolean unparseAsStandard() { + return unparseAsStandard; + } + @Override public RelDataType deriveType(SqlValidator validator) { RelDataType elementType = getElementTypeName().deriveType(validator); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java index 37e841b..c589326 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java @@ -86,6 +86,10 @@ public class ExtendedSqlRowTypeNameSpec extends SqlTypeNameSpec { return comments; } + public boolean unparseAsStandard() { + return unparseAsStandard; + } + @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.print("ROW");
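
For reference, the constraint traits introduced in this commit are stored as a bitmask: HiveDDLUtils keeps one bit each for ENABLE, VALIDATE and RELY (the HIVE_CONSTRAINT_* flags defined earlier in that class, assumed here to be distinct bit values), and encodeConstraintTrait folds a parsed SqlHiveConstraintTrait into the same byte so it can travel through table properties such as hive.pk.constraint.trait. A minimal sketch of how the pieces compose; the demo class is hypothetical and assumes the flink-sql-parser-hive module (plus Calcite) on the classpath:

    import org.apache.calcite.sql.parser.SqlParserPos;
    import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
    import org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintEnable;
    import org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintRely;
    import org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintTrait;
    import org.apache.flink.sql.parser.hive.ddl.SqlHiveConstraintValidate;

    // Hypothetical demo class; run with -ea to enable the assertions.
    public class ConstraintTraitDemo {
        public static void main(String[] args) {
            // The default trait is ENABLE NOVALIDATE RELY, i.e. the ENABLE and RELY bits set.
            byte trait = HiveDDLUtils.defaultTrait();
            assert HiveDDLUtils.requireEnableConstraint(trait);
            assert !HiveDDLUtils.requireValidateConstraint(trait);
            assert HiveDDLUtils.requireRelyConstraint(trait);

            // Clearing the ENABLE bit turns it into DISABLE NOVALIDATE RELY.
            trait = HiveDDLUtils.disableConstraint(trait);
            assert !HiveDDLUtils.requireEnableConstraint(trait);

            // A trait built from the parsed DDL keywords encodes to the same byte.
            SqlParserPos pos = SqlParserPos.ZERO;
            SqlHiveConstraintTrait parsed = new SqlHiveConstraintTrait(
                    SqlHiveConstraintEnable.DISABLE.symbol(pos),
                    SqlHiveConstraintValidate.NOVALIDATE.symbol(pos),
                    SqlHiveConstraintRely.RELY.symbol(pos));
            assert HiveDDLUtils.encodeConstraintTrait(parsed) == trait;
        }
    }

This matches the unparse output exercised in testConstraints above, where a bare PRIMARY KEY round-trips as ENABLE NOVALIDATE RELY.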
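Similarly, HiveTableRowFormat and HiveTableStoredAs do not carry ROW FORMAT / STORED AS clauses as structured metadata; toPropList() flattens whatever was declared into prefixed table options (hive.serde.info.prop.*, hive.storage.file-format, ...) that HiveCatalog can later pick up. A small sketch under the same classpath assumption; the demo class is hypothetical, and only the delimiters actually supplied end up in the list:

    import org.apache.calcite.sql.SqlCharStringLiteral;
    import org.apache.calcite.sql.SqlIdentifier;
    import org.apache.calcite.sql.SqlLiteral;
    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.parser.SqlParserPos;
    import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat;
    import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs;
    import org.apache.flink.sql.parser.hive.impl.ParseException;

    // Hypothetical demo class.
    public class RowFormatDemo {
        public static void main(String[] args) throws ParseException {
            SqlParserPos pos = SqlParserPos.ZERO;
            SqlCharStringLiteral comma = SqlLiteral.createCharString(",", pos);
            SqlCharStringLiteral colon = SqlLiteral.createCharString(":", pos);

            // ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' MAP KEYS TERMINATED BY ':'
            HiveTableRowFormat rowFormat = HiveTableRowFormat.withDelimited(
                    pos, comma, null, null, colon, null, null);
            for (SqlNode option : rowFormat.toPropList()) {
                System.out.println(option); // e.g. 'hive.serde.info.prop.field.delim' = ','
            }

            // STORED AS ORC becomes a single hive.storage.file-format option.
            HiveTableStoredAs storedAs =
                    HiveTableStoredAs.ofFileFormat(pos, new SqlIdentifier("ORC", pos));
            System.out.println(storedAs.toPropList());
        }
    }

The private constructors validate the mutually exclusive branches up front, so an illegal mix (DELIMITED together with SERDE, or a file format together with INPUTFORMAT/OUTPUTFORMAT) fails with a ParseException at construction time rather than surfacing later in HiveCatalog.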