This is an automated email from the ASF dual-hosted git repository.

zhangbutao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new eb2cac384da HIVE-28015: Iceberg: Add identifier-field-ids support in Hive (#5047)(Butao Zhang, reviewed by Denys Kuzmenko)
eb2cac384da is described below

commit eb2cac384da8e71a049ff44d883ca363938c6a69
Author: Butao Zhang <zhangbu...@cmss.chinamobile.com>
AuthorDate: Wed Feb 21 20:51:50 2024 +0800

    HIVE-28015: Iceberg: Add identifier-field-ids support in Hive (#5047)(Butao Zhang, reviewed by Denys Kuzmenko)
---
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       | 55 +++++++++++++++++----
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  | 56 ++++++++++++++++++++++
 .../apache/hadoop/hive/metastore/HiveMetaHook.java | 12 +++++
 .../hadoop/hive/metastore/HiveMetaStoreClient.java |  2 +-
 4 files changed, 115 insertions(+), 10 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 9a108e51972..94aabe65d43 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -43,9 +43,11 @@ import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -122,6 +124,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructProjection;
 import org.apache.thrift.TException;
@@ -194,6 +197,12 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
 
   @Override
   public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    CreateTableRequest request = new CreateTableRequest(hmsTable);
+    preCreateTable(request);
+  }
+  @Override
+  public void preCreateTable(CreateTableRequest request) {
+    org.apache.hadoop.hive.metastore.api.Table hmsTable = request.getTable();
     if (hmsTable.isTemporary()) {
       throw new UnsupportedOperationException("Creation of temporary iceberg tables is not supported.");
     }
@@ -234,7 +243,12 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
     // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC takes precedence so the user can override the
     // Iceberg schema and specification generated by the code
 
-    Schema schema = schema(catalogProperties, hmsTable);
+    Set<String> identifierFields = Optional.ofNullable(request.getPrimaryKeys())
+            .map(primaryKeys -> primaryKeys.stream()
+                    .map(SQLPrimaryKey::getColumn_name)
+                    .collect(Collectors.toSet()))
+            .orElse(Collections.emptySet());
+    Schema schema = schema(catalogProperties, hmsTable, identifierFields);
     PartitionSpec spec = spec(conf, schema, hmsTable);
 
     // If there are partition keys specified remove them from the HMS table and add them to the column list
@@ -255,6 +269,8 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
 
     // Set whether the format is ORC, to be used during vectorization.
     setOrcOnlyFilesParam(hmsTable);
+    // Remove hive primary key columns from table request, as iceberg doesn't support hive primary key.
+    request.setPrimaryKeys(null);
   }
 
   @Override
@@ -384,7 +400,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
       preAlterTableProperties = new PreAlterTableProperties();
       preAlterTableProperties.tableLocation = sd.getLocation();
       preAlterTableProperties.format = sd.getInputFormat();
-      preAlterTableProperties.schema = schema(catalogProperties, hmsTable);
+      preAlterTableProperties.schema = schema(catalogProperties, hmsTable, Collections.emptySet());
       preAlterTableProperties.partitionKeys = hmsTable.getPartitionKeys();
 
       context.getProperties().put(HiveMetaHook.ALLOW_PARTITION_KEY_CHANGE, "true");
@@ -794,19 +810,40 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
     return properties;
   }
 
-  private Schema schema(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+  private Schema schema(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable,
+                        Set<String> identifierFields) {
     boolean autoConversion = conf.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
 
     if (properties.getProperty(InputFormatConfig.TABLE_SCHEMA) != null) {
       return SchemaParser.fromJson(properties.getProperty(InputFormatConfig.TABLE_SCHEMA));
-    } else if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {
-      // Add partitioning columns to the original column list before creating the Iceberg Schema
-      List<FieldSchema> cols = Lists.newArrayList(hmsTable.getSd().getCols());
+    }
+    List<FieldSchema> cols = Lists.newArrayList(hmsTable.getSd().getCols());
+    if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {
       cols.addAll(hmsTable.getPartitionKeys());
-      return HiveSchemaUtil.convert(cols, autoConversion);
-    } else {
-      return HiveSchemaUtil.convert(hmsTable.getSd().getCols(), autoConversion);
     }
+    Schema schema = HiveSchemaUtil.convert(cols, autoConversion);
+
+    return getSchemaWithIdentifierFields(schema, identifierFields);
+  }
+
+  private Schema getSchemaWithIdentifierFields(Schema schema, Set<String> identifierFields) {
+    if (identifierFields == null || identifierFields.isEmpty()) {
+      return schema;
+    }
+    Set<Integer> identifierFieldIds = identifierFields.stream()
+            .map(column -> {
+              Types.NestedField field = schema.findField(column);
+              Preconditions.checkNotNull(field,
+                      "Cannot find identifier field ID for the column %s in schema %s", column, schema);
+              return field.fieldId();
+            })
+            .collect(Collectors.toSet());
+
+    List<Types.NestedField> cols = schema.columns().stream()
+            .map(column -> identifierFieldIds.contains(column.fieldId()) ? column.asRequired() : column)
+            .collect(Collectors.toList());
+
+    return new Schema(cols, identifierFieldIds);
   }
 
   private static PartitionSpec spec(Configuration configuration, Schema schema,
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index c1bbeb03989..5a63733bd1e 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -75,6 +75,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -2054,6 +2055,61 @@ public class TestHiveIcebergStorageHandlerNoScan {
     Assert.assertEquals(icePros.get(TableProperties.MERGE_MODE), hmsProps.get(TableProperties.MERGE_MODE));
   }
 
+  @Test
+  public void testCreateTableWithIdentifierField() {
+    TableIdentifier identifier = TableIdentifier.of("default", "customers");
+    String query = String.format("CREATE EXTERNAL TABLE customers (" +
+                    "customer_id BIGINT primary key disable novalidate, " +
+                    "first_name STRING, " +
+                    "last_name STRING) " +
+                    "STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')",
+            testTables.locationForCreateTableSQL(identifier),
+            InputFormatConfig.CATALOG_NAME,
+            testTables.catalogName());
+    shell.executeStatement(query);
+    org.apache.iceberg.Table table = testTables.loadTable(identifier);
+    Assert.assertEquals("Should have new identifier field",
+            Sets.newHashSet(table.schema().findField("customer_id").fieldId()), table.schema().identifierFieldIds());
+  }
+
+  @Test
+  public void testCreateTableWithMultiIdentifierFields() {
+    TableIdentifier identifier = TableIdentifier.of("default", "customers");
+    String query = String.format("CREATE EXTERNAL TABLE customers (" +
+                    "customer_id BIGINT," +
+                    "first_name STRING, " +
+                    "last_name STRING," +
+                    "primary key (customer_id, first_name) disable novalidate) " +
+                    "STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')",
+            testTables.locationForCreateTableSQL(identifier),
+            InputFormatConfig.CATALOG_NAME,
+            testTables.catalogName());
+    shell.executeStatement(query);
+    org.apache.iceberg.Table table = testTables.loadTable(identifier);
+    Assert.assertEquals("Should have new two identifier fields",
+            Sets.newHashSet(table.schema().findField("customer_id").fieldId(),
+                    table.schema().findField("first_name").fieldId()), table.schema().identifierFieldIds());
+  }
+
+  @Test
+  public void testCreateTableFailedWithNestedIdentifierField() {
+    TableIdentifier identifier = TableIdentifier.of("default", "customers");
+    String query = String.format("CREATE EXTERNAL TABLE customers_with_nested_column (" +
+                    "customer_id BIGINT," +
+                    "first_name STRING, " +
+                    "last_name STRING, " +
+                    "user_info STRUCT<address: STRING, phone: STRING> primary key disable novalidate) " +
+                    "STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')",
+            testTables.locationForCreateTableSQL(identifier),
+            InputFormatConfig.CATALOG_NAME,
+            testTables.catalogName());
+
+    // Iceberg table doesn't support nested column as identifier field.
+    Assert.assertThrows(
+            "Cannot add field user_info as an identifier field: not a primitive type field",
+            IllegalArgumentException.class, () -> shell.executeStatement(query));
+  }
+
   private String getCurrentSnapshotForHiveCatalogTable(org.apache.iceberg.Table icebergTable) {
     return ((BaseMetastoreTableOperations) ((BaseTable) icebergTable).operations()).currentMetadataLocation();
   }
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
index 695a3282838..115942b9b8f 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -76,6 +77,17 @@ public interface HiveMetaHook {
   void preCreateTable(Table table)
     throws MetaException;
 
+  /**
+   * Called before a new table definition is added to the metastore
+   * during CREATE TABLE.
+   *
+   * @param request the whole request to create a new table
+   */
+  default void preCreateTable(CreateTableRequest request)
+          throws MetaException {
+    preCreateTable(request.getTable());
+  }
+
   /**
    * Called after failure adding a new table definition to the metastore
    * during CREATE TABLE.
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 7d484bf44cd..862096cb4d8 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -1481,7 +1481,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
 
     HiveMetaHook hook = getHook(tbl);
     if (hook != null) {
-      hook.preCreateTable(tbl);
+      hook.preCreateTable(request);
     }
     boolean success = false;
     try {

Reply via email to