difin commented on code in PR #5995:
URL: https://github.com/apache/hive/pull/5995#discussion_r2305093367


##########
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java:
##########
@@ -221,7 +222,8 @@ static void setPartitionSpec(TableMetadata metadata, 
Map<String, String> paramet
   }
 
   public static PartitionSpec getPartitionSpec(Map<String, String> props, 
Schema schema) {
-    return 
Optional.ofNullable(props.get(TableProperties.DEFAULT_PARTITION_SPEC))
+    return Optional.ofNullable(props.get(PARTITION_SPEC))

Review Comment:
   Done



##########
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java:
##########
@@ -104,38 +109,56 @@ public static void alterTable(
     }
   }
 
-  public static Table convertIcebergTableToHiveTable(org.apache.iceberg.Table 
icebergTable, Configuration conf) {
-    Table hiveTable = new Table();
-    TableMetadata metadata = ((BaseTable) icebergTable).operations().current();
+  public static List<FieldSchema> getPartitionKeys(org.apache.iceberg.Table 
table, int specId) {
+    Schema schema = table.specs().get(specId).schema();
+    List<FieldSchema> hiveSchema = HiveSchemaUtil.convert(schema);
+    Map<String, String> colNameToColType = hiveSchema.stream()
+        .collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
+    return table.specs().get(specId).fields().stream()
+        .map(partField -> new FieldSchema(
+            schema.findColumnName(partField.sourceId()),
+            colNameToColType.get(schema.findColumnName(partField.sourceId())),
+            String.format("Transform: %s", partField.transform().toString()))
+        )
+        .toList();
+  }
+
+  public static Table toHiveTable(org.apache.iceberg.Table table, 
Configuration conf) {
+    var result = new Table();
+    TableName tableName = TableName.fromString(table.name(), 
MetaStoreUtils.getDefaultCatalog(conf),
+        Warehouse.DEFAULT_DATABASE_NAME);
+    result.setCatName(tableName.getCat());
+    result.setDbName(tableName.getDb());
+    result.setTableName(tableName.getTable());
+    result.setTableType(TableType.EXTERNAL_TABLE.toString());
+    result.setPartitionKeys(getPartitionKeys(table, table.spec().specId()));
+    TableMetadata metadata = ((BaseTable) table).operations().current();
     long maxHiveTablePropertySize = 
conf.getLong(HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE,
         HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
-    
HMSTablePropertyHelper.updateHmsTableForIcebergTable(metadata.metadataFileLocation(),
 hiveTable, metadata,
+    
HMSTablePropertyHelper.updateHmsTableForIcebergTable(metadata.metadataFileLocation(),
 result, metadata,
         null, true, maxHiveTablePropertySize, null);
-    hiveTable.getParameters().put(CatalogUtils.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
-    TableName tableName = TableName.fromString(icebergTable.name(), null, 
null);
-    hiveTable.setTableName(tableName.getTable());
-    hiveTable.setDbName(tableName.getDb());
-    StorageDescriptor storageDescriptor = new StorageDescriptor();
-    hiveTable.setSd(storageDescriptor);
-    hiveTable.setTableType("EXTERNAL_TABLE");
-    hiveTable.setPartitionKeys(new LinkedList<>());
-    List<FieldSchema> cols = new LinkedList<>();
-    storageDescriptor.setCols(cols);
-    storageDescriptor.setLocation(icebergTable.location());
-    storageDescriptor.setInputFormat(DEFAULT_INPUT_FORMAT_CLASS);
-    storageDescriptor.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
-    storageDescriptor.setBucketCols(new LinkedList<>());
-    storageDescriptor.setSortCols(new LinkedList<>());
-    storageDescriptor.setParameters(Maps.newHashMap());
-    SerDeInfo serDeInfo = new SerDeInfo("icebergSerde", DEFAULT_SERDE_CLASS, 
Maps.newHashMap());
-    serDeInfo.getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1"); 
// Default serialization format.
-    storageDescriptor.setSerdeInfo(serDeInfo);
-    icebergTable.schema().columns().forEach(icebergColumn -> {
-      FieldSchema fieldSchema = new FieldSchema();
-      fieldSchema.setName(icebergColumn.name());
-      fieldSchema.setType(icebergColumn.type().toString());
-      cols.add(fieldSchema);
-    });
-    return hiveTable;
+    result.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
conf.get(CatalogUtils.CATALOG_CONFIG_TYPE));
+    result.setSd(toHiveStorageDescriptor(table));
+    return result;
+  }
+
+  private static StorageDescriptor 
toHiveStorageDescriptor(org.apache.iceberg.Table table) {

Review Comment:
   Done



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java:
##########
@@ -272,26 +274,33 @@ static Optional<Catalog> loadCatalog(Configuration conf, 
String catalogName) {
   private static Map<String, String> getCatalogProperties(Configuration conf, 
String catalogName) {
     Map<String, String> catalogProperties = Maps.newHashMap();
 
-    String keyPrefix;
-    if (ICEBERG_DEFAULT_CATALOG_NAME.equals(catalogName)) {
-      keyPrefix = InputFormatConfig.CATALOG_CONFIG_PREFIX + catalogName;
-    } else {
-      keyPrefix = 
String.format(InputFormatConfig.CUSTOM_CATALOG_CONFIG_PREFIX, catalogName);
-      catalogProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE,
-          
conf.get(MetastoreConf.ConfVars.HIVE_ICEBERG_CATALOG_TYPE.getVarname()));
-    }
+    List<String> keyPrefixes = List.of(
+        InputFormatConfig.CATALOG_CONFIG_PREFIX + catalogName,

Review Comment:
   we've discussed it over a call



##########
itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java:
##########
@@ -697,6 +698,28 @@ public IcebergLlapLocalCliConfig() {
     }
   }
 
+  public static class TestIcebergLlapRESTCatalogLocalCliDriver extends 
AbstractCliConfig {

Review Comment:
   Done



##########
standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/client/builder/HiveMetaStoreClientBuilder.java:
##########
@@ -88,7 +90,7 @@ private static IMetaStoreClient createClient(Configuration 
conf, boolean allowEm
 
     IMetaStoreClient baseMetaStoreClient = null;
     try {
-      baseMetaStoreClient = JavaUtils.newInstance(mscClass,
+     baseMetaStoreClient = JavaUtils.newInstance(mscClass,

Review Comment:
   Done



##########
standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/client/builder/HiveMetaStoreClientBuilder.java:
##########
@@ -103,4 +106,10 @@ private static IMetaStoreClient createClient(Configuration 
conf, boolean allowEm
 
     return baseMetaStoreClient;
   }
+  
+  private static void setCatalogImpl(Configuration conf) {
+    String catalogType = MetaStoreUtils.getCatalogType(conf);
+    conf.set(MetastoreConf.ConfVars.METASTORE_CLIENT_IMPL.getVarname(),

Review Comment:
   As discussed, for now we will set `METASTORE_CLIENT_IMPL` in the config until 
Hive gets multi-catalog support.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java:
##########
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.HMSTablePropertyHelper;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
+
+public class BaseHiveIcebergMetaHook implements HiveMetaHook {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseHiveIcebergMetaHook.class);
+  private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
+  public static final Map<String, String> COMMON_HMS_PROPERTIES = 
ImmutableMap.of(
+      BaseMetastoreTableOperations.TABLE_TYPE_PROP, 
BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase()
+  );
+  private static final Set<String> PARAMETERS_TO_REMOVE = ImmutableSet
+      .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME, 
InputFormatConfig.PARTITION_SPEC);
+  static final String ORC_FILES_ONLY = "iceberg.orc.files.only";
+
+  protected final Configuration conf;
+  protected Table icebergTable = null;
+  protected Properties catalogProperties;
+  protected boolean createHMSTableInHook = false;
+
+  public enum FileFormat {
+    ORC("orc"), PARQUET("parquet"), AVRO("avro");
+
+    private final String label;
+
+    FileFormat(String label) {
+      this.label = label;
+    }
+
+    public String getLabel() {
+      return label;
+    }
+  }
+
+  public BaseHiveIcebergMetaHook(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
+    CreateTableRequest request = new CreateTableRequest(hmsTable);
+    preCreateTable(request);
+  }
+
+  @Override
+  public void preCreateTable(CreateTableRequest request) {
+    org.apache.hadoop.hive.metastore.api.Table hmsTable = request.getTable();
+    if (hmsTable.isTemporary()) {
+      throw new UnsupportedOperationException("Creation of temporary iceberg 
tables is not supported.");
+    }
+    this.catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
+
+    // Set the table type even for non HiveCatalog based tables
+    hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase());
+
+    if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
+      if 
(Boolean.parseBoolean(this.catalogProperties.getProperty(hive_metastoreConstants.TABLE_IS_CTLT)))
 {
+        throw new UnsupportedOperationException("CTLT target table must be a 
HiveCatalog table.");
+      }
+      // For non-HiveCatalog tables too, we should set the input and output 
format
+      // so that the table can be read by other engines like Impala
+      
hmsTable.getSd().setInputFormat(HiveIcebergInputFormat.class.getCanonicalName());
+      
hmsTable.getSd().setOutputFormat(HiveIcebergOutputFormat.class.getCanonicalName());
+
+      // If not using HiveCatalog check for existing table
+      try {
+        this.icebergTable = IcebergTableUtil.getTable(conf, catalogProperties, 
true);
+
+        if (Catalogs.hadoopCatalog(conf, catalogProperties) && 
hmsTable.getSd() != null &&
+                hmsTable.getSd().getLocation() == null) {
+          hmsTable.getSd().setLocation(icebergTable.location());
+        }
+        
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA)
 == null,
+            "Iceberg table already created - can not use provided schema");
+        
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC)
 == null,
+            "Iceberg table already created - can not use provided partition 
specification");
+
+        LOG.info("Iceberg table already exists {}", icebergTable);
+        return;
+      } catch (NoSuchTableException nte) {
+        // If the table does not exist we will create it below
+      }
+    }
+
+    // If the table does not exist collect data for table creation
+    // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC 
takes precedence so the user can override the
+    // Iceberg schema and specification generated by the code
+
+    Set<String> identifierFields = 
Optional.ofNullable(request.getPrimaryKeys())
+        .map(primaryKeys ->
+            
primaryKeys.stream().map(SQLPrimaryKey::getColumn_name).collect(Collectors.toSet()))
+        .orElse(Collections.emptySet());
+
+    Schema schema = schema(catalogProperties, hmsTable, identifierFields);
+    PartitionSpec spec = spec(conf, schema, hmsTable);
+
+    // If there are partition keys specified remove them from the HMS table 
and add them to the column list
+    if (hmsTable.isSetPartitionKeys()) {
+      hmsTable.getSd().getCols().addAll(hmsTable.getPartitionKeys());
+      hmsTable.setPartitionKeysIsSet(false);
+    }
+
+    catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, 
SchemaParser.toJson(schema));
+    String specString = PartitionSpecParser.toJson(spec);
+    catalogProperties.put(InputFormatConfig.PARTITION_SPEC, specString);
+    validateCatalogConfigsDefined();
+
+    if (request.getEnvContext() == null) {
+      request.setEnvContext(new EnvironmentContext());
+    }
+    
request.getEnvContext().putToProperties(TableProperties.DEFAULT_PARTITION_SPEC, 
specString);
+    setCommonHmsTablePropertiesForIceberg(hmsTable);
+
+    if 
(hmsTable.getParameters().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP))
 {
+      createHMSTableInHook = true;
+    }
+
+    
assertFileFormat(catalogProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+
+    // Set whether the format is ORC, to be used during vectorization.
+    setOrcOnlyFilesParam(hmsTable);
+    // Remove hive primary key columns from table request, as iceberg doesn't 
support hive primary key.
+    request.setPrimaryKeys(null);
+    setSortOrder(hmsTable, schema, catalogProperties);
+  }
+
+  /**
+   * Method for verification that necessary catalog configs are defined in 
Session Conf.
+   *
+   * <p>If the catalog name is provided in 'iceberg.catalog' table property,
+   * and the name is not the default catalog and not hadoop catalog, checks 
that one of the two configs
+   * is defined in Session Conf: iceberg.catalog.<code>catalogName</code>.type
+   * or iceberg.catalog.<code>catalogName</code>.catalog-impl. See description 
in Catalogs.java for more details.
+   *
+   */
+  private void validateCatalogConfigsDefined() {
+    String catalogName = 
catalogProperties.getProperty(InputFormatConfig.CATALOG_NAME);
+    if (!StringUtils.isEmpty(catalogName) && 
!Catalogs.ICEBERG_HADOOP_TABLE_NAME.equals(catalogName) &&
+        !Catalogs.ICEBERG_DEFAULT_CATALOG_NAME.equals(catalogName)) {
+
+      boolean configsExist = 
!StringUtils.isEmpty(CatalogUtils.getCatalogType(conf, catalogName)) ||

Review Comment:
   If the session conf doesn't contain configs for the `catalogName`, the type 
will be null.



##########
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive.client;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropDatabaseRequest;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.client.BaseMetaStoreClient;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.HMSTablePropertyHelper;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.hive.RuntimeMetaException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveRESTCatalogClient extends BaseMetaStoreClient {
+
+  public static final String NAMESPACE_SEPARATOR = ".";
+  public static final String DB_OWNER = "owner";
+  public static final String DB_OWNER_TYPE = "ownerType";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveRESTCatalogClient.class);
+
+  private RESTCatalog restCatalog;
+
+  public HiveRESTCatalogClient(Configuration conf, boolean allowEmbedded) {
+    this(conf);
+  }
+
+  public HiveRESTCatalogClient(Configuration conf) {
+    super(conf);
+    reconnect();
+  }
+
+  @Override
+  public void reconnect()  {
+    close();
+    String catName = MetaStoreUtils.getDefaultCatalog(conf);
+    Map<String, String> properties = CatalogUtils.getCatalogProperties(conf, 
CatalogUtils.getCatalogName(conf));
+    restCatalog = (RESTCatalog) CatalogUtil.buildIcebergCatalog(catName, 
properties, null);
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (restCatalog != null) {
+        restCatalog.close();
+      }
+    } catch (IOException e) {
+      throw new RuntimeMetaException(e.getCause(), "Failed to close existing 
REST catalog");
+    }
+  }
+
+  @Override
+  public List<String> getDatabases(String catName, String dbPattern) {
+    validateCurrentCatalog(catName);
+    // Convert the Hive glob pattern (e.g., "db*") to a valid Java regex 
("db.*").
+    String regex = dbPattern.replace("*", ".*");
+    Pattern pattern = Pattern.compile(regex);
+
+    return restCatalog.listNamespaces(Namespace.empty()).stream()
+        .map(Namespace::toString)
+        .filter(pattern.asPredicate())
+        .toList();
+  }
+
+  @Override
+  public List<String> getAllDatabases(String catName) {
+    return getDatabases(catName, "*");
+  }
+
+  @Override
+  public List<String> getTables(String catName, String dbName, String 
tablePattern) {
+    validateCurrentCatalog(catName);
+
+    // Convert the Hive glob pattern to a Java regex.
+    String regex = tablePattern.replace("*", ".*");
+    Pattern pattern = Pattern.compile(regex);
+
+    // List tables from the specific database (namespace) and filter them.
+    return restCatalog.listTables(Namespace.of(dbName)).stream()
+        .map(TableIdentifier::name)
+        .filter(pattern.asPredicate())
+        .toList();
+  }
+
+  @Override
+  public List<String> getAllTables(String catName, String dbName) {
+    return getTables(catName, dbName, "*");
+  }
+
+  @Override
+  public void dropTable(Table table, boolean deleteData, boolean 
ignoreUnknownTab, boolean ifPurge) throws TException {
+    restCatalog.dropTable(TableIdentifier.of(table.getDbName(), 
table.getTableName()));
+  }
+
+  private void validateCurrentCatalog(String catName) {
+    if (!restCatalog.name().equals(catName)) {
+      throw new IllegalArgumentException(
+          String.format("Catalog name '%s' does not match the current catalog 
'%s'", catName, restCatalog.name()));
+    }
+  }
+
+  @Override
+  public boolean tableExists(String catName, String dbName, String tableName) {
+    validateCurrentCatalog(catName);
+    return restCatalog.tableExists(TableIdentifier.of(dbName, tableName));
+  }
+
+  @Override
+  public Database getDatabase(String catName, String dbName) {
+    validateCurrentCatalog(catName);
+
+    return restCatalog.listNamespaces(Namespace.empty()).stream()
+        .filter(namespace -> namespace.levels()[0].equals(dbName))
+        .map(namespace -> {
+          Database database = new Database();
+          database.setName(String.join(NAMESPACE_SEPARATOR, 
namespace.levels()));
+          Map<String, String> namespaceMetadata = 
restCatalog.loadNamespaceMetadata(Namespace.of(dbName));
+          
database.setLocationUri(namespaceMetadata.get(CatalogUtils.LOCATION));
+          database.setCatalogName(restCatalog.name());
+          database.setOwnerName(namespaceMetadata.get(DB_OWNER));
+          try {
+            
database.setOwnerType(PrincipalType.valueOf(namespaceMetadata.get(DB_OWNER_TYPE)));
+          } catch (Exception e) {
+            LOG.warn("Can not set ownerType: {}", 
namespaceMetadata.get(DB_OWNER_TYPE), e);
+          }
+          return database;
+        }).findFirst().get();
+  }
+
+  @Override
+  public Table getTable(GetTableRequest tableRequest) throws TException {
+    validateCurrentCatalog(tableRequest.getCatName());
+    org.apache.iceberg.Table icebergTable;
+    try {
+      icebergTable = 
restCatalog.loadTable(TableIdentifier.of(tableRequest.getDbName(),
+          tableRequest.getTblName()));
+    } catch (NoSuchTableException exception) {
+      throw new NoSuchObjectException();
+    }
+    return MetastoreUtil.toHiveTable(icebergTable, conf);
+  }
+
+  @Override
+  public void createTable(CreateTableRequest request) throws TException {
+    Table table = request.getTable();
+    List<FieldSchema> cols = Lists.newArrayList(table.getSd().getCols());
+    if (table.isSetPartitionKeys() && !table.getPartitionKeys().isEmpty()) {
+      cols.addAll(table.getPartitionKeys());
+    }
+    Properties catalogProperties = CatalogUtils.getCatalogProperties(table);
+    Schema schema = HiveSchemaUtil.convert(cols, true);
+    Map<String, String> envCtxProps = 
Optional.ofNullable(request.getEnvContext())
+        .map(EnvironmentContext::getProperties)
+        .orElse(Collections.emptyMap());
+    org.apache.iceberg.PartitionSpec partitionSpec =
+        HMSTablePropertyHelper.getPartitionSpec(envCtxProps, schema);
+    SortOrder sortOrder = 
HMSTablePropertyHelper.getSortOrder(catalogProperties, schema);
+
+    restCatalog.buildTable(TableIdentifier.of(table.getDbName(), 
table.getTableName()), schema)
+        .withPartitionSpec(partitionSpec)
+        .withLocation(catalogProperties.getProperty(CatalogUtils.LOCATION))
+        .withSortOrder(sortOrder)
+        .withProperties(catalogProperties.entrySet().stream()

Review Comment:
   Done



##########
iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog.q:
##########
@@ -0,0 +1,68 @@
+-- SORT_QUERY_RESULTS
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+-- Mask random uuid
+--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
+-- Mask added file size
+--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
+--! 
qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+--! 
qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask iceberg version
+--! 
qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/
+
+--! These settings are set in the driver setup (see 
TestIcebergRESTCatalogLlapLocalCliDriver.java)   
+--! conf.set('metastore.client.impl', 
'org.apache.iceberg.hive.client.HiveRESTCatalogClient');
+--! conf.set('metastore.catalog.default', 'ice01');
+--! conf.set('iceberg.catalog.ice01.type', 'rest');
+--! conf.set('iceberg.catalog.ice01.uri', <RESTServer URI>);
+
+--! Verify rest catalog properties are set in conf
+set metastore.client.impl;
+set metastore.catalog.default;
+set iceberg.catalog.ice01.type;

Review Comment:
   Yes, the following 3 can be set in test (Done):
   ```
   conf.set('metastore.client.impl', 
'org.apache.iceberg.hive.client.HiveRESTCatalogClient');
   conf.set('metastore.catalog.default', 'ice01');
   conf.set('iceberg.catalog.ice01.type', 'rest');
   ```
   However, `'iceberg.catalog.ice01.uri'` has a dynamically assigned port, so I 
don't think it can be set in the .q test.
   



##########
iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog.q:
##########
@@ -0,0 +1,37 @@
+-- SORT_QUERY_RESULTS
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+-- Mask random uuid
+--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
+-- Mask added file size
+--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
+--! 
qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+--! 
qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask iceberg version
+--! 
qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/
+
+create table ice_orc (
+    first_name string, 
+    last_name string,
+    dept_id bigint,
+    team_id bigint
+ )
+partitioned by (company_id bigint)
+stored by iceberg stored as orc;
+
+--! Verify rest catalog properties are set in conf
+set iceberg.catalog;

Review Comment:
   1. It is set in `TestIcebergRESTCatalogLlapLocalCliDriver`, in the `@Before` 
method.
   2. We discussed that this will be done in future PRs, once Hive adds 
multi-catalog support.



##########
standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/client/builder/HiveMetaStoreClientBuilder.java:
##########
@@ -103,4 +106,10 @@ private static IMetaStoreClient createClient(Configuration 
conf, boolean allowEm
 
     return baseMetaStoreClient;
   }
+  
+  private static void setCatalogImpl(Configuration conf) {

Review Comment:
   Done



##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java:
##########
@@ -87,6 +88,34 @@ public class Warehouse {
   private boolean storageAuthCheck = false;
   private ReplChangeManager cm = null;
 
+  public enum CatalogType {

Review Comment:
   Done



##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java:
##########
@@ -1135,6 +1138,20 @@ public static String getDefaultCatalog(Configuration 
conf) {
     return catName;
   }
 
+  public static String getCatalogName(Configuration conf) {
+    return Optional.ofNullable(conf.get(ICEBERG_CATALOG))

Review Comment:
   Moved to Iceberg `MetastoreUtil`.



##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java:
##########
@@ -87,6 +88,34 @@ public class Warehouse {
   private boolean storageAuthCheck = false;
   private ReplChangeManager cm = null;
 
+  public enum CatalogType {
+    REST("rest", "org.apache.iceberg.hive.client.HiveRESTCatalogClient"),
+    DEFAULT("default", 
MetastoreConf.ConfVars.METASTORE_CLIENT_IMPL.getDefaultVal().toString());
+
+    private final String type;
+    private final String clientImplClass;
+
+    CatalogType(String type, String clientImplClass) {
+      this.type = type;
+      this.clientImplClass = clientImplClass;
+    }
+
+    public String getClientImplClass() {
+      return clientImplClass;
+    }
+
+    public static CatalogType getCatalogType(String type) {

Review Comment:
   Done



##########
ql/pom.xml:
##########
@@ -1120,10 +1120,6 @@
                   <pattern>com.fasterxml.jackson</pattern>
                   
<shadedPattern>org.apache.hive.com.fasterxml.jackson</shadedPattern>
                 </relocation>
-                <relocation>

Review Comment:
   I'm not sure. It was an experiment to check whether removing it resolves a 
q-test issue that I wasn't able to resolve any other way.
   But it didn't seem to cause any issues in the precommit tests.



##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java:
##########
@@ -35,6 +35,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;

Review Comment:
   Done



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -257,7 +258,8 @@ public Class<? extends AbstractSerDe> getSerDeClass() {
   public HiveMetaHook getMetaHook() {
     // Make sure to always return a new instance here, as HiveIcebergMetaHook 
might hold state relevant for the
     // operation.
-    return new HiveIcebergMetaHook(conf);
+    return StringUtils.isEmpty(MetastoreUtil.getCatalogType(conf)) ?

Review Comment:
   Tested `hadoop_catalog_create_table.q`. When a table is created in that test, 
it uses `HiveIcebergMetaHook`, not `BaseHiveIcebergMetaHook`, because the test sets 
`'iceberg.catalog'` in the table properties, and we don't check table properties 
when deciding which meta hook to create in the storage handler; we check the 
session conf only:
   
   ```
     @Override
     public HiveMetaHook getMetaHook() {
       // checks catalog type in Session Conf
       return StringUtils.isEmpty(MetastoreUtil.getCatalogType(conf)) ?
           new HiveIcebergMetaHook(conf) : new BaseHiveIcebergMetaHook(conf);
     }
   ```



##########
iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog.q:
##########
@@ -0,0 +1,43 @@
+-- SORT_QUERY_RESULTS
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+-- Mask random uuid
+--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
+-- Mask added file size
+--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
+--! 
qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+--! 
qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask iceberg version
+--! 
qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/
+
+--! These setting are set in the driver setup (see 
TestIcebergRESTCatalogLlapLocalCliDriver.java)   
+--! conf.set('metastore.client.impl', 
'org.apache.iceberg.hive.client.HiveRESTCatalogClient');
+--! conf.set('metastore.catalog.default', 'ice01');
+--! conf.set('iceberg.catalog.ice01.type', 'rest');
+--! conf.set('iceberg.catalog.ice01.uri', <RESTServer URI>);
+    
+create table ice_orc (
+    first_name string, 
+    last_name string,
+    dept_id bigint,
+    team_id bigint
+ )
+partitioned by (company_id bigint)
+stored by iceberg stored as orc;
+
+--! Verify rest catalog properties are set in conf
+set metastore.catalog.default;
+set iceberg.catalog.ice01.type;
+
+--! Output should contain: 'type' = 'rest'
+show create table ice_orc;

Review Comment:
   Added all of that and some more, except for `alter` - RESTCatalog doesn't 
support that.



##########
iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog.q:
##########
@@ -0,0 +1,43 @@
+-- SORT_QUERY_RESULTS
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+-- Mask random uuid
+--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
+-- Mask added file size
+--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
+--! 
qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+--! 
qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask iceberg version
+--! 
qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/
+
+--! These setting are set in the driver setup (see 
TestIcebergRESTCatalogLlapLocalCliDriver.java)   
+--! conf.set('metastore.client.impl', 
'org.apache.iceberg.hive.client.HiveRESTCatalogClient');
+--! conf.set('metastore.catalog.default', 'ice01');
+--! conf.set('iceberg.catalog.ice01.type', 'rest');
+--! conf.set('iceberg.catalog.ice01.uri', <RESTServer URI>);
+    
+create table ice_orc (
+    first_name string, 
+    last_name string,
+    dept_id bigint,
+    team_id bigint
+ )
+partitioned by (company_id bigint)
+stored by iceberg stored as orc;

Review Comment:
   Tested. Initially it didn't work, but after adding some fixes it now works.



##########
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CatalogUtils.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class CatalogUtils {
+  public static final String NAME = "name";
+  public static final String LOCATION = "location";
+  public static final String CATALOG_NAME = "iceberg.catalog";
+  public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
+  public static final String CATALOG_WAREHOUSE_TEMPLATE = 
"iceberg.catalog.%s.warehouse";
+  public static final String CATALOG_IMPL_TEMPLATE = 
"iceberg.catalog.%s.catalog-impl";
+  public static final String CATALOG_DEFAULT_CONFIG_PREFIX = 
"iceberg.catalog-default.";
+  public static final String ICEBERG_HADOOP_TABLE_NAME = 
"location_based_table";
+  public static final String NO_CATALOG_TYPE = "no catalog";
+  public static final Set<String> PROPERTIES_TO_REMOVE = ImmutableSet.of(
+      // We don't want to push down the metadata location props to Iceberg 
from HMS,
+      // since the snapshot pointer in HMS would always be one step ahead
+      BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
+      BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP);
+
+  private CatalogUtils() {
+
+  }
+
+  /**
+   * Calculates the properties we would like to send to the catalog.
+   * <ul>
+   * <li>The base of the properties is the properties stored at the Hive 
Metastore for the given table
+   * <li>We add the {@link CatalogUtils#LOCATION} as the table location
+   * <li>We add the {@link CatalogUtils#NAME} as
+   * TableIdentifier defined by the database name and table name
+   * <li>We add the serdeProperties of the HMS table
+   * <li>We remove some parameters that we don't want to push down to the 
Iceberg table props
+   * </ul>
+   * @param hmsTable Table for which we are calculating the properties
+   * @return The properties we can provide for Iceberg functions
+   */
+  public static Properties 
getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    Properties properties = new Properties();
+    properties.putAll(toIcebergProperties(hmsTable.getParameters()));
+
+    if (properties.get(LOCATION) == null && hmsTable.getSd() != null &&
+        hmsTable.getSd().getLocation() != null) {
+      properties.put(LOCATION, hmsTable.getSd().getLocation());
+    }
+
+    if (properties.get(NAME) == null) {
+      properties.put(NAME, TableIdentifier.of(hmsTable.getDbName(),
+          hmsTable.getTableName()).toString());
+    }
+
+    SerDeInfo serdeInfo = hmsTable.getSd().getSerdeInfo();
+    if (serdeInfo != null) {
+      properties.putAll(toIcebergProperties(serdeInfo.getParameters()));
+    }
+
+    // Remove HMS table parameters we don't want to propagate to Iceberg
+    PROPERTIES_TO_REMOVE.forEach(properties::remove);
+
+    return properties;
+  }
+
+  private static Properties toIcebergProperties(Map<String, String> 
parameters) {
+    Properties properties = new Properties();
+    parameters.entrySet().stream()
+        .filter(e -> e.getKey() != null && e.getValue() != null)
+        .forEach(e -> {
+          String icebergKey = 
HMSTablePropertyHelper.translateToIcebergProp(e.getKey());
+          properties.put(icebergKey, e.getValue());
+        });
+    return properties;
+  }
+
+  /**
+   * Collect all the catalog specific configuration from the global hive 
configuration.
+   * @param conf a Hadoop configuration
+   * @param catalogName name of the catalog
+   * @return complete map of catalog properties
+   */
+  public static Map<String, String> getCatalogProperties(Configuration conf, 
String catalogName) {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    String keyPrefix = CATALOG_CONFIG_PREFIX + catalogName;
+    conf.forEach(config -> {
+      if 
(config.getKey().startsWith(CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX)) {
+        catalogProperties.putIfAbsent(
+            
config.getKey().substring(CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX.length()),
+            config.getValue());
+      } else if (config.getKey().startsWith(keyPrefix)) {
+        catalogProperties.put(
+            config.getKey().substring(keyPrefix.length() + 1),
+            config.getValue());
+      }
+    });
+
+    return catalogProperties;
+  }
+
+  public static String getCatalogName(Configuration conf) {
+    return MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CATALOG_DEFAULT);
+  }
+
+  public static String getCatalogType(Configuration conf) {
+    return getCatalogType(conf, CatalogUtils.getCatalogName(conf));
+  }
+
+  public static String getCatalogType(Configuration conf, Properties 
catalogProperties) {
+    return 
Optional.ofNullable(catalogProperties.getProperty(CatalogUtils.CATALOG_NAME))
+        .or(() -> Optional.ofNullable(MetastoreConf.getVar(conf, 
MetastoreConf.ConfVars.CATALOG_DEFAULT)))
+        .map(catName -> getCatalogType(conf, catName))
+        .orElse(null);
+  }
+
+  /**
+   * Get Hadoop config key of a catalog property based on catalog name
+   * @param catalogName catalog name
+   * @param catalogProperty catalog property, can be any custom property,
+   *                        a commonly used list of properties can be found
+   *                        at {@link org.apache.iceberg.CatalogProperties}
+   * @return Hadoop config key of a catalog property for the catalog name
+   */
+  public static String catalogPropertyConfigKey(String catalogName, String 
catalogProperty) {
+    return String.format("%s%s.%s", CATALOG_CONFIG_PREFIX, catalogName, 
catalogProperty);
+  }
+
+  /**
+   * Return the catalog type based on the catalog name.
+   * <p>
+   * See Catalogs documentation for catalog type resolution strategy.
+   *
+   * @param conf global hive configuration
+   * @param catalogName name of the catalog
+   * @return type of the catalog, can be null
+   */
+  public static String getCatalogType(Configuration conf, String catalogName) {
+    if (catalogName != null) {
+      String catalogType = conf.get(catalogPropertyConfigKey(
+          catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE));
+      if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME)) {
+        return NO_CATALOG_TYPE;
+      } else {
+        return catalogType;
+      }
+    } else {
+      String catalogType = conf.get(CatalogUtil.ICEBERG_CATALOG_TYPE);
+      if (catalogType != null && catalogType.equals(LOCATION)) {
+        return NO_CATALOG_TYPE;
+      } else {
+        return catalogType;
+      }
+    }
+  }
+
+  public static String getCatalogImpl(Configuration conf, String catName) {

Review Comment:
   Done



##########
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CatalogUtils.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class CatalogUtils {
+  public static final String NAME = "name";
+  public static final String LOCATION = "location";
+  public static final String CATALOG_NAME = "iceberg.catalog";
+  public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
+  public static final String CATALOG_WAREHOUSE_TEMPLATE = 
"iceberg.catalog.%s.warehouse";
+  public static final String CATALOG_IMPL_TEMPLATE = 
"iceberg.catalog.%s.catalog-impl";
+  public static final String CATALOG_DEFAULT_CONFIG_PREFIX = 
"iceberg.catalog-default.";
+  public static final String ICEBERG_HADOOP_TABLE_NAME = 
"location_based_table";
+  public static final String NO_CATALOG_TYPE = "no catalog";
+  public static final Set<String> PROPERTIES_TO_REMOVE = ImmutableSet.of(
+      // We don't want to push down the metadata location props to Iceberg 
from HMS,
+      // since the snapshot pointer in HMS would always be one step ahead
+      BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
+      BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP);
+
+  private CatalogUtils() {
+
+  }
+
+  /**
+   * Calculates the properties we would like to send to the catalog.
+   * <ul>
+   * <li>The base of the properties is the properties stored at the Hive 
Metastore for the given table
+   * <li>We add the {@link CatalogUtils#LOCATION} as the table location
+   * <li>We add the {@link CatalogUtils#NAME} as
+   * TableIdentifier defined by the database name and table name
+   * <li>We add the serdeProperties of the HMS table
+   * <li>We remove some parameters that we don't want to push down to the 
Iceberg table props
+   * </ul>
+   * @param hmsTable Table for which we are calculating the properties
+   * @return The properties we can provide for Iceberg functions
+   */
+  public static Properties 
getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    Properties properties = new Properties();
+    properties.putAll(toIcebergProperties(hmsTable.getParameters()));
+
+    if (properties.get(LOCATION) == null && hmsTable.getSd() != null &&
+        hmsTable.getSd().getLocation() != null) {
+      properties.put(LOCATION, hmsTable.getSd().getLocation());
+    }
+
+    if (properties.get(NAME) == null) {
+      properties.put(NAME, TableIdentifier.of(hmsTable.getDbName(),
+          hmsTable.getTableName()).toString());
+    }
+
+    SerDeInfo serdeInfo = hmsTable.getSd().getSerdeInfo();
+    if (serdeInfo != null) {
+      properties.putAll(toIcebergProperties(serdeInfo.getParameters()));
+    }
+
+    // Remove HMS table parameters we don't want to propagate to Iceberg
+    PROPERTIES_TO_REMOVE.forEach(properties::remove);
+
+    return properties;
+  }
+
+  private static Properties toIcebergProperties(Map<String, String> 
parameters) {
+    Properties properties = new Properties();
+    parameters.entrySet().stream()
+        .filter(e -> e.getKey() != null && e.getValue() != null)
+        .forEach(e -> {
+          String icebergKey = 
HMSTablePropertyHelper.translateToIcebergProp(e.getKey());
+          properties.put(icebergKey, e.getValue());
+        });
+    return properties;
+  }
+
+  /**
+   * Collect all the catalog specific configuration from the global hive 
configuration.
+   * @param conf a Hadoop configuration
+   * @param catalogName name of the catalog
+   * @return complete map of catalog properties
+   */
+  public static Map<String, String> getCatalogProperties(Configuration conf, 
String catalogName) {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    String keyPrefix = CATALOG_CONFIG_PREFIX + catalogName;
+    conf.forEach(config -> {
+      if 
(config.getKey().startsWith(CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX)) {
+        catalogProperties.putIfAbsent(
+            
config.getKey().substring(CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX.length()),
+            config.getValue());
+      } else if (config.getKey().startsWith(keyPrefix)) {
+        catalogProperties.put(
+            config.getKey().substring(keyPrefix.length() + 1),
+            config.getValue());
+      }
+    });
+
+    return catalogProperties;
+  }
+
+  public static String getCatalogName(Configuration conf) {
+    return MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CATALOG_DEFAULT);
+  }
+
+  public static String getCatalogType(Configuration conf) {
+    return getCatalogType(conf, CatalogUtils.getCatalogName(conf));
+  }
+
+  public static String getCatalogType(Configuration conf, Properties 
catalogProperties) {
+    return 
Optional.ofNullable(catalogProperties.getProperty(CatalogUtils.CATALOG_NAME))

Review Comment:
   Done, but since `NO_CATALOG_TYPE` is reserved for location-based tables, I 
updated it as follows:
   
   ```
       String catalogName = catalogProperties.getProperty(
           CatalogUtils.CATALOG_NAME, 
           MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CATALOG_DEFAULT));
       return getCatalogType(conf, catalogName);
   ```



##########
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CatalogUtils.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class CatalogUtils {
+  public static final String NAME = "name";
+  public static final String LOCATION = "location";
+  public static final String CATALOG_NAME = "iceberg.catalog";
+  public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
+  public static final String CATALOG_WAREHOUSE_TEMPLATE = 
"iceberg.catalog.%s.warehouse";
+  public static final String CATALOG_IMPL_TEMPLATE = 
"iceberg.catalog.%s.catalog-impl";
+  public static final String CATALOG_DEFAULT_CONFIG_PREFIX = 
"iceberg.catalog-default.";
+  public static final String ICEBERG_HADOOP_TABLE_NAME = 
"location_based_table";
+  public static final String NO_CATALOG_TYPE = "no catalog";
+  public static final Set<String> PROPERTIES_TO_REMOVE = ImmutableSet.of(
+      // We don't want to push down the metadata location props to Iceberg 
from HMS,
+      // since the snapshot pointer in HMS would always be one step ahead
+      BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
+      BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP);
+
+  private CatalogUtils() {
+
+  }
+
+  /**
+   * Calculates the properties we would like to send to the catalog.
+   * <ul>
+   * <li>The base of the properties is the properties stored at the Hive 
Metastore for the given table
+   * <li>We add the {@link CatalogUtils#LOCATION} as the table location
+   * <li>We add the {@link CatalogUtils#NAME} as
+   * TableIdentifier defined by the database name and table name
+   * <li>We add the serdeProperties of the HMS table
+   * <li>We remove some parameters that we don't want to push down to the 
Iceberg table props
+   * </ul>
+   * @param hmsTable Table for which we are calculating the properties
+   * @return The properties we can provide for Iceberg functions
+   */
+  public static Properties 
getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    Properties properties = new Properties();
+    properties.putAll(toIcebergProperties(hmsTable.getParameters()));
+
+    if (properties.get(LOCATION) == null && hmsTable.getSd() != null &&
+        hmsTable.getSd().getLocation() != null) {
+      properties.put(LOCATION, hmsTable.getSd().getLocation());
+    }
+
+    if (properties.get(NAME) == null) {
+      properties.put(NAME, TableIdentifier.of(hmsTable.getDbName(),
+          hmsTable.getTableName()).toString());
+    }
+
+    SerDeInfo serdeInfo = hmsTable.getSd().getSerdeInfo();
+    if (serdeInfo != null) {
+      properties.putAll(toIcebergProperties(serdeInfo.getParameters()));
+    }
+
+    // Remove HMS table parameters we don't want to propagate to Iceberg
+    PROPERTIES_TO_REMOVE.forEach(properties::remove);
+
+    return properties;
+  }
+
+  private static Properties toIcebergProperties(Map<String, String> 
parameters) {
+    Properties properties = new Properties();
+    parameters.entrySet().stream()
+        .filter(e -> e.getKey() != null && e.getValue() != null)
+        .forEach(e -> {
+          String icebergKey = 
HMSTablePropertyHelper.translateToIcebergProp(e.getKey());
+          properties.put(icebergKey, e.getValue());
+        });
+    return properties;
+  }
+
+  /**
+   * Collect all the catalog specific configuration from the global hive 
configuration.
+   * @param conf a Hadoop configuration
+   * @param catalogName name of the catalog
+   * @return complete map of catalog properties
+   */
+  public static Map<String, String> getCatalogProperties(Configuration conf, 
String catalogName) {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    String keyPrefix = CATALOG_CONFIG_PREFIX + catalogName;
+    conf.forEach(config -> {
+      if 
(config.getKey().startsWith(CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX)) {
+        catalogProperties.putIfAbsent(
+            
config.getKey().substring(CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX.length()),
+            config.getValue());
+      } else if (config.getKey().startsWith(keyPrefix)) {
+        catalogProperties.put(
+            config.getKey().substring(keyPrefix.length() + 1),
+            config.getValue());
+      }
+    });
+
+    return catalogProperties;
+  }
+
+  public static String getCatalogName(Configuration conf) {
+    return MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CATALOG_DEFAULT);
+  }
+
+  public static String getCatalogType(Configuration conf) {
+    return getCatalogType(conf, CatalogUtils.getCatalogName(conf));
+  }
+
+  public static String getCatalogType(Configuration conf, Properties 
catalogProperties) {
+    return 
Optional.ofNullable(catalogProperties.getProperty(CatalogUtils.CATALOG_NAME))
+        .or(() -> Optional.ofNullable(MetastoreConf.getVar(conf, 
MetastoreConf.ConfVars.CATALOG_DEFAULT)))
+        .map(catName -> getCatalogType(conf, catName))
+        .orElse(null);
+  }
+
+  /**
+   * Get Hadoop config key of a catalog property based on catalog name
+   * @param catalogName catalog name
+   * @param catalogProperty catalog property, can be any custom property,
+   *                        a commonly used list of properties can be found
+   *                        at {@link org.apache.iceberg.CatalogProperties}
+   * @return Hadoop config key of a catalog property for the catalog name
+   */
+  public static String catalogPropertyConfigKey(String catalogName, String 
catalogProperty) {
+    return String.format("%s%s.%s", CATALOG_CONFIG_PREFIX, catalogName, 
catalogProperty);
+  }
+
+  /**
+   * Return the catalog type based on the catalog name.
+   * <p>
+   * See Catalogs documentation for catalog type resolution strategy.
+   *
+   * @param conf global hive configuration
+   * @param catalogName name of the catalog
+   * @return type of the catalog, can be null
+   */
+  public static String getCatalogType(Configuration conf, String catalogName) {
+    if (catalogName != null) {
+      String catalogType = conf.get(catalogPropertyConfigKey(
+          catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE));
+      if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME)) {
+        return NO_CATALOG_TYPE;
+      } else {
+        return catalogType;
+      }
+    } else {
+      String catalogType = conf.get(CatalogUtil.ICEBERG_CATALOG_TYPE);
+      if (catalogType != null && catalogType.equals(LOCATION)) {
+        return NO_CATALOG_TYPE;
+      } else {
+        return catalogType;
+      }
+    }
+  }
+
+  public static String getCatalogImpl(Configuration conf, String catName) {
+    return Optional.ofNullable(catName)
+        .filter(StringUtils::isNotEmpty)
+        .map(name -> String.format(CatalogUtils.CATALOG_IMPL_TEMPLATE, name))
+        .map(conf::get)
+        .orElse("");

Review Comment:
   Done



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.HMSTablePropertyHelper;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
+
+public class BaseHiveIcebergMetaHook implements HiveMetaHook {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseHiveIcebergMetaHook.class);
+  private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
+  public static final Map<String, String> COMMON_HMS_PROPERTIES = 
ImmutableMap.of(
+      BaseMetastoreTableOperations.TABLE_TYPE_PROP, 
BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase()
+  );
+  private static final Set<String> PARAMETERS_TO_REMOVE = ImmutableSet
+      .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME, 
InputFormatConfig.PARTITION_SPEC);
+  static final String ORC_FILES_ONLY = "iceberg.orc.files.only";
+
+  protected final Configuration conf;
+  protected Table icebergTable = null;
+  protected Properties catalogProperties;
+  protected boolean createHMSTableInHook = false;
+
+  /** File formats known to this hook, each paired with its lower-case label. */
+  public enum FileFormat {
+    ORC("orc"),
+    PARQUET("parquet"),
+    AVRO("avro");
+
+    private final String label;
+
+    FileFormat(String formatLabel) {
+      label = formatLabel;
+    }
+
+    /** Returns the label this constant was declared with. */
+    public String getLabel() {
+      return this.label;
+    }
+  }
+
+  public BaseHiveIcebergMetaHook(Configuration conf) {
+    this.conf = conf; // kept for the catalog checks done by the hook methods
+  }
+
+  /**
+   * Table-based entry point: wraps the HMS table into a {@link CreateTableRequest} and
+   * delegates to {@link #preCreateTable(CreateTableRequest)}.
+   */
+  @Override
+  public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    preCreateTable(new CreateTableRequest(hmsTable));
+  }
+
+  @Override
+  public void preCreateTable(CreateTableRequest request) {
+    org.apache.hadoop.hive.metastore.api.Table hmsTable = request.getTable();
+    if (hmsTable.isTemporary()) {
+      throw new UnsupportedOperationException("Creation of temporary iceberg 
tables is not supported.");
+    }
+    this.catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
+
+    // Always tag the HMS table with the Iceberg table type, even for non HiveCatalog based tables
+    hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase());
+
+    if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
+      if 
(Boolean.parseBoolean(this.catalogProperties.getProperty(hive_metastoreConstants.TABLE_IS_CTLT)))
 {
+        throw new UnsupportedOperationException("CTLT target table must be a 
HiveCatalog table.");
+      }
+      // For non-HiveCatalog tables too, we should set the input and output 
format
+      // so that the table can be read by other engines like Impala
+      
hmsTable.getSd().setInputFormat(HiveIcebergInputFormat.class.getCanonicalName());
+      
hmsTable.getSd().setOutputFormat(HiveIcebergOutputFormat.class.getCanonicalName());
+
+      // When HiveCatalog is not used, check whether the Iceberg table already exists
+      try {
+        this.icebergTable = IcebergTableUtil.getTable(conf, catalogProperties, 
true);
+
+        if (Catalogs.hadoopCatalog(conf, catalogProperties) && 
hmsTable.getSd() != null &&
+                hmsTable.getSd().getLocation() == null) {
+          hmsTable.getSd().setLocation(icebergTable.location());
+        }
+        
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA)
 == null,
+            "Iceberg table already created - can not use provided schema");
+        
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC)
 == null,
+            "Iceberg table already created - can not use provided partition 
specification");
+
+        LOG.info("Iceberg table already exists {}", icebergTable);
+        return; // the Iceberg table already exists, so no creation data is collected
+      } catch (NoSuchTableException nte) {
+        // The table does not exist yet: fall through and create it below
+      }
+    }
+
+    // If the table does not exist, collect the data needed for table creation
+    // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC 
takes precedence so the user can override the
+    // Iceberg schema and specification generated by the code
+
+    Set<String> identifierFields = 
Optional.ofNullable(request.getPrimaryKeys())
+        .map(primaryKeys ->
+            
primaryKeys.stream().map(SQLPrimaryKey::getColumn_name).collect(Collectors.toSet()))
+        .orElse(Collections.emptySet());
+
+    Schema schema = schema(catalogProperties, hmsTable, identifierFields);
+    PartitionSpec spec = spec(conf, schema, hmsTable);
+
+    // If there are partition keys specified remove them from the HMS table 
and add them to the column list
+    if (hmsTable.isSetPartitionKeys()) {
+      hmsTable.getSd().getCols().addAll(hmsTable.getPartitionKeys());
+      hmsTable.setPartitionKeysIsSet(false);
+    }
+
+    catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, 
SchemaParser.toJson(schema));
+    String specString = PartitionSpecParser.toJson(spec);
+    catalogProperties.put(InputFormatConfig.PARTITION_SPEC, specString);
+
+    if (request.getEnvContext() == null) {
+      request.setEnvContext(new EnvironmentContext());
+    }
+    
request.getEnvContext().putToProperties(TableProperties.DEFAULT_PARTITION_SPEC, 
specString);
+    setCommonHmsTablePropertiesForIceberg(hmsTable);
+
+    if 
(hmsTable.getParameters().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP))
 {
+      createHMSTableInHook = true; // a metadata location is present in the HMS table parameters
+    }
+
+    
assertFileFormat(catalogProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+
+    // Set whether the format is ORC, to be used during vectorization.
+    setOrcOnlyFilesParam(hmsTable);
+    // Remove hive primary key columns from table request, as iceberg doesn't 
support hive primary key.
+    request.setPrimaryKeys(null);
+    setSortOrder(hmsTable, schema, catalogProperties);
+  }
+
+  /**
+   * Translates the Hive sort order JSON found in the HMS table parameters into an Iceberg
+   * sort order and stores its JSON form in the given properties.
+   *
+   * <p>Nothing is stored when the parameter is missing or empty, when it cannot be parsed
+   * (a warning is logged), or when it contains no sort fields.
+   *
+   * @param hmsTable HMS table whose parameters may carry the sort order JSON
+   * @param schema Iceberg schema the sort order is built for
+   * @param properties properties receiving the Iceberg sort order JSON
+   */
+  private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema,
+      Properties properties) {
+    String sortOrderJson = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER);
+    if (Strings.isNullOrEmpty(sortOrderJson)) {
+      return;
+    }
+
+    SortFields parsed;
+    try {
+      parsed = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJson, SortFields.class);
+    } catch (Exception e) {
+      LOG.warn("Can not read write order json: {}", sortOrderJson, e);
+      return;
+    }
+    if (parsed == null || parsed.getSortFields().isEmpty()) {
+      return;
+    }
+
+    SortOrder.Builder orderBuilder = SortOrder.builderFor(schema);
+    for (var fieldDesc : parsed.getSortFields()) {
+      NullOrder nullOrder = NullOrder.NULLS_LAST;
+      if (fieldDesc.getNullOrdering() == NullOrdering.NULLS_FIRST) {
+        nullOrder = NullOrder.NULLS_FIRST;
+      }
+      SortDirection direction = SortDirection.DESC;
+      if (fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC) {
+        direction = SortDirection.ASC;
+      }
+      orderBuilder.sortBy(fieldDesc.getColumnName(), direction, nullOrder);
+    }
+    properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(orderBuilder.build()));
+  }
+
+  @Override
+  public void rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
+    // No-op: this hook does nothing when a table creation is rolled back
+  }
+
+  @Override
+  public void commitCreateTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
+    // No-op: this hook does nothing when a table creation is committed
+  }
+
+  @Override
+  public void preDropTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
+    // No-op: this hook does nothing before a table is dropped
+  }
+
+  @Override
+  public void rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
+    // No-op: this hook does nothing when a table drop is rolled back
+  }
+
+  @Override
+  public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, boolean deleteData) {
+    // No-op: this hook does nothing when a table drop is committed
+  }
+
+  @Override
+  public boolean createHMSTableInHook() {
+    return createHMSTableInHook; // false by default; preCreateTable may switch it to true
+  }
+
+  /**
+   * Checks that the given file format name contains the label of one of the
+   * {@link FileFormat} constants, ignoring case. A null format is accepted without a check.
+   *
+   * @param format file format name, may be null
+   * @throws IllegalArgumentException if no known label is contained in the name
+   */
+  private static void assertFileFormat(String format) {
+    if (format != null) {
+      String normalized = format.toLowerCase();
+      boolean supported = false;
+      for (FileFormat candidate : FileFormat.values()) {
+        if (normalized.contains(candidate.label)) {
+          supported = true;
+          break;
+        }
+      }
+      Preconditions.checkArgument(supported, String.format("Unsupported fileformat %s", format));
+    }
+  }
+
+  /**
+   * Prepares the HMS table for Iceberg: makes sure a location is available for tables that
+   * are not HiveCatalog based, then normalizes the table parameters.
+   *
+   * @param hmsTable HMS table being created
+   * @throws IllegalArgumentException if a non-HiveCatalog table ends up without a location
+   */
+  protected void setCommonHmsTablePropertiesForIceberg(
+      org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    // Tables that are not HiveCatalog based must have a location
+    if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
+      String tableLocation = null;
+      if (hmsTable.getSd() != null) {
+        tableLocation = hmsTable.getSd().getLocation();
+      }
+      if (tableLocation == null && Catalogs.hadoopCatalog(conf, catalogProperties)) {
+        TableIdentifier identifier = TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName());
+        tableLocation = IcebergTableUtil.defaultWarehouseLocation(identifier, conf, catalogProperties);
+        hmsTable.getSd().setLocation(tableLocation);
+      }
+      Preconditions.checkArgument(tableLocation != null, "Table location not set");
+    }
+
+    Map<String, String> params = hmsTable.getParameters();
+    for (Map.Entry<String, String> common : COMMON_HMS_PROPERTIES.entrySet()) {
+      params.putIfAbsent(common.getKey(), common.getValue());
+    }
+
+    // Drop entries with a null key or a null value
+    params.entrySet().removeIf(entry -> entry.getKey() == null || entry.getValue() == null);
+
+    // Drop the properties that are only relevant while creating the table
+    for (String creationOnly : PARAMETERS_TO_REMOVE) {
+      params.remove(creationOnly);
+    }
+
+    setWriteModeDefaults(null, params, null);
+  }
+
+  /**
+   * Determines the Iceberg schema for the table being created.
+   *
+   * <p>A schema JSON stored under {@code InputFormatConfig.TABLE_SCHEMA} is parsed and returned
+   * as is. Otherwise the schema is converted from the HMS columns followed by the partition
+   * keys, and the identifier fields are applied to it.
+   *
+   * @param properties catalog properties, possibly carrying a schema JSON
+   * @param hmsTable HMS table providing columns and partition keys
+   * @param identifierFields names of the identifier columns, may be null or empty
+   * @return the resolved Iceberg schema
+   */
+  protected Schema schema(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable,
+                        Set<String> identifierFields) {
+    boolean autoConversion = conf.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
+
+    String schemaJson = properties.getProperty(InputFormatConfig.TABLE_SCHEMA);
+    if (schemaJson != null) {
+      return SchemaParser.fromJson(schemaJson);
+    }
+
+    List<FieldSchema> allColumns = Lists.newArrayList(hmsTable.getSd().getCols());
+    boolean hasPartitionKeys = hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty();
+    if (hasPartitionKeys) {
+      allColumns.addAll(hmsTable.getPartitionKeys());
+    }
+
+    Schema converted = HiveSchemaUtil.convert(allColumns, autoConversion);
+    return getSchemaWithIdentifierFields(converted, identifierFields);
+  }
+
+  /**
+   * Returns a schema in which the named columns are required and registered as identifier
+   * fields. The input schema is returned untouched when no identifier field is given.
+   *
+   * @param schema source schema
+   * @param identifierFields names of the identifier columns, may be null or empty
+   * @return the schema with identifier fields applied
+   * @throws NullPointerException if one of the names is not found in the schema
+   */
+  private Schema getSchemaWithIdentifierFields(Schema schema, Set<String> identifierFields) {
+    boolean noIdentifiers = identifierFields == null || identifierFields.isEmpty();
+    if (noIdentifiers) {
+      return schema;
+    }
+
+    Set<Integer> identifierIds = identifierFields.stream()
+        .map(name -> Preconditions.checkNotNull(schema.findField(name),
+            "Cannot find identifier field ID for the column %s in schema %s", name, schema))
+        .map(Types.NestedField::fieldId)
+        .collect(Collectors.toSet());
+
+    List<Types.NestedField> adjustedColumns = schema.columns().stream()
+        .map(col -> {
+          if (identifierIds.contains(col.fieldId())) {
+            return col.asRequired();
+          }
+          return col;
+        })
+        .toList();
+
+    return new Schema(adjustedColumns, identifierIds);
+  }
+
+  /**
+   * Determines the partition spec for the table being created.
+   *
+   * <p>A spec returned by {@code IcebergTableUtil.spec} wins, but then the
+   * {@code InputFormatConfig.PARTITION_SPEC} table parameter must not be set as well. Without
+   * such a spec, the spec is read from the HMS table parameters.
+   *
+   * @param configuration configuration passed to {@code IcebergTableUtil.spec}
+   * @param schema Iceberg schema of the table
+   * @param hmsTable HMS table; must not carry Hive partition keys
+   * @return the resolved partition spec
+   * @throws IllegalArgumentException if the table has Hive partition keys, or if both ways of
+   *     specifying the partitioning are used
+   */
+  protected static PartitionSpec spec(Configuration configuration, Schema schema,
+      org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+
+    boolean noHivePartitionKeys = !hmsTable.isSetPartitionKeys() || hmsTable.getPartitionKeys().isEmpty();
+    Preconditions.checkArgument(noHivePartitionKeys,
+        "We can only handle non-partitioned Hive tables. The Iceberg schema should be in " +
+            InputFormatConfig.PARTITION_SPEC + " or already converted to a partition transform ");
+
+    PartitionSpec transformSpec = IcebergTableUtil.spec(configuration, schema);
+    if (transformSpec == null) {
+      return HMSTablePropertyHelper.getPartitionSpec(hmsTable.getParameters(), schema);
+    }
+
+    boolean noSpecProperty = hmsTable.getParameters().get(InputFormatConfig.PARTITION_SPEC) == null;
+    Preconditions.checkArgument(noSpecProperty,
+        "Provide only one of the following: Hive partition transform specification, or the " +
+            InputFormatConfig.PARTITION_SPEC + " property");
+    return transformSpec;
+  }
+
+  protected void 
setOrcOnlyFilesParam(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    if (isOrcOnlyFiles(hmsTable)) {

Review Comment:
   Done



##########
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive.client;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropDatabaseRequest;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.client.BaseMetaStoreClient;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.HMSTablePropertyHelper;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.hive.RuntimeMetaException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveRESTCatalogClient extends BaseMetaStoreClient {
+
+  public static final String NAMESPACE_SEPARATOR = ".";
+  public static final String DB_OWNER = "owner";
+  public static final String DB_OWNER_TYPE = "ownerType";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveRESTCatalogClient.class);
+
+  private RESTCatalog restCatalog;
+
+  public HiveRESTCatalogClient(Configuration conf, boolean allowEmbedded) {
+    this(conf); // allowEmbedded is not consulted; delegates to the single-argument constructor
+  }
+
+  public HiveRESTCatalogClient(Configuration conf) {
+    super(conf);
+    reconnect(); // builds the REST catalog as part of construction
+  }
+
+  /**
+   * Closes the current REST catalog, if any, and builds a new one from the catalog
+   * properties found in the configuration.
+   */
+  @Override
+  public void reconnect() {
+    close();
+    String defaultCatalog = MetaStoreUtils.getDefaultCatalog(conf);
+    String configuredCatalogName = CatalogUtils.getCatalogName(conf);
+    Map<String, String> catalogProps = CatalogUtils.getCatalogProperties(conf, configuredCatalogName);
+    restCatalog = (RESTCatalog) CatalogUtil.buildIcebergCatalog(defaultCatalog, catalogProps, null);
+  }
+
+  /**
+   * Closes the underlying REST catalog, if one has been created.
+   *
+   * @throws RuntimeMetaException if closing the REST catalog fails; the original
+   *     {@link IOException} is kept as the cause
+   */
+  @Override
+  public void close() {
+    if (restCatalog == null) {
+      return;
+    }
+    try {
+      restCatalog.close();
+    } catch (IOException e) {
+      // Chain the IOException itself: e.getCause() is often null, which would drop
+      // the failure message and stack trace.
+      throw new RuntimeMetaException(e, "Failed to close existing REST catalog");
+    }
+  }
+
+  /**
+   * Lists the top-level namespaces of the REST catalog whose names match a Hive pattern.
+   *
+   * @param catName catalog name, must equal the name of the current REST catalog
+   * @param dbPattern Hive glob pattern in which {@code *} stands for any sequence of characters
+   * @return names of the namespaces that match the whole pattern
+   * @throws IllegalArgumentException if {@code catName} is not the current catalog
+   */
+  @Override
+  public List<String> getDatabases(String catName, String dbPattern) {
+    validateCurrentCatalog(catName);
+    // Convert the Hive glob pattern (e.g., "db*") to a valid Java regex ("db.*").
+    Pattern pattern = Pattern.compile(dbPattern.replace("*", ".*"));
+
+    return restCatalog.listNamespaces(Namespace.empty()).stream()
+        .map(Namespace::toString)
+        // The whole name has to match: asPredicate() uses find(), so "db*" would also accept "mydb".
+        .filter(pattern.asMatchPredicate())
+        .toList();
+  }
+
+  @Override
+  public List<String> getAllDatabases(String catName) {
+    return getDatabases(catName, "*"); // "*" is turned into the regex ".*" by getDatabases
+  }
+
+  /**
+   * Lists the tables of a database (namespace) whose names match a Hive pattern.
+   *
+   * @param catName catalog name, must equal the name of the current REST catalog
+   * @param dbName database (namespace) to list the tables from
+   * @param tablePattern Hive glob pattern in which {@code *} stands for any sequence of characters
+   * @return names of the tables that match the whole pattern
+   * @throws IllegalArgumentException if {@code catName} is not the current catalog
+   */
+  @Override
+  public List<String> getTables(String catName, String dbName, String tablePattern) {
+    validateCurrentCatalog(catName);
+
+    // Convert the Hive glob pattern to a Java regex.
+    Pattern pattern = Pattern.compile(tablePattern.replace("*", ".*"));
+
+    // List tables from the specific database (namespace) and filter them.
+    return restCatalog.listTables(Namespace.of(dbName)).stream()
+        .map(TableIdentifier::name)
+        // The whole name has to match: asPredicate() uses find(), so "t*" would also accept "my_t1".
+        .filter(pattern.asMatchPredicate())
+        .toList();
+  }
+
+  @Override
+  public List<String> getAllTables(String catName, String dbName) {
+    return getTables(catName, dbName, "*"); // "*" is turned into the regex ".*" by getTables
+  }
+
+  @Override
+  public void dropTable(Table table, boolean deleteData, boolean 
ignoreUnknownTab, boolean ifPurge) throws TException {
+    restCatalog.dropTable(TableIdentifier.of(table.getDbName(), 
table.getTableName()));
+  }
+
+  /**
+   * Ensures that the requested catalog is the one served by this client.
+   *
+   * @param catName catalog name to check
+   * @throws IllegalArgumentException if the name differs from the REST catalog name
+   */
+  private void validateCurrentCatalog(String catName) {
+    String currentName = restCatalog.name();
+    if (currentName.equals(catName)) {
+      return;
+    }
+    throw new IllegalArgumentException(
+        String.format("Catalog name '%s' does not match the current catalog '%s'", catName, currentName));
+  }
+
+  @Override
+  public boolean tableExists(String catName, String dbName, String tableName) {
+    validateCurrentCatalog(catName);
+    return restCatalog.tableExists(TableIdentifier.of(dbName, tableName));
+  }
+
+  @Override
+  public Database getDatabase(String catName, String dbName) {
+    validateCurrentCatalog(catName);
+
+    return restCatalog.listNamespaces(Namespace.empty()).stream()
+        .filter(namespace -> namespace.levels()[0].equals(dbName))
+        .map(namespace -> {
+          Database database = new Database();
+          database.setName(String.join(NAMESPACE_SEPARATOR, 
namespace.levels()));
+          Map<String, String> namespaceMetadata = 
restCatalog.loadNamespaceMetadata(Namespace.of(dbName));
+          
database.setLocationUri(namespaceMetadata.get(CatalogUtils.LOCATION));
+          database.setCatalogName(restCatalog.name());
+          database.setOwnerName(namespaceMetadata.get(DB_OWNER));
+          try {
+            
database.setOwnerType(PrincipalType.valueOf(namespaceMetadata.get(DB_OWNER_TYPE)));
+          } catch (Exception e) {
+            LOG.warn("Can not set ownerType: {}", 
namespaceMetadata.get(DB_OWNER_TYPE), e);
+          }
+          return database;
+        }).findFirst().get();

Review Comment:
   Changed to throw `NoSuchObjectException` if db is not found.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java:
##########
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.HMSTablePropertyHelper;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
+
+public class BaseHiveIcebergMetaHook implements HiveMetaHook {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseHiveIcebergMetaHook.class);
+  private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
+  public static final Map<String, String> COMMON_HMS_PROPERTIES = 
ImmutableMap.of(
+      BaseMetastoreTableOperations.TABLE_TYPE_PROP, 
BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase()
+  );
+  private static final Set<String> PARAMETERS_TO_REMOVE = ImmutableSet
+      .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME, 
InputFormatConfig.PARTITION_SPEC);
+  static final String ORC_FILES_ONLY = "iceberg.orc.files.only";
+
+  protected final Configuration conf;
+  protected Table icebergTable = null;
+  protected Properties catalogProperties;
+  protected boolean createHMSTableInHook = false;
+
+  /** File formats known to this hook, each paired with its lower-case label. */
+  public enum FileFormat {
+    ORC("orc"),
+    PARQUET("parquet"),
+    AVRO("avro");
+
+    private final String label;
+
+    FileFormat(String formatLabel) {
+      label = formatLabel;
+    }
+
+    /** Returns the label this constant was declared with. */
+    public String getLabel() {
+      return this.label;
+    }
+  }
+
+  public BaseHiveIcebergMetaHook(Configuration conf) {
+    this.conf = conf; // kept for the catalog checks done by the hook methods
+  }
+
+  /**
+   * Table-based entry point: wraps the HMS table into a {@link CreateTableRequest} and
+   * delegates to {@link #preCreateTable(CreateTableRequest)}.
+   */
+  @Override
+  public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    preCreateTable(new CreateTableRequest(hmsTable));
+  }
+
+  @Override
+  public void preCreateTable(CreateTableRequest request) {
+    org.apache.hadoop.hive.metastore.api.Table hmsTable = request.getTable();
+    if (hmsTable.isTemporary()) {
+      throw new UnsupportedOperationException("Creation of temporary iceberg 
tables is not supported.");
+    }
+    this.catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
+
+    // Always tag the HMS table with the Iceberg table type, even for non HiveCatalog based tables
+    hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase());
+
+    if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
+      if 
(Boolean.parseBoolean(this.catalogProperties.getProperty(hive_metastoreConstants.TABLE_IS_CTLT)))
 {
+        throw new UnsupportedOperationException("CTLT target table must be a 
HiveCatalog table.");
+      }
+      // For non-HiveCatalog tables too, we should set the input and output 
format
+      // so that the table can be read by other engines like Impala
+      
hmsTable.getSd().setInputFormat(HiveIcebergInputFormat.class.getCanonicalName());
+      
hmsTable.getSd().setOutputFormat(HiveIcebergOutputFormat.class.getCanonicalName());
+
+      // When HiveCatalog is not used, check whether the Iceberg table already exists
+      try {
+        this.icebergTable = IcebergTableUtil.getTable(conf, catalogProperties, 
true);
+
+        if (Catalogs.hadoopCatalog(conf, catalogProperties) && 
hmsTable.getSd() != null &&
+                hmsTable.getSd().getLocation() == null) {
+          hmsTable.getSd().setLocation(icebergTable.location());
+        }
+        
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA)
 == null,
+            "Iceberg table already created - can not use provided schema");
+        
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC)
 == null,
+            "Iceberg table already created - can not use provided partition 
specification");
+
+        LOG.info("Iceberg table already exists {}", icebergTable);
+        return; // the Iceberg table already exists, so no creation data is collected
+      } catch (NoSuchTableException nte) {
+        // The table does not exist yet: fall through and create it below
+      }
+    }
+
+    // If the table does not exist, collect the data needed for table creation
+    // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC 
takes precedence so the user can override the
+    // Iceberg schema and specification generated by the code
+
+    Set<String> identifierFields = 
Optional.ofNullable(request.getPrimaryKeys())
+        .map(primaryKeys ->
+            
primaryKeys.stream().map(SQLPrimaryKey::getColumn_name).collect(Collectors.toSet()))
+        .orElse(Collections.emptySet());
+
+    Schema schema = schema(catalogProperties, hmsTable, identifierFields);
+    PartitionSpec spec = spec(conf, schema, hmsTable);
+
+    // If there are partition keys specified remove them from the HMS table 
and add them to the column list
+    if (hmsTable.isSetPartitionKeys()) {
+      hmsTable.getSd().getCols().addAll(hmsTable.getPartitionKeys());
+      hmsTable.setPartitionKeysIsSet(false);
+    }
+
+    catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, 
SchemaParser.toJson(schema));
+    String specString = PartitionSpecParser.toJson(spec);
+    catalogProperties.put(InputFormatConfig.PARTITION_SPEC, specString);
+    validateCatalogConfigsDefined(); // rejects a named custom catalog that has neither type nor impl configured
+
+    if (request.getEnvContext() == null) {
+      request.setEnvContext(new EnvironmentContext());
+    }
+    
request.getEnvContext().putToProperties(TableProperties.DEFAULT_PARTITION_SPEC, 
specString);
+    setCommonHmsTablePropertiesForIceberg(hmsTable);
+
+    if 
(hmsTable.getParameters().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP))
 {
+      createHMSTableInHook = true; // a metadata location is present in the HMS table parameters
+    }
+
+    
assertFileFormat(catalogProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+
+    // Set whether the format is ORC, to be used during vectorization.
+    setOrcOnlyFilesParam(hmsTable);
+    // Remove hive primary key columns from table request, as iceberg doesn't 
support hive primary key.
+    request.setPrimaryKeys(null);
+    setSortOrder(hmsTable, schema, catalogProperties);
+  }
+
+  /**
+   * Verifies that the catalog configs required by the table are defined in Session Conf.
+   *
+   * <p>When the 'iceberg.catalog' table property names a catalog that is neither the default
+   * catalog nor the hadoop catalog, at least one of the two configs has to be defined in
+   * Session Conf: iceberg.catalog.<code>catalogName</code>.type
+   * or iceberg.catalog.<code>catalogName</code>.catalog-impl.
+   * See description in Catalogs.java for more details.
+   *
+   * @throws IllegalArgumentException if neither of the two configs is defined for such a catalog
+   */
+  private void validateCatalogConfigsDefined() {
+    String name = catalogProperties.getProperty(InputFormatConfig.CATALOG_NAME);
+    boolean needsNoConfig = StringUtils.isEmpty(name) ||
+        Catalogs.ICEBERG_HADOOP_TABLE_NAME.equals(name) ||
+        Catalogs.ICEBERG_DEFAULT_CATALOG_NAME.equals(name);
+    if (needsNoConfig) {
+      return;
+    }
+
+    boolean configured = !StringUtils.isEmpty(CatalogUtils.getCatalogType(conf, name));
+    if (!configured) {
+      // The impl is only looked up when no type is configured
+      configured = !StringUtils.isEmpty(CatalogUtils.getCatalogImpl(conf, name));
+    }
+
+    Preconditions.checkArgument(configured, "Catalog type or impl must be set for catalog: %s", name);
+  }
+
+  private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, Schema schema,
+      Properties properties) {
+    String sortOderJSONString = 
hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER);
+    SortFields sortFields = null;
+    if (!Strings.isNullOrEmpty(sortOderJSONString)) {
+      try {
+        sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOderJSONString, 
SortFields.class);
+      } catch (Exception e) {
+        LOG.warn("Can not read write order json: {}", sortOderJSONString, e);
+        return;
+      }
+      if (sortFields != null && !sortFields.getSortFields().isEmpty()) {
+        SortOrder.Builder sortOderBuilder = SortOrder.builderFor(schema);
+        sortFields.getSortFields().forEach(fieldDesc -> {
+          NullOrder nullOrder = fieldDesc.getNullOrdering() == 
NullOrdering.NULLS_FIRST ?
+              NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
+          SortDirection sortDirection = fieldDesc.getDirection() == 
SortFieldDesc.SortDirection.ASC ?
+              SortDirection.ASC : SortDirection.DESC;
+          sortOderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, 
nullOrder);
+        });
+        properties.put(TableProperties.DEFAULT_SORT_ORDER, 
SortOrderParser.toJson(sortOderBuilder.build()));
+      }
+    }
+  }
+
+  @Override
+  public void rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
+    // do nothing
+  }
+
+  @Override
+  public void commitCreateTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
+    // do nothing
+  }
+
+  @Override
+  public void preDropTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
+    // do nothing
+  }
+
+  @Override
+  public void rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
+    // do nothing
+  }
+
+  @Override
+  public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, boolean deleteData) {
+    // do nothing
+  }
+
+  @Override
+  public boolean createHMSTableInHook() {
+    return createHMSTableInHook;
+  }
+
+  private static void assertFileFormat(String format) {
+    if (format == null) {
+      return;
+    }
+    String lowerCaseFormat = format.toLowerCase();
+    Preconditions.checkArgument(Arrays.stream(FileFormat.values()).anyMatch(v 
-> lowerCaseFormat.contains(v.label)),
+        String.format("Unsupported fileformat %s", format));
+  }
+
+  protected void 
setCommonHmsTablePropertiesForIceberg(org.apache.hadoop.hive.metastore.api.Table
 hmsTable) {
+    // If the table is not managed by Hive, Hadoop or REST catalog, then the 
location should be set
+    if (!Catalogs.hiveCatalog(conf, catalogProperties) &&
+        !CatalogUtil.ICEBERG_CATALOG_TYPE_REST
+            .equals(Optional.ofNullable(CatalogUtils.getCatalogType(conf, 
catalogProperties)).orElse(""))) {

Review Comment:
   If we do that, then the test 
`TestHiveIcebergStorageHandlerNoScan.testCreateTableError` fails on line 704, 
which asserts that `IllegalArgumentException` is thrown for a custom catalog, 
because the existing design expects the location to be set for any catalog that 
is not hive (and I wanted to add REST too), not only for hadoop.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -257,7 +260,9 @@ public Class<? extends AbstractSerDe> getSerDeClass() {
   public HiveMetaHook getMetaHook() {
     // Make sure to always return a new instance here, as HiveIcebergMetaHook 
might hold state relevant for the
     // operation.
-    return new HiveIcebergMetaHook(conf);
+    String catalogType = CatalogUtils.getCatalogType(conf);
+    return StringUtils.isEmpty(catalogType) || 
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equals(catalogType) ?

Review Comment:
   All tests pass with this change, and I thought we agreed that for the hadoop 
catalog there is no need to use HiveIcebergMetaHook.



##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/database/create/CreateDatabaseOperation.java:
##########
@@ -65,6 +68,10 @@ public int execute() throws HiveException {
       } else { // should never be here
         throw new HiveException("Unsupported database type " + 
database.getType() + " for " + database.getName());
       }
+      String activeCatalog = MetastoreConf.get(context.getConf(), 
MetastoreConf.ConfVars.CATALOG_DEFAULT.getVarname());

Review Comment:
   Fixed the naming to `defaultCatalog` for clarity.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java:
##########
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
+import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.HMSTablePropertyHelper;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
+
+public class BaseHiveIcebergMetaHook implements HiveMetaHook {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseHiveIcebergMetaHook.class);
+  private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
+  public static final Map<String, String> COMMON_HMS_PROPERTIES = 
ImmutableMap.of(
+      BaseMetastoreTableOperations.TABLE_TYPE_PROP, 
BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase()
+  );
+  private static final Set<String> PARAMETERS_TO_REMOVE = ImmutableSet
+      .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME, 
InputFormatConfig.PARTITION_SPEC);
+  static final String ORC_FILES_ONLY = "iceberg.orc.files.only";
+
+  protected final Configuration conf;
+  protected Table icebergTable = null;
+  protected Properties catalogProperties;
+  protected boolean createHMSTableInHook = false;
+
+  public enum FileFormat {
+    ORC("orc"), PARQUET("parquet"), AVRO("avro");
+
+    private final String label;
+
+    FileFormat(String label) {
+      this.label = label;
+    }
+
+    public String getLabel() {
+      return label;
+    }
+  }
+
+  public BaseHiveIcebergMetaHook(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
+    CreateTableRequest request = new CreateTableRequest(hmsTable);
+    preCreateTable(request);
+  }
+
+  @Override
+  public void preCreateTable(CreateTableRequest request) {
+    org.apache.hadoop.hive.metastore.api.Table hmsTable = request.getTable();
+    if (hmsTable.isTemporary()) {
+      throw new UnsupportedOperationException("Creation of temporary iceberg 
tables is not supported.");
+    }
+    this.catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
+
+    // Set the table type even for non HiveCatalog based tables
+    hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase());
+
+    if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
+      if 
(Boolean.parseBoolean(this.catalogProperties.getProperty(hive_metastoreConstants.TABLE_IS_CTLT)))
 {
+        throw new UnsupportedOperationException("CTLT target table must be a 
HiveCatalog table.");
+      }
+      // For non-HiveCatalog tables too, we should set the input and output 
format
+      // so that the table can be read by other engines like Impala
+      
hmsTable.getSd().setInputFormat(HiveIcebergInputFormat.class.getCanonicalName());
+      
hmsTable.getSd().setOutputFormat(HiveIcebergOutputFormat.class.getCanonicalName());
+
+      // If not using HiveCatalog check for existing table
+      try {
+        this.icebergTable = IcebergTableUtil.getTable(conf, catalogProperties, 
true);
+
+        if (Catalogs.hadoopCatalog(conf, catalogProperties) && 
hmsTable.getSd() != null &&
+                hmsTable.getSd().getLocation() == null) {
+          hmsTable.getSd().setLocation(icebergTable.location());
+        }
+        
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA)
 == null,
+            "Iceberg table already created - can not use provided schema");
+        
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC)
 == null,
+            "Iceberg table already created - can not use provided partition 
specification");
+
+        LOG.info("Iceberg table already exists {}", icebergTable);
+        return;
+      } catch (NoSuchTableException nte) {
+        // If the table does not exist we will create it below
+      }
+    }
+
+    // If the table does not exist collect data for table creation
+    // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC 
takes precedence so the user can override the
+    // Iceberg schema and specification generated by the code
+
+    Set<String> identifierFields = 
Optional.ofNullable(request.getPrimaryKeys())
+        .map(primaryKeys ->
+            
primaryKeys.stream().map(SQLPrimaryKey::getColumn_name).collect(Collectors.toSet()))
+        .orElse(Collections.emptySet());
+
+    Schema schema = schema(catalogProperties, hmsTable, identifierFields);
+    PartitionSpec spec = spec(conf, schema, hmsTable);
+
+    // If there are partition keys specified remove them from the HMS table 
and add them to the column list
+    if (hmsTable.isSetPartitionKeys()) {
+      hmsTable.getSd().getCols().addAll(hmsTable.getPartitionKeys());
+      hmsTable.setPartitionKeysIsSet(false);
+    }
+
+    catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, 
SchemaParser.toJson(schema));
+    String specString = PartitionSpecParser.toJson(spec);
+    catalogProperties.put(InputFormatConfig.PARTITION_SPEC, specString);
+    validateCatalogConfigsDefined();
+
+    if (request.getEnvContext() == null) {
+      request.setEnvContext(new EnvironmentContext());
+    }
+    
request.getEnvContext().putToProperties(TableProperties.DEFAULT_PARTITION_SPEC, 
specString);
+    setCommonHmsTablePropertiesForIceberg(hmsTable);
+
+    if 
(hmsTable.getParameters().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP))
 {
+      createHMSTableInHook = true;
+    }
+
+    
assertFileFormat(catalogProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+
+    // Set whether the format is ORC, to be used during vectorization.
+    setOrcOnlyFilesParam(hmsTable);
+    // Remove hive primary key columns from table request, as iceberg doesn't 
support hive primary key.
+    request.setPrimaryKeys(null);
+    setSortOrder(hmsTable, schema, catalogProperties);
+  }
+
+  /**
+   * Method for verification that necessary catalog configs are defined in 
Session Conf.
+   *
+   * <p>If the catalog name is provided in 'iceberg.catalog' table property,
+   * and the name is not the default catalog and not hadoop catalog, checks 
that one of the two configs
+   * is defined in Session Conf: iceberg.catalog.<code>catalogName</code>.type
+   * or iceberg.catalog.<code>catalogName</code>.catalog-impl. See description 
in Catalogs.java for more details.
+   *
+   */
+  private void validateCatalogConfigsDefined() {
+    String catalogName = 
catalogProperties.getProperty(InputFormatConfig.CATALOG_NAME);
+    if (!StringUtils.isEmpty(catalogName) && 
!Catalogs.ICEBERG_HADOOP_TABLE_NAME.equals(catalogName) &&
+        !Catalogs.ICEBERG_DEFAULT_CATALOG_NAME.equals(catalogName)) {

Review Comment:
   Fixed (removed the skipping). Additionally, converted this method to 
functional style.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to