This is an automated email from the ASF dual-hosted git repository. zhangbutao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new eb2cac384da HIVE-28015: Iceberg: Add identifier-field-ids support in Hive (#5047)(Butao Zhang, reviewed by Denys Kuzmenko) eb2cac384da is described below commit eb2cac384da8e71a049ff44d883ca363938c6a69 Author: Butao Zhang <zhangbu...@cmss.chinamobile.com> AuthorDate: Wed Feb 21 20:51:50 2024 +0800 HIVE-28015: Iceberg: Add identifier-field-ids support in Hive (#5047)(Butao Zhang, reviewed by Denys Kuzmenko) --- .../iceberg/mr/hive/HiveIcebergMetaHook.java | 55 +++++++++++++++++---- .../hive/TestHiveIcebergStorageHandlerNoScan.java | 56 ++++++++++++++++++++++ .../apache/hadoop/hive/metastore/HiveMetaHook.java | 12 +++++ .../hadoop/hive/metastore/HiveMetaStoreClient.java | 2 +- 4 files changed, 115 insertions(+), 10 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 9a108e51972..94aabe65d43 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -43,9 +43,11 @@ import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.PartitionDropOptions; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.CreateTableRequest; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -122,6 +124,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructProjection; import org.apache.thrift.TException; @@ -194,6 +197,12 @@ public class HiveIcebergMetaHook implements HiveMetaHook { @Override public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) { + CreateTableRequest request = new CreateTableRequest(hmsTable); + preCreateTable(request); + } + @Override + public void preCreateTable(CreateTableRequest request) { + org.apache.hadoop.hive.metastore.api.Table hmsTable = request.getTable(); if (hmsTable.isTemporary()) { throw new UnsupportedOperationException("Creation of temporary iceberg tables is not supported."); } @@ -234,7 +243,12 @@ public class HiveIcebergMetaHook implements HiveMetaHook { // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC takes precedence so the user can override the // Iceberg schema and specification generated by the code - Schema schema = schema(catalogProperties, hmsTable); + Set<String> identifierFields = Optional.ofNullable(request.getPrimaryKeys()) + .map(primaryKeys -> primaryKeys.stream() + .map(SQLPrimaryKey::getColumn_name) + .collect(Collectors.toSet())) + .orElse(Collections.emptySet()); + Schema schema = schema(catalogProperties, hmsTable, identifierFields); PartitionSpec spec = spec(conf, schema, hmsTable); // If there are partition keys specified remove them from the HMS table and add them to the column list @@ -255,6 +269,8 @@ public class HiveIcebergMetaHook implements HiveMetaHook { // Set whether the format is ORC, to be used during vectorization. setOrcOnlyFilesParam(hmsTable); + // Remove hive primary key columns from table request, as iceberg doesn't support hive primary key. + request.setPrimaryKeys(null); } @Override @@ -384,7 +400,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook { preAlterTableProperties = new PreAlterTableProperties(); preAlterTableProperties.tableLocation = sd.getLocation(); preAlterTableProperties.format = sd.getInputFormat(); - preAlterTableProperties.schema = schema(catalogProperties, hmsTable); + preAlterTableProperties.schema = schema(catalogProperties, hmsTable, Collections.emptySet()); preAlterTableProperties.partitionKeys = hmsTable.getPartitionKeys(); context.getProperties().put(HiveMetaHook.ALLOW_PARTITION_KEY_CHANGE, "true"); @@ -794,19 +810,40 @@ public class HiveIcebergMetaHook implements HiveMetaHook { return properties; } - private Schema schema(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable) { + private Schema schema(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable, + Set<String> identifierFields) { boolean autoConversion = conf.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false); if (properties.getProperty(InputFormatConfig.TABLE_SCHEMA) != null) { return SchemaParser.fromJson(properties.getProperty(InputFormatConfig.TABLE_SCHEMA)); - } else if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) { - // Add partitioning columns to the original column list before creating the Iceberg Schema - List<FieldSchema> cols = Lists.newArrayList(hmsTable.getSd().getCols()); + } + List<FieldSchema> cols = Lists.newArrayList(hmsTable.getSd().getCols()); + if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) { cols.addAll(hmsTable.getPartitionKeys()); - return HiveSchemaUtil.convert(cols, autoConversion); - } else { - return HiveSchemaUtil.convert(hmsTable.getSd().getCols(), autoConversion); } + Schema schema = HiveSchemaUtil.convert(cols, autoConversion); + + return getSchemaWithIdentifierFields(schema, identifierFields); + } + + private Schema getSchemaWithIdentifierFields(Schema schema, Set<String> identifierFields) { + if (identifierFields == null || identifierFields.isEmpty()) { + return schema; + } + Set<Integer> identifierFieldIds = identifierFields.stream() + .map(column -> { + Types.NestedField field = schema.findField(column); + Preconditions.checkNotNull(field, + "Cannot find identifier field ID for the column %s in schema %s", column, schema); + return field.fieldId(); + }) + .collect(Collectors.toSet()); + + List<Types.NestedField> cols = schema.columns().stream() + .map(column -> identifierFieldIds.contains(column.fieldId()) ? column.asRequired() : column) + .collect(Collectors.toList()); + + return new Schema(cols, identifierFieldIds); } private static PartitionSpec spec(Configuration configuration, Schema schema, diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java index c1bbeb03989..5a63733bd1e 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java @@ -75,6 +75,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -2054,6 +2055,61 @@ public class TestHiveIcebergStorageHandlerNoScan { Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE), hmsProps.get(TableProperties.MERGE_MODE)); } + @Test + public void testCreateTableWithIdentifierField() { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + String query = String.format("CREATE EXTERNAL TABLE customers (" + + "customer_id BIGINT primary key disable novalidate, " + + "first_name STRING, " + + "last_name STRING) " + + "STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')", + testTables.locationForCreateTableSQL(identifier), + InputFormatConfig.CATALOG_NAME, + testTables.catalogName()); + shell.executeStatement(query); + org.apache.iceberg.Table table = testTables.loadTable(identifier); + Assert.assertEquals("Should have new identifier field", + Sets.newHashSet(table.schema().findField("customer_id").fieldId()), table.schema().identifierFieldIds()); + } + + @Test + public void testCreateTableWithMultiIdentifierFields() { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + String query = String.format("CREATE EXTERNAL TABLE customers (" + + "customer_id BIGINT," + + "first_name STRING, " + + "last_name STRING," + + "primary key (customer_id, first_name) disable novalidate) " + + "STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')", + testTables.locationForCreateTableSQL(identifier), + InputFormatConfig.CATALOG_NAME, + testTables.catalogName()); + shell.executeStatement(query); + org.apache.iceberg.Table table = testTables.loadTable(identifier); + Assert.assertEquals("Should have new two identifier fields", + Sets.newHashSet(table.schema().findField("customer_id").fieldId(), + table.schema().findField("first_name").fieldId()), table.schema().identifierFieldIds()); + } + + @Test + public void testCreateTableFailedWithNestedIdentifierField() { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + String query = String.format("CREATE EXTERNAL TABLE customers_with_nested_column (" + + "customer_id BIGINT," + + "first_name STRING, " + + "last_name STRING, " + + "user_info STRUCT<address: STRING, phone: STRING> primary key disable novalidate) " + + "STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')", + testTables.locationForCreateTableSQL(identifier), + InputFormatConfig.CATALOG_NAME, + testTables.catalogName()); + + // Iceberg table doesn't support nested column as identifier field. + Assert.assertThrows( + "Cannot add field user_info as an identifier field: not a primitive type field", + IllegalArgumentException.class, () -> shell.executeStatement(query)); + } + private String getCurrentSnapshotForHiveCatalogTable(org.apache.iceberg.Table icebergTable) { return ((BaseMetastoreTableOperations) ((BaseTable) icebergTable).operations()).currentMetadataLocation(); } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java index 695a3282838..115942b9b8f 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.api.CreateTableRequest; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; @@ -76,6 +77,17 @@ public interface HiveMetaHook { void preCreateTable(Table table) throws MetaException; + /** + * Called before a new table definition is added to the metastore + * during CREATE TABLE. + * + * @param request the whole request to create a new table + */ + default void preCreateTable(CreateTableRequest request) + throws MetaException { + preCreateTable(request.getTable()); + } + /** * Called after failure adding a new table definition to the metastore * during CREATE TABLE. diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 7d484bf44cd..862096cb4d8 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -1481,7 +1481,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { HiveMetaHook hook = getHook(tbl); if (hook != null) { - hook.preCreateTable(tbl); + hook.preCreateTable(request); } boolean success = false; try {