This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 3088b77  HIVE-25613: Port Iceberg Hive fixes to the iceberg module 
(#2721)
3088b77 is described below

commit 3088b77ab1d0bd9fd69addf5f859b3c175e28dba
Author: pvary <pv...@cloudera.com>
AuthorDate: Fri Oct 22 14:13:18 2021 +0200

    HIVE-25613: Port Iceberg Hive fixes to the iceberg module (#2721)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Hive: Add table-level JVM lock on commits (#2547)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Core: Move ClientPool and ClientPoolImpl to core (#2491)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Hive: Improve code style (#2641)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Style: Delete blank line of CachedClientPool.java 
(#2787)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Hive: Fix some message typos in HiveCatalog: Matastore 
=> Metastore (#2950)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Hive: Fix toString NPE with recommended constructor 
(#3021)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Build: Upgrade to Gradle 7.x (#2826)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Hive: Ensure tableLevelMutex is unlocked when 
uncommitted metadata delete fails (#3264)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Doc: refactor Hive documentation with catalog loading 
examples (#2544)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Hive: unify catalog experience across engines (#2565)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Core: Add HadoopConfigurable interface to serialize 
custom FileIO (#2678)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Move Assert.assertTrue(..) instance checks to AssertJ 
assertions (#2756)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Core: Support multiple specs in OutputFileFactory 
(#2858)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - MR: Use SerializableTable in IcebergSplit (#2988)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Fix exception exception message in IcebergInputFormat 
(#3153)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Data: Fix equality deletes with date/time types (#3135)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Build: Fix ErrorProne UnnecessarilyQualified warnings 
(#3262)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Build: Fix ErrorProne NewHashMapInt warnings (#3260)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Core: Fail if both Catalog type and catalog-impl are 
configured (#3162)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - MR: Support imported data in InputFormat using name 
mapping (#3312)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Hive: Fix Catalog initialization without Configuration 
(#3252)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Hive: Switch to RetryingHMSClient (allows configuration 
of retryDelays and retries) (#3099)
    
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
    Source Iceberg PR - Hive: Fix Catalogs.hiveCatalog method for default 
catalogs (#3338)
---
 iceberg/iceberg-catalog/pom.xml                    |   6 +-
 .../org/apache/iceberg/hive/CachedClientPool.java  |  16 +-
 .../java/org/apache/iceberg/hive/HiveCatalog.java  |  38 +--
 .../org/apache/iceberg/hive/HiveClientPool.java    |  32 +--
 .../org/apache/iceberg/hive/HiveSchemaUtil.java    |   6 +-
 .../apache/iceberg/hive/HiveTableOperations.java   |  40 ++-
 .../org/apache/iceberg/hive/TestHiveCatalog.java   |  30 +++
 .../apache/iceberg/hive/TestHiveClientPool.java    |  98 ++++++++
 .../apache/iceberg/hive/TestHiveCommitLocks.java   |  48 +++-
 iceberg/iceberg-handler/pom.xml                    |  13 +-
 .../main/java/org/apache/iceberg/mr/Catalogs.java  |  89 ++++---
 .../org/apache/iceberg/mr/InputFormatConfig.java   |  33 +++
 .../org/apache/iceberg/mr/hive/Deserializer.java   |   3 +-
 .../iceberg/mr/hive/HiveIcebergInputFormat.java    |   2 +-
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java   |  18 +-
 .../apache/iceberg/mr/hive/HiveIcebergSerDe.java   |   4 +-
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |  26 +-
 .../apache/iceberg/mr/mapreduce/IcebergSplit.java  |  51 ++--
 .../java/org/apache/iceberg/mr/TestCatalogs.java   | 189 +++++++++-----
 .../apache/iceberg/mr/TestIcebergInputFormats.java |   9 +-
 .../iceberg/mr/TestInputFormatReaderDeletes.java   |   2 +
 .../iceberg/mr/hive/TestHiveIcebergCTAS.java       |  11 +-
 .../iceberg/mr/hive/TestHiveIcebergInserts.java    |   6 +-
 .../iceberg/mr/hive/TestHiveIcebergMigration.java  |  28 ++-
 .../mr/hive/TestHiveIcebergOutputCommitter.java    |  16 +-
 .../iceberg/mr/hive/TestHiveIcebergSelects.java    |   7 +-
 .../iceberg/mr/hive/TestHiveIcebergStatistics.java |  12 +-
 .../TestHiveIcebergStorageHandlerLocalScan.java    |   3 +-
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  |  32 +--
 ...eIcebergStorageHandlerWithMultipleCatalogs.java |   8 +-
 .../org/apache/iceberg/mr/hive/TestTables.java     |  30 ++-
 iceberg/patched-iceberg-core/pom.xml               |  18 +-
 .../main/java/org/apache/iceberg/CatalogUtil.java  | 276 +++++++++++++++++++++
 .../main/java/org/apache/iceberg}/ClientPool.java  |   4 +-
 .../java/org/apache/iceberg}/ClientPoolImpl.java   |  17 +-
 .../org/apache/iceberg/jdbc/JdbcClientPool.java    |  71 ++++++
 .../java/org/apache/iceberg/jdbc/JdbcUtil.java     | 102 ++++++++
 iceberg/pom.xml                                    |  13 +
 38 files changed, 1109 insertions(+), 298 deletions(-)

diff --git a/iceberg/iceberg-catalog/pom.xml b/iceberg/iceberg-catalog/pom.xml
index a3af94d..a425eec 100644
--- a/iceberg/iceberg-catalog/pom.xml
+++ b/iceberg/iceberg-catalog/pom.xml
@@ -64,6 +64,10 @@
             <classifier>tests</classifier>
             <scope>test</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
index eca505c..e2dc990 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
@@ -25,13 +25,14 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ClientPool;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.thrift.TException;
 
-public class CachedClientPool implements ClientPool<HiveMetaStoreClient, 
TException> {
+public class CachedClientPool implements ClientPool<IMetaStoreClient, 
TException> {
 
   private static Cache<String, HiveClientPool> clientPoolCache;
 
@@ -40,7 +41,7 @@ public class CachedClientPool implements 
ClientPool<HiveMetaStoreClient, TExcept
   private final int clientPoolSize;
   private final long evictionInterval;
 
-  CachedClientPool(Configuration conf, Map<String, String> properties) {
+  public CachedClientPool(Configuration conf, Map<String, String> properties) {
     this.conf = conf;
     this.metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
     this.clientPoolSize = PropertyUtil.propertyAsInt(properties,
@@ -57,7 +58,6 @@ public class CachedClientPool implements 
ClientPool<HiveMetaStoreClient, TExcept
     return clientPoolCache.get(metastoreUri, k -> new 
HiveClientPool(clientPoolSize, conf));
   }
 
-
   private synchronized void init() {
     if (clientPoolCache == null) {
       clientPoolCache = 
Caffeine.newBuilder().expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
@@ -72,7 +72,13 @@ public class CachedClientPool implements 
ClientPool<HiveMetaStoreClient, TExcept
   }
 
   @Override
-  public <R> R run(Action<R, HiveMetaStoreClient, TException> action) throws 
TException, InterruptedException {
+  public <R> R run(Action<R, IMetaStoreClient, TException> action) throws 
TException, InterruptedException {
     return clientPool().run(action);
   }
+
+  @Override
+  public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean 
retry)
+          throws TException, InterruptedException {
+    return clientPool().run(action, retry);
+  }
 }
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 477e591..880d60d 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -38,6 +38,7 @@ import org.apache.iceberg.BaseMetastoreCatalog;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.Namespace;
@@ -63,7 +64,7 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
   private String name;
   private Configuration conf;
   private FileIO fileIO;
-  private ClientPool<HiveMetaStoreClient, TException> clients;
+  private ClientPool<IMetaStoreClient, TException> clients;
 
   public HiveCatalog() {
   }
@@ -71,9 +72,9 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
   /**
    * Hive Catalog constructor.
    *
+   * @param conf Hadoop Configuration
    * @deprecated please use the no-arg constructor, setConf and initialize to 
construct the catalog. Will be removed in
    * v0.13.0
-   * @param conf Hadoop Configuration
    */
   @Deprecated
   public HiveCatalog(Configuration conf) {
@@ -94,6 +95,11 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
   @Override
   public void initialize(String inputName, Map<String, String> properties) {
     this.name = inputName;
+    if (conf == null) {
+      LOG.warn("No Hadoop Configuration was set, using the default environment 
Configuration");
+      this.conf = new Configuration();
+    }
+
     if (properties.containsKey(CatalogProperties.URI)) {
       this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, 
properties.get(CatalogProperties.URI));
     }
@@ -235,7 +241,7 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
         !namespace.isEmpty(),
         "Cannot create namespace with invalid name: %s", namespace);
     Preconditions.checkArgument(isValidateNamespace(namespace),
-        "Cannot support multi part namespace in Hive MetaStore: %s", 
namespace);
+        "Cannot support multi part namespace in Hive Metastore: %s", 
namespace);
 
     try {
       clients.run(client -> {
@@ -250,12 +256,12 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
             namespace);
 
     } catch (TException e) {
-      throw new RuntimeException("Failed to create namespace " + namespace + " 
in Hive MataStore", e);
+      throw new RuntimeException("Failed to create namespace " + namespace + " 
in Hive Matastore", e);
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(
-          "Interrupted in call to createDatabase(name) " + namespace + " in 
Hive MataStore", e);
+          "Interrupted in call to createDatabase(name) " + namespace + " in 
Hive Matastore", e);
     }
   }
 
@@ -268,7 +274,7 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
       return ImmutableList.of();
     }
     try {
-      List<Namespace> namespaces = 
clients.run(HiveMetaStoreClient::getAllDatabases)
+      List<Namespace> namespaces = 
clients.run(IMetaStoreClient::getAllDatabases)
           .stream()
           .map(Namespace::of)
           .collect(Collectors.toList());
@@ -277,12 +283,12 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
       return namespaces;
 
     } catch (TException e) {
-      throw new RuntimeException("Failed to list all namespace: " + namespace 
+ " in Hive MataStore",  e);
+      throw new RuntimeException("Failed to list all namespace: " + namespace 
+ " in Hive Matastore",  e);
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(
-          "Interrupted in call to getAllDatabases() " + namespace + " in Hive 
MataStore", e);
+          "Interrupted in call to getAllDatabases() " + namespace + " in Hive 
Matastore", e);
     }
   }
 
@@ -311,12 +317,12 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
       return false;
 
     } catch (TException e) {
-      throw new RuntimeException("Failed to drop namespace " + namespace + " 
in Hive MataStore", e);
+      throw new RuntimeException("Failed to drop namespace " + namespace + " 
in Hive Matastore", e);
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(
-          "Interrupted in call to drop dropDatabase(name) " + namespace + " in 
Hive MataStore", e);
+          "Interrupted in call to drop dropDatabase(name) " + namespace + " in 
Hive Matastore", e);
     }
   }
 
@@ -362,11 +368,11 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
 
     } catch (TException e) {
       throw new RuntimeException(
-          "Failed to list namespace under namespace: " + namespace + " in Hive 
MataStore", e);
+          "Failed to list namespace under namespace: " + namespace + " in Hive 
Matastore", e);
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      throw new RuntimeException("Interrupted in call to getDatabase(name) " + 
namespace + " in Hive MataStore", e);
+      throw new RuntimeException("Interrupted in call to getDatabase(name) " + 
namespace + " in Hive Matastore", e);
     }
   }
 
@@ -386,12 +392,12 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
       throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", 
namespace);
 
     } catch (TException e) {
-      throw new RuntimeException("Failed to list namespace under namespace: " 
+ namespace + " in Hive MataStore", e);
+      throw new RuntimeException("Failed to list namespace under namespace: " 
+ namespace + " in Hive Matastore", e);
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(
-          "Interrupted in call to getDatabase(name) " + namespace + " in Hive 
MataStore", e);
+          "Interrupted in call to getDatabase(name) " + namespace + " in Hive 
Matastore", e);
     }
   }
 
@@ -514,7 +520,7 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements SupportsNamespa
   public String toString() {
     return MoreObjects.toStringHelper(this)
         .add("name", name)
-        .add("uri", this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname))
+        .add("uri", this.conf == null ? "" : 
this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname))
         .toString();
   }
 
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
index ef31e3b..e322792 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -21,35 +21,39 @@ package org.apache.iceberg.hive;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.ClientPoolImpl;
+import org.apache.iceberg.common.DynMethods;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 
-public class HiveClientPool extends ClientPoolImpl<HiveMetaStoreClient, 
TException> {
+public class HiveClientPool extends ClientPoolImpl<IMetaStoreClient, 
TException> {
 
-  // use appropriate ctor depending on whether we're working with Hive2 or 
Hive3 dependencies
-  // we need to do this because there is a breaking API change between Hive2 
and Hive3
-  private static final DynConstructors.Ctor<HiveMetaStoreClient> CLIENT_CTOR = 
DynConstructors.builder()
-          .impl(HiveMetaStoreClient.class, HiveConf.class)
-          .impl(HiveMetaStoreClient.class, Configuration.class)
-          .build();
+  // use appropriate ctor depending on whether we're working with Hive1, 
Hive2, or Hive3 dependencies
+  // we need to do this because there is a breaking API change between Hive1, 
Hive2, and Hive3
+  private static final DynMethods.StaticMethod GET_CLIENT = 
DynMethods.builder("getProxy")
+      .impl(RetryingMetaStoreClient.class, HiveConf.class)
+      .impl(RetryingMetaStoreClient.class, HiveConf.class, Boolean.TYPE)
+      .impl(RetryingMetaStoreClient.class, Configuration.class, Boolean.TYPE)
+      .buildStatic();
 
   private final HiveConf hiveConf;
 
   public HiveClientPool(int poolSize, Configuration conf) {
-    super(poolSize, TTransportException.class);
+    // Do not allow retry by default as we rely on RetryingHiveClient
+    super(poolSize, TTransportException.class, false);
     this.hiveConf = new HiveConf(conf, HiveClientPool.class);
     this.hiveConf.addResource(conf);
   }
 
   @Override
-  protected HiveMetaStoreClient newClient()  {
+  protected IMetaStoreClient newClient()  {
     try {
       try {
-        return CLIENT_CTOR.newInstance(hiveConf);
+        return GET_CLIENT.invoke(hiveConf, true);
       } catch (RuntimeException e) {
         // any MetaException would be wrapped into RuntimeException during 
reflection, so let's double-check type here
         if (e.getCause() instanceof MetaException) {
@@ -71,7 +75,7 @@ public class HiveClientPool extends 
ClientPoolImpl<HiveMetaStoreClient, TExcepti
   }
 
   @Override
-  protected HiveMetaStoreClient reconnect(HiveMetaStoreClient client) {
+  protected IMetaStoreClient reconnect(IMetaStoreClient client) {
     try {
       client.close();
       client.reconnect();
@@ -88,7 +92,7 @@ public class HiveClientPool extends 
ClientPoolImpl<HiveMetaStoreClient, TExcepti
   }
 
   @Override
-  protected void close(HiveMetaStoreClient client) {
+  protected void close(IMetaStoreClient client) {
     client.close();
   }
 
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index e6c6cf5..57123e1 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -262,6 +262,8 @@ public final class HiveSchemaUtil {
       case DATE:
         return "date";
       case TIME:
+      case STRING:
+      case UUID:
         return "string";
       case TIMESTAMP:
         Types.TimestampType timestampType = (Types.TimestampType) type;
@@ -269,11 +271,7 @@ public final class HiveSchemaUtil {
           return "timestamp with local time zone";
         }
         return "timestamp";
-      case STRING:
-      case UUID:
-        return "string";
       case FIXED:
-        return "binary";
       case BINARY:
         return "binary";
       case DECIMAL:
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 386d9ff..0ade33f 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.hive;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
@@ -28,12 +30,14 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
@@ -48,6 +52,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.ClientPool;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.TableMetadata;
@@ -82,14 +87,18 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
   private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = 
"iceberg.hive.lock-timeout-ms";
   private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = 
"iceberg.hive.lock-check-min-wait-ms";
   private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = 
"iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = 
"iceberg.hive.table-level-lock-evict-ms";
   private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 
1000; // 3 minutes
   private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 
milliseconds
   private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 
5 seconds
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = 
TimeUnit.MINUTES.toMillis(10);
   private static final DynMethods.UnboundMethod ALTER_TABLE = 
DynMethods.builder("alter_table")
-      .impl(HiveMetaStoreClient.class, "alter_table_with_environmentContext",
+      .impl(IMetaStoreClient.class, "alter_table_with_environmentContext",
           String.class, String.class, Table.class, EnvironmentContext.class)
-      .impl(HiveMetaStoreClient.class, "alter_table",
+      .impl(IMetaStoreClient.class, "alter_table",
           String.class, String.class, Table.class, EnvironmentContext.class)
+      .impl(IMetaStoreClient.class, "alter_table",
+          String.class, String.class, Table.class)
       .build();
   private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = 
ImmutableBiMap.of(
       // gc.enabled in Iceberg and external.table.purge in Hive are meant to 
do the same things but with different names
@@ -99,6 +108,16 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
   // Should be in org.apache.iceberg.hadoop.ConfigProperties, but that is not 
ported to Hive codebase
   public static final String KEEP_HIVE_STATS = "iceberg.hive.keep.stats";
 
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  private static synchronized void initTableLevelLockCache(long 
evictionTimeout) {
+    if (commitLockCache == null) {
+      commitLockCache = Caffeine.newBuilder()
+              .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+              .build();
+    }
+  }
+
   /**
    * Provides key translation where necessary between Iceberg and HMS props. 
This translation is needed because some
    * properties control the same behaviour but are named differently in 
Iceberg and Hive. Therefore changes to these
@@ -130,7 +149,7 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
   private final long lockCheckMinWaitTime;
   private final long lockCheckMaxWaitTime;
   private final FileIO fileIO;
-  private final ClientPool<HiveMetaStoreClient, TException> metaClients;
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
 
   protected HiveTableOperations(Configuration conf, ClientPool metaClients, 
FileIO fileIO,
                                 String catalogName, String database, String 
table) {
@@ -146,6 +165,9 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
         conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, 
HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
     this.lockCheckMaxWaitTime =
         conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, 
HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+            conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, 
HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
   }
 
   @Override
@@ -195,6 +217,10 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
     CommitStatus commitStatus = CommitStatus.FAILURE;
     boolean updateHiveTable = false;
     Optional<Long> lockId = Optional.empty();
+    // getting a process-level lock per table to avoid concurrent commit 
attempts to the same table from the same
+    // JVM process, which would result in unnecessary and costly HMS lock 
acquisition requests
+    ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new 
ReentrantLock(true));
+    tableLevelMutex.lock();
     try {
       lockId = Optional.of(acquireLock());
       // TODO add lock heart beating for cases where default lock timeout is 
too low.
@@ -275,7 +301,7 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
       throw new RuntimeException("Interrupted during commit", e);
 
     } finally {
-      cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId);
+      cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, 
tableLevelMutex);
     }
   }
 
@@ -457,7 +483,8 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
     return lockId;
   }
 
-  private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String 
metadataLocation, Optional<Long> lockId) {
+  private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String 
metadataLocation, Optional<Long> lockId,
+      ReentrantLock tableLevelMutex) {
     try {
       if (commitStatus == CommitStatus.FAILURE) {
         // If we are sure the commit failed, clean up the uncommitted metadata 
file
@@ -468,6 +495,7 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
       throw e;
     } finally {
       unlock(lockId);
+      tableLevelMutex.unlock();
     }
   }
 
diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index d4741a1..9518540 100644
--- 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -38,6 +38,7 @@ import 
org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Types;
@@ -45,6 +46,7 @@ import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.rules.TemporaryFolder;
 
 import static org.apache.iceberg.NullOrder.NULLS_FIRST;
@@ -119,6 +121,34 @@ public class TestHiveCatalog extends HiveMetastoreTest {
   }
 
   @Test
+  public void testInitialize() {
+    Assertions.assertDoesNotThrow(() -> {
+      HiveCatalog catalog = new HiveCatalog();
+      catalog.initialize("hive", Maps.newHashMap());
+    });
+  }
+
+  @Test
+  public void testToStringWithoutSetConf() {
+    Assertions.assertDoesNotThrow(() -> {
+      HiveCatalog catalog = new HiveCatalog();
+      catalog.toString();
+    });
+  }
+
+  @Test
+  public void testInitializeCatalogWithProperties() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("uri", "thrift://examplehost:9083");
+    properties.put("warehouse", "/user/hive/testwarehouse");
+    HiveCatalog catalog = new HiveCatalog();
+    catalog.initialize("hive", properties);
+
+    Assert.assertEquals(catalog.getConf().get("hive.metastore.uris"), 
"thrift://examplehost:9083");
+    Assert.assertEquals(catalog.getConf().get("hive.metastore.warehouse.dir"), 
"/user/hive/testwarehouse");
+  }
+
+  @Test
   public void testCreateTableTxnBuilder() throws Exception {
     Schema schema = new Schema(
         required(1, "id", Types.IntegerType.get(), "unique ID"),
diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
index b7870af..36996e3 100644
--- 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
@@ -23,9 +23,23 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestHiveClientPool {
 
@@ -38,6 +52,20 @@ public class TestHiveClientPool {
           "  </property>\n" +
           "</configuration>\n";
 
+  HiveClientPool clients;
+
+  @Before
+  public void before() {
+    HiveClientPool clientPool = new HiveClientPool(2, new Configuration());
+    clients = Mockito.spy(clientPool);
+  }
+
+  @After
+  public void after() {
+    clients.close();
+    clients = null;
+  }
+
   @Test
   public void testConf() {
     HiveConf conf = createHiveConf();
@@ -65,4 +93,74 @@ public class TestHiveClientPool {
     }
     return hiveConf;
   }
+
+  @Test
+  public void testNewClientFailure() {
+    Mockito.doThrow(new RuntimeException("Connection 
exception")).when(clients).newClient();
+    AssertHelpers.assertThrows("Should throw exception", 
RuntimeException.class,
+            "Connection exception", () -> clients.run(Object::toString));
+  }
+
+  @Test
+  public void testGetTablesFailsForNonReconnectableException() throws 
Exception {
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.doReturn(hmsClient).when(clients).newClient();
+    Mockito.doThrow(new MetaException("Another meta exception"))
+            .when(hmsClient).getTables(Mockito.anyString(), 
Mockito.anyString());
+    AssertHelpers.assertThrows("Should throw exception", MetaException.class,
+            "Another meta exception", () -> clients.run(client -> 
client.getTables("default", "t")));
+  }
+
+  @Test
+  public void testConnectionFailureRestoreForMetaException() throws Exception {
+    HiveMetaStoreClient hmsClient = newClient();
+
+    // Throwing an exception may trigger the client to reconnect.
+    String metaMessage = "Got exception: 
org.apache.thrift.transport.TTransportException";
+    Mockito.doThrow(new 
MetaException(metaMessage)).when(hmsClient).getAllDatabases();
+
+    // Create a new client when the reconnect method is called.
+    HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+    List<String> databases = Lists.newArrayList("db1", "db2");
+
+    Mockito.doReturn(databases).when(newClient).getAllDatabases();
+    // The return is OK when the reconnect method is called.
+    Assert.assertEquals(databases, clients.run(client -> 
client.getAllDatabases(), true));
+
+    // Verify that the method is called.
+    Mockito.verify(clients).reconnect(hmsClient);
+    Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+  }
+
+  @Test
+  public void testConnectionFailureRestoreForTTransportException() throws 
Exception {
+    HiveMetaStoreClient hmsClient = newClient();
+    Mockito.doThrow(new 
TTransportException()).when(hmsClient).getAllFunctions();
+
+    // Create a new client when getAllFunctions() failed.
+    HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+    GetAllFunctionsResponse response = new GetAllFunctionsResponse();
+    response.addToFunctions(
+            new Function("concat", "db1", "classname", "root", 
PrincipalType.USER, 100, FunctionType.JAVA, null));
+    Mockito.doReturn(response).when(newClient).getAllFunctions();
+
+    Assert.assertEquals(response, clients.run(client -> 
client.getAllFunctions(), true));
+
+    Mockito.verify(clients).reconnect(hmsClient);
+    Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+  }
+
+  private HiveMetaStoreClient newClient() {
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.doReturn(hmsClient).when(clients).newClient();
+    return hmsClient;
+  }
+
+  private HiveMetaStoreClient reconnect(HiveMetaStoreClient obsoleteClient) {
+    HiveMetaStoreClient newClient = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.doReturn(newClient).when(clients).reconnect(obsoleteClient);
+    return newClient;
+  }
 }
diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 531e511..3b4bb15 100644
--- 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -20,9 +20,15 @@
 package org.apache.iceberg.hive;
 
 import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.iceberg.AssertHelpers;
@@ -42,7 +48,11 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestHiveCommitLocks extends HiveTableBaseTest {
@@ -50,8 +60,8 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
   private static HiveClientPool spyClientPool = null;
   private static CachedClientPool spyCachedClientPool = null;
   private static Configuration overriddenHiveConf = new 
Configuration(hiveConf);
-  private static AtomicReference<HiveMetaStoreClient> spyClientRef = new 
AtomicReference<>();
-  private static HiveMetaStoreClient spyClient = null;
+  private static AtomicReference<IMetaStoreClient> spyClientRef = new 
AtomicReference<>();
+  private static IMetaStoreClient spyClient = null;
   HiveTableOperations ops = null;
   TableMetadata metadataV1 = null;
   TableMetadata metadataV2 = null;
@@ -71,12 +81,13 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
     // The spy clients are reused between methods and closed at the end of all 
tests in this class.
     spyClientPool = spy(new HiveClientPool(1, overriddenHiveConf));
     when(spyClientPool.newClient()).thenAnswer(invocation -> {
-      HiveMetaStoreClient client = (HiveMetaStoreClient) 
invocation.callRealMethod();
-      spyClientRef.set(spy(client));
+      // cannot spy on RetryingHiveMetastoreClient as it is a proxy
+      IMetaStoreClient client = spy(new HiveMetaStoreClient(hiveConf));
+      spyClientRef.set(client);
       return spyClientRef.get();
     });
 
-    spyClientPool.run(HiveMetaStoreClient::isLocalMetaStore); // To ensure new 
client is created.
+    spyClientPool.run(IMetaStoreClient::isLocalMetaStore); // To ensure new 
client is created.
 
     spyCachedClientPool = spy(new CachedClientPool(hiveConf, 
Collections.emptyMap()));
     when(spyCachedClientPool.clientPool()).thenAnswer(invocation -> 
spyClientPool);
@@ -210,4 +221,31 @@ public class TestHiveCommitLocks extends HiveTableBaseTest 
{
         "Could not acquire the lock on",
         () -> spyOps.doCommit(metadataV2, metadataV1));
   }
+
+  @Test
+  public void 
testTableLevelProcessLockBlocksConcurrentHMSRequestsForSameTable() throws 
Exception {
+    int numConcurrentCommits = 10;
+    // resetting the spy client to forget about prior call history
+    reset(spyClient);
+
+    // simulate several concurrent commit operations on the same table
+    ExecutorService executor = 
Executors.newFixedThreadPool(numConcurrentCommits);
+    IntStream.range(0, numConcurrentCommits).forEach(i ->
+        executor.submit(() -> {
+          try {
+            spyOps.doCommit(metadataV2, metadataV1);
+          } catch (CommitFailedException e) {
+            // failures are expected here when checking the base version
+            // it's no problem, we're not testing the actual commit success 
here, only the HMS lock acquisition attempts
+          }
+        }));
+    executor.shutdown();
+    executor.awaitTermination(30, TimeUnit.SECONDS);
+
+    // intra-process commits to the same table should be serialized now
+    // i.e. no thread should receive WAITING state from HMS and have to call 
checkLock periodically
+    verify(spyClient, never()).checkLock(any(Long.class));
+    // all threads eventually got their turn
+    verify(spyClient, 
times(numConcurrentCommits)).lock(any(LockRequest.class));
+  }
 }
diff --git a/iceberg/iceberg-handler/pom.xml b/iceberg/iceberg-handler/pom.xml
index c60e0ef..87c21fa 100644
--- a/iceberg/iceberg-handler/pom.xml
+++ b/iceberg/iceberg-handler/pom.xml
@@ -97,10 +97,15 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-           <groupId>org.apache.hadoop</groupId>
-           <artifactId>hadoop-hdfs</artifactId>
-           <scope>test</scope>
-       </dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
index 710150a..ccd3c6e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -19,7 +19,6 @@
 
 package org.apache.iceberg.mr;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -35,35 +34,46 @@ import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 
 /**
  * Class for catalog resolution and accessing the common functions for {@link 
Catalog} API.
  * <p>
- * Catalog resolution happens in this order:
- * <ol>
- * <li>Custom catalog if specified by {@link 
InputFormatConfig#CATALOG_LOADER_CLASS}
- * <li>Hadoop or Hive catalog if specified by {@link InputFormatConfig#CATALOG}
- * <li>Hadoop Tables
- * </ol>
+ * If the catalog name is provided, get the catalog type from 
iceberg.catalog.<code>catalogName</code>.type config.
+ * <p>
+ * In case the catalog name is {@link #ICEBERG_HADOOP_TABLE_NAME 
location_based_table},
+ * type is ignored and tables will be loaded using {@link HadoopTables}.
+ * <p>
+ * In case the value of catalog type is null, 
iceberg.catalog.<code>catalogName</code>.catalog-impl config
+ * is used to determine the catalog implementation class.
+ * <p>
+ * If catalog name is null, get the catalog type from {@link 
InputFormatConfig#CATALOG iceberg.mr.catalog} config:
+ * <ul>
+ *   <li>hive: HiveCatalog</li>
+ *   <li>location: HadoopTables</li>
+ *   <li>hadoop: HadoopCatalog</li>
+ * </ul>
+ * <p>
+ * In case the value of catalog type is null,
+ * {@link InputFormatConfig#CATALOG_LOADER_CLASS 
iceberg.mr.catalog.loader.class} is used to determine
+ * the catalog implementation class.
+ * <p>
+ * Note: null catalog name mode is only supported for backwards compatibility. 
Using this mode is NOT RECOMMENDED.
  */
 public final class Catalogs {
 
   public static final String ICEBERG_DEFAULT_CATALOG_NAME = "default_iceberg";
   public static final String ICEBERG_HADOOP_TABLE_NAME = 
"location_based_table";
 
-  private static final String HIVE_CATALOG_TYPE = "hive";
-  private static final String HADOOP_CATALOG_TYPE = "hadoop";
-  private static final String NO_CATALOG_TYPE = "no catalog";
-
   public static final String NAME = "name";
   public static final String LOCATION = "location";
 
+  private static final String NO_CATALOG_TYPE = "no catalog";
   private static final Set<String> PROPERTIES_TO_REMOVE =
       ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, 
InputFormatConfig.PARTITION_SPEC, LOCATION, NAME,
               InputFormatConfig.CATALOG_NAME);
@@ -141,7 +151,7 @@ public final class Catalogs {
     String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
 
     // Create a table property map without the controlling properties
-    Map<String, String> map = new HashMap<>(props.size());
+    Map<String, String> map = Maps.newHashMapWithExpectedSize(props.size());
     for (Object key : props.keySet()) {
       if (!PROPERTIES_TO_REMOVE.contains(key)) {
         map.put(key.toString(), props.get(key).toString());
@@ -193,16 +203,20 @@ public final class Catalogs {
    */
   public static boolean hiveCatalog(Configuration conf, Properties props) {
     String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
-    return 
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(getCatalogType(conf, 
catalogName));
+    String catalogType = getCatalogType(conf, catalogName);
+    if (catalogType != null) {
+      return 
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType);
+    }
+    catalogType = getCatalogType(conf, ICEBERG_DEFAULT_CATALOG_NAME);
+    if (catalogType != null) {
+      return 
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType);
+    }
+    return getCatalogProperties(conf, catalogName, 
catalogType).get(CatalogProperties.CATALOG_IMPL) == null;
   }
 
   @VisibleForTesting
   static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
     String catalogType = getCatalogType(conf, catalogName);
-    if (catalogType == null) {
-      throw new NoSuchNamespaceException("Catalog definition for %s is not 
found.", catalogName);
-    }
-
     if (NO_CATALOG_TYPE.equalsIgnoreCase(catalogType)) {
       return Optional.empty();
     } else {
@@ -238,10 +252,18 @@ public final class Catalogs {
    */
   private static Map<String, String> 
addCatalogPropertiesIfMissing(Configuration conf, String catalogType,
                                                                    Map<String, 
String> catalogProperties) {
-    catalogProperties.putIfAbsent(CatalogUtil.ICEBERG_CATALOG_TYPE, 
catalogType);
-    if (catalogType.equalsIgnoreCase(HADOOP_CATALOG_TYPE)) {
-      catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION,
-              conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION));
+    if (catalogType != null) {
+      catalogProperties.putIfAbsent(CatalogUtil.ICEBERG_CATALOG_TYPE, 
catalogType);
+    }
+
+    String legacyCatalogImpl = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
+    if (legacyCatalogImpl != null) {
+      catalogProperties.putIfAbsent(CatalogProperties.CATALOG_IMPL, 
legacyCatalogImpl);
+    }
+
+    String legacyWarehouseLocation = 
conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION);
+    if (legacyWarehouseLocation != null) {
+      catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION, 
legacyWarehouseLocation);
     }
     return catalogProperties;
   }
@@ -249,35 +271,24 @@ public final class Catalogs {
   /**
    * Return the catalog type based on the catalog name.
    * <p>
-   * If the catalog name is provided get the catalog type from 
'iceberg.catalog.<code>catalogName</code>.type' config.
-   * In case the value of this property is null, return with no catalog 
definition (Hadoop Table)
-   * </p>
-   * <p>
-   * If catalog name is null, check the global conf for 'iceberg.mr.catalog' 
property. If the value of the property is:
-   * <ul>
-   *     <li>null/hive -> Hive Catalog</li>
-   *     <li>location -> Hadoop Table</li>
-   *     <li>hadoop -> Hadoop Catalog</li>
-   *     <li>any other value -> Custom Catalog</li>
-   * </ul>
-   * </p>
+   * See {@link Catalogs} documentation for catalog type resolution strategy.
+   *
    * @param conf global hive configuration
    * @param catalogName name of the catalog
    * @return type of the catalog, can be null
    */
   private static String getCatalogType(Configuration conf, String catalogName) 
{
     if (catalogName != null) {
-      String catalogType = 
conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName));
-      if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME) || catalogType == 
null) {
+      String catalogType = conf.get(InputFormatConfig.catalogPropertyConfigKey(
+          catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE));
+      if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME)) {
         return NO_CATALOG_TYPE;
       } else {
         return catalogType;
       }
     } else {
       String catalogType = conf.get(InputFormatConfig.CATALOG);
-      if (catalogType == null) {
-        return HIVE_CATALOG_TYPE;
-      } else if (catalogType.equals(LOCATION)) {
+      if (catalogType != null && catalogType.equals(LOCATION)) {
         return NO_CATALOG_TYPE;
       } else {
         return catalogType;
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index e8e2163..32b777b 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -51,9 +51,30 @@ public class InputFormatConfig {
   public static final String SERIALIZED_TABLE_PREFIX = 
"iceberg.mr.serialized.table.";
   public static final String TABLE_CATALOG_PREFIX = 
"iceberg.mr.table.catalog.";
   public static final String LOCALITY = "iceberg.mr.locality";
+
+  /**
+   * @deprecated please use {@link #catalogPropertyConfigKey(String, String)}
+   * with config key {@link 
org.apache.iceberg.CatalogUtil#ICEBERG_CATALOG_TYPE} to specify the type of a 
catalog.
+   */
+  @Deprecated
   public static final String CATALOG = "iceberg.mr.catalog";
+
+  /**
+   * @deprecated please use {@link #catalogPropertyConfigKey(String, String)}
+   * with config key {@link 
org.apache.iceberg.CatalogProperties#WAREHOUSE_LOCATION}
+   * to specify the warehouse location of a catalog.
+   */
+  @Deprecated
   public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = 
"iceberg.mr.catalog.hadoop.warehouse.location";
+
+  /**
+   * @deprecated please use {@link #catalogPropertyConfigKey(String, String)}
+   * with config key {@link org.apache.iceberg.CatalogProperties#CATALOG_IMPL}
+   * to specify the implementation of a catalog.
+   */
+  @Deprecated
   public static final String CATALOG_LOADER_CLASS = 
"iceberg.mr.catalog.loader.class";
+
   public static final String CTAS_TABLE_NAME = "iceberg.mr.ctas.table.name";
   public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
   public static final String EXTERNAL_TABLE_PURGE = "external.table.purge";
@@ -211,6 +232,18 @@ public class InputFormatConfig {
     return readColumns.split(conf.get(serdeConstants.COLUMN_NAME_DELIMITER, 
String.valueOf(SerDeUtils.COMMA)));
   }
 
+  /**
+   * Get Hadoop config key of a catalog property based on catalog name
+   * @param catalogName catalog name
+   * @param catalogProperty catalog property, can be any custom property,
+   *                        a commonly used list of properties can be found
+   *                        at {@link org.apache.iceberg.CatalogProperties}
+   * @return Hadoop config key of a catalog property for the catalog name
+   */
+  public static String catalogPropertyConfigKey(String catalogName, String 
catalogProperty) {
+    return String.format("%s%s.%s", CATALOG_CONFIG_PREFIX, catalogName, 
catalogProperty);
+  }
+
   private static Schema schema(Configuration conf, String key) {
     String json = conf.get(key);
     return json == null ? null : SchemaParser.fromJson(json);
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
index 458affd..47e9f3e 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.mr.hive.serde.objectinspector.WriteObjectInspector;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
 import org.apache.iceberg.types.Type.PrimitiveType;
 import org.apache.iceberg.types.Types.ListType;
@@ -232,7 +233,7 @@ class Deserializer {
     FixNameMappingObjectInspectorPair(Schema schema, ObjectInspectorPair pair) 
{
       super(pair.writerInspector(), pair.sourceInspector());
 
-      this.sourceNameMap = new HashMap<>(schema.columns().size());
+      this.sourceNameMap = 
Maps.newHashMapWithExpectedSize(schema.columns().size());
 
       List<? extends StructField> fields = ((StructObjectInspector) 
sourceInspector()).getAllStructFieldRefs();
       for (int i = 0; i < schema.columns().size(); ++i) {
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 6fc8cc6..fc66890 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -116,7 +116,7 @@ public class HiveIcebergInputFormat extends 
MapredIcebergInputFormat<Record>
       IcebergSplit icebergSplit = ((IcebergSplitContainer) 
split).icebergSplit();
       // bogus cast for favouring code reuse over syntax
       return (RecordReader) HIVE_VECTORIZED_RECORDREADER_CTOR.newInstance(
-          new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>(),
+          new IcebergInputFormat<>(),
           icebergSplit,
           job,
           reporter);
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 9a3c54a..c7951ff 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -39,9 +39,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.mapred.Container;
@@ -78,15 +76,15 @@ public class HiveIcebergOutputFormat<T> implements 
OutputFormat<NullWritable, Co
     long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), 
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
             TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
     FileIO io = table.io();
-    LocationProvider location = table.locationProvider();
-    EncryptionManager encryption = table.encryption();
-    OutputFileFactory outputFileFactory =
-        new OutputFileFactory(spec, fileFormat, location, io, encryption, 
taskAttemptID.getTaskID().getId(),
-            taskAttemptID.getId(), 
jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
+    int partitionId = taskAttemptID.getTaskID().getId();
+    int taskId = taskAttemptID.getId();
+    String operationId = jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + 
taskAttemptID.getJobID();
+    OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, 
partitionId, taskId)
+        .format(fileFormat)
+        .operationId(operationId)
+        .build();
     String tableName = jc.get(Catalogs.NAME);
-    HiveIcebergRecordWriter writer = new HiveIcebergRecordWriter(schema, spec, 
fileFormat,
+    return new HiveIcebergRecordWriter(schema, spec, fileFormat,
         new GenericAppenderFactory(schema, spec), outputFileFactory, io, 
targetFileSize, taskAttemptID, tableName);
-
-    return writer;
   }
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 6bd4214..c215e48 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.mr.hive;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -56,6 +55,7 @@ import 
org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +70,7 @@ public class HiveIcebergSerDe extends AbstractSerDe {
   private ObjectInspector inspector;
   private Schema tableSchema;
   private Collection<String> partitionColumns;
-  private Map<ObjectInspector, Deserializer> deserializers = new HashMap<>(1);
+  private Map<ObjectInspector, Deserializer> deserializers = 
Maps.newHashMapWithExpectedSize(1);
   private Container<Record> row = new Container<>();
 
   @Override
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index c6525aa..fdba3df 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -66,6 +67,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
@@ -136,13 +138,14 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
     InputFormatConfig.InMemoryDataModel model = 
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
         InputFormatConfig.InMemoryDataModel.GENERIC);
     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) 
{
+      Table serializableTable = SerializableTable.copyOf(table);
       tasksIterable.forEach(task -> {
         if (applyResidual && (model == 
InputFormatConfig.InMemoryDataModel.HIVE ||
             model == InputFormatConfig.InMemoryDataModel.PIG)) {
           // TODO: We do not support residual evaluation for HIVE and PIG in 
memory data model yet
           checkResiduals(task);
         }
-        splits.add(new IcebergSplit(conf, task, table.io(), 
table.encryption()));
+        splits.add(new IcebergSplit(serializableTable, conf, task));
       });
     } catch (IOException e) {
       throw new UncheckedIOException(String.format("Failed to close table 
scan: %s", scan), e);
@@ -190,6 +193,7 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
     private TaskAttemptContext context;
     private Schema tableSchema;
     private Schema expectedSchema;
+    private String nameMapping;
     private boolean reuseContainers;
     private boolean caseSensitive;
     private InputFormatConfig.InMemoryDataModel inMemoryDataModel;
@@ -205,10 +209,12 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
       // For now IcebergInputFormat does its own split planning and does not 
accept FileSplit instances
       CombinedScanTask task = ((IcebergSplit) split).task();
       this.context = newContext;
-      this.io = ((IcebergSplit) split).io();
-      this.encryptionManager = ((IcebergSplit) split).encryptionManager();
+      Table table = ((IcebergSplit) split).table();
+      this.io = table.io();
+      this.encryptionManager = table.encryption();
       this.tasks = task.files().iterator();
       this.tableSchema = InputFormatConfig.tableSchema(conf);
+      this.nameMapping = 
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
       this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, 
InputFormatConfig.CASE_SENSITIVE_DEFAULT);
       this.expectedSchema = readSchema(conf, tableSchema, caseSensitive);
       this.reuseContainers = 
conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
@@ -333,6 +339,9 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
       if (reuseContainers) {
         avroReadBuilder.reuseContainers();
       }
+      if (nameMapping != null) {
+        
avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+      }
 
       switch (inMemoryDataModel) {
         case PIG:
@@ -357,6 +366,9 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
       if (reuseContainers) {
         parquetReadBuilder.reuseContainers();
       }
+      if (nameMapping != null) {
+        
parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+      }
 
       switch (inMemoryDataModel) {
         case PIG:
@@ -380,8 +392,8 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
       // ORC does not support reuse containers yet
       switch (inMemoryDataModel) {
         case PIG:
-          // TODO: implement value readers for Pig and Hive
-          throw new UnsupportedOperationException("ORC support not yet 
supported for Pig and Hive");
+          // TODO: implement value readers for Pig
+          throw new UnsupportedOperationException("ORC support not yet 
supported for Pig");
         case HIVE:
           if (MetastoreUtil.hive3PresentOnClasspath()) {
             orcIterator = HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile, 
task, idToConstant, context);
@@ -398,6 +410,10 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
           orcReadBuilder.createReaderFunc(
               fileSchema -> GenericOrcReader.buildReader(
                   readSchema, fileSchema, idToConstant));
+
+          if (nameMapping != null) {
+            
orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+          }
           orcIterator = orcReadBuilder.build();
       }
 
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
index 43b78d6..8bc332e 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
@@ -26,11 +26,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.util.SerializationUtil;
 
@@ -40,9 +37,8 @@ public class IcebergSplit extends InputSplit implements 
org.apache.hadoop.mapred
 
   public static final String[] ANYWHERE = new String[]{"*"};
 
+  private Table table;
   private CombinedScanTask task;
-  private FileIO io;
-  private EncryptionManager encryptionManager;
 
   private transient String[] locations;
   private transient Configuration conf;
@@ -51,11 +47,10 @@ public class IcebergSplit extends InputSplit implements 
org.apache.hadoop.mapred
   public IcebergSplit() {
   }
 
-  IcebergSplit(Configuration conf, CombinedScanTask task, FileIO io, 
EncryptionManager encryptionManager) {
+  IcebergSplit(Table table, Configuration conf, CombinedScanTask task) {
+    this.table = table;
     this.task = task;
     this.conf = conf;
-    this.io = io;
-    this.encryptionManager = encryptionManager;
   }
 
   public CombinedScanTask task() {
@@ -88,45 +83,27 @@ public class IcebergSplit extends InputSplit implements 
org.apache.hadoop.mapred
 
   @Override
   public void write(DataOutput out) throws IOException {
+    byte[] tableData = SerializationUtil.serializeToBytes(table);
+    out.writeInt(tableData.length);
+    out.write(tableData);
+
     byte[] data = SerializationUtil.serializeToBytes(this.task);
     out.writeInt(data.length);
     out.write(data);
-
-    byte[] ioData;
-    if (io instanceof HadoopFileIO) {
-      SerializableConfiguration serializableConf = new 
SerializableConfiguration(((HadoopFileIO) io).conf());
-      ioData = SerializationUtil.serializeToBytes(new 
HadoopFileIO(serializableConf::get));
-    } else {
-      ioData = SerializationUtil.serializeToBytes(io);
-    }
-    out.writeInt(ioData.length);
-    out.write(ioData);
-
-    byte[] encryptionManagerData = 
SerializationUtil.serializeToBytes(encryptionManager);
-    out.writeInt(encryptionManagerData.length);
-    out.write(encryptionManagerData);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    byte[] tableData = new byte[in.readInt()];
+    in.readFully(tableData);
+    this.table = SerializationUtil.deserializeFromBytes(tableData);
+
     byte[] data = new byte[in.readInt()];
     in.readFully(data);
     this.task = SerializationUtil.deserializeFromBytes(data);
-
-    byte[] ioData = new byte[in.readInt()];
-    in.readFully(ioData);
-    this.io = SerializationUtil.deserializeFromBytes(ioData);
-
-    byte[] encryptionManagerData = new byte[in.readInt()];
-    in.readFully(encryptionManagerData);
-    this.encryptionManager = 
SerializationUtil.deserializeFromBytes(encryptionManagerData);
-  }
-
-  public FileIO io() {
-    return io;
   }
 
-  public EncryptionManager encryptionManager() {
-    return encryptionManager;
+  public Table table() {
+    return table;
   }
 }
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
index 04168eb..9b3ee40 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
@@ -38,6 +39,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -191,91 +193,149 @@ public class TestCatalogs {
   }
 
   @Test
-  public void testLoadCatalog() throws IOException {
-    conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
-    Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent());
-
-    String nonExistentCatalogType = "fooType";
+  public void testLegacyLoadCatalogDefault() {
+    Optional<Catalog> defaultCatalog = Catalogs.loadCatalog(conf, null);
+    Assert.assertTrue(defaultCatalog.isPresent());
+    
Assertions.assertThat(defaultCatalog.get()).isInstanceOf(HiveCatalog.class);
+    Assert.assertTrue(Catalogs.hiveCatalog(conf, new Properties()));
+  }
 
-    conf.set(InputFormatConfig.CATALOG, nonExistentCatalogType);
-    AssertHelpers.assertThrows(
-            "should complain about catalog not supported", 
UnsupportedOperationException.class,
-            "Unknown catalog type", () -> Catalogs.loadCatalog(conf, null));
+  @Test
+  public void testLegacyLoadCatalogHive() {
+    conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
+    Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, null);
+    Assert.assertTrue(hiveCatalog.isPresent());
+    Assertions.assertThat(hiveCatalog.get()).isInstanceOf(HiveCatalog.class);
+    Assert.assertTrue(Catalogs.hiveCatalog(conf, new Properties()));
+  }
 
+  @Test
+  public void testLegacyLoadCatalogHadoop() {
     conf.set(InputFormatConfig.CATALOG, 
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
     conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, 
"/tmp/mylocation");
     Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, null);
-
     Assert.assertTrue(hadoopCatalog.isPresent());
-    Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
-
-    conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
-    Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, null);
+    
Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class);
+    Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties()));
+  }
 
-    Assert.assertTrue(hiveCatalog.isPresent());
-    Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog);
+  @Test
+  public void testLegacyLoadCatalogCustom() {
+    conf.set(InputFormatConfig.CATALOG_LOADER_CLASS, 
CustomHadoopCatalog.class.getName());
+    conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, 
"/tmp/mylocation");
+    Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf, null);
+    Assert.assertTrue(customHadoopCatalog.isPresent());
+    
Assertions.assertThat(customHadoopCatalog.get()).isInstanceOf(CustomHadoopCatalog.class);
+    Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties()));
+  }
 
+  @Test
+  public void testLegacyLoadCatalogLocation() {
     conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
     Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent());
+  }
 
-    // arbitrary catalog name with non existent catalog type
-    String catalogName = "barCatalog";
-    conf.unset(InputFormatConfig.CATALOG);
-    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName), nonExistentCatalogType);
+  @Test
+  public void testLegacyLoadCatalogUnknown() {
+    conf.set(InputFormatConfig.CATALOG, "fooType");
     AssertHelpers.assertThrows(
-            "should complain about catalog not supported", 
UnsupportedOperationException.class,
-            "Unknown catalog type:", () -> Catalogs.loadCatalog(conf, 
catalogName));
+        "should complain about catalog not supported", 
UnsupportedOperationException.class,
+        "Unknown catalog type", () -> Catalogs.loadCatalog(conf, null));
+  }
 
-    // arbitrary catalog name with hadoop catalog type and default warehouse 
location
-    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName),
-            CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
-    hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
+  @Test
+  public void testLoadCatalogDefault() {
+    String catalogName = "barCatalog";
+    Optional<Catalog> defaultCatalog = Catalogs.loadCatalog(conf, catalogName);
+    Assert.assertTrue(defaultCatalog.isPresent());
+    
Assertions.assertThat(defaultCatalog.get()).isInstanceOf(HiveCatalog.class);
+    Properties properties = new Properties();
+    properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+    Assert.assertTrue(Catalogs.hiveCatalog(conf, properties));
+  }
 
-    Assert.assertTrue(hadoopCatalog.isPresent());
-    Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
+  @Test
+  public void testLoadCatalogHive() {
+    String catalogName = "barCatalog";
+    conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
+        CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
+    Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, catalogName);
+    Assert.assertTrue(hiveCatalog.isPresent());
+    Assertions.assertThat(hiveCatalog.get()).isInstanceOf(HiveCatalog.class);
+    Properties properties = new Properties();
+    properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+    Assert.assertTrue(Catalogs.hiveCatalog(conf, properties));
+  }
 
-    // arbitrary catalog name with hadoop catalog type and provided warehouse 
location
-    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName),
-            CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
-    conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, 
catalogName), "/tmp/mylocation");
-    hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
+  @Test
+  public void testLegacyLoadCustomCatalogWithHiveCatalogTypeSet() {
+    String catalogName = "barCatalog";
+    conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
+            CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
+    conf.set(InputFormatConfig.CATALOG_LOADER_CLASS, 
CustomHadoopCatalog.class.getName());
+    conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, 
"/tmp/mylocation");
+    AssertHelpers.assertThrows("Should complain about both configs being set", 
IllegalArgumentException.class,
+            "both type and catalog-impl are set", () -> 
Catalogs.loadCatalog(conf, catalogName));
+  }
 
+  @Test
+  public void testLoadCatalogHadoop() {
+    String catalogName = "barCatalog";
+    conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
+        CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, 
CatalogProperties.WAREHOUSE_LOCATION),
+        "/tmp/mylocation");
+    Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
     Assert.assertTrue(hadoopCatalog.isPresent());
-    Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
+    
Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class);
     Assert.assertEquals("HadoopCatalog{name=barCatalog, 
location=/tmp/mylocation}", hadoopCatalog.get().toString());
+    Properties properties = new Properties();
+    properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+    Assert.assertFalse(Catalogs.hiveCatalog(conf, properties));
+  }
 
-    // arbitrary catalog name with hive catalog type
-    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName),
-            CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
-    hiveCatalog = Catalogs.loadCatalog(conf, catalogName);
-
-    Assert.assertTrue(hiveCatalog.isPresent());
-    Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog);
-
-    // arbitrary catalog name with custom catalog type without specific 
classloader
-    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName), "custom");
-    AssertHelpers.assertThrows(
-            "should complain about catalog not supported", 
UnsupportedOperationException.class,
-            "Unknown catalog type:", () -> Catalogs.loadCatalog(conf, 
catalogName));
+  @Test
+  public void testLoadCatalogHadoopWithLegacyWarehouseLocation() {
+    String catalogName = "barCatalog";
+    conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
+        CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, 
"/tmp/mylocation");
+    Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
+    Assert.assertTrue(hadoopCatalog.isPresent());
+    
Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class);
+    Assert.assertEquals("HadoopCatalog{name=barCatalog, 
location=/tmp/mylocation}", hadoopCatalog.get().toString());
+    Properties properties = new Properties();
+    properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+    Assert.assertFalse(Catalogs.hiveCatalog(conf, properties));
+  }
 
-    // arbitrary catalog name with custom catalog type and provided classloader
-    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName), "custom");
-    conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, 
catalogName), CustomHadoopCatalog.class.getName());
+  @Test
+  public void testLoadCatalogCustom() {
+    String catalogName = "barCatalog";
+    conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, 
CatalogProperties.CATALOG_IMPL),
+        CustomHadoopCatalog.class.getName());
+    conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, 
CatalogProperties.WAREHOUSE_LOCATION),
+        "/tmp/mylocation");
     Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf, 
catalogName);
-
     Assert.assertTrue(customHadoopCatalog.isPresent());
-    Assert.assertTrue(customHadoopCatalog.get() instanceof 
CustomHadoopCatalog);
-
-    // arbitrary catalog name with location catalog type
-    conf.unset(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName));
-    Assert.assertFalse(Catalogs.loadCatalog(conf, catalogName).isPresent());
+    
Assertions.assertThat(customHadoopCatalog.get()).isInstanceOf(CustomHadoopCatalog.class);
+    Properties properties = new Properties();
+    properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+    Assert.assertFalse(Catalogs.hiveCatalog(conf, properties));
+  }
 
-    // default catalog configuration
-    conf.unset(InputFormatConfig.CATALOG);
-    hiveCatalog = Catalogs.loadCatalog(conf, null);
+  @Test
+  public void testLoadCatalogLocation() {
+    Assert.assertFalse(Catalogs.loadCatalog(conf, 
Catalogs.ICEBERG_HADOOP_TABLE_NAME).isPresent());
+  }
 
-    Assert.assertTrue(hiveCatalog.isPresent());
-    Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog);
+  @Test
+  public void testLoadCatalogUnknown() {
+    String catalogName = "barCatalog";
+    conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, 
CatalogUtil.ICEBERG_CATALOG_TYPE), "fooType");
+    AssertHelpers.assertThrows(
+        "should complain about catalog not supported", 
UnsupportedOperationException.class,
+        "Unknown catalog type:", () -> Catalogs.loadCatalog(conf, 
catalogName));
   }
 
   public static class CustomHadoopCatalog extends HadoopCatalog {
@@ -291,9 +351,10 @@ public class TestCatalogs {
   }
 
   private void setCustomCatalogProperties(String catalogName, String 
warehouseLocation) {
-    conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, 
catalogName), warehouseLocation);
-    conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, 
catalogName), CustomHadoopCatalog.class.getName());
-    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName), "custom");
+    conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, 
CatalogProperties.WAREHOUSE_LOCATION),
+        warehouseLocation);
+    conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, 
CatalogProperties.CATALOG_IMPL),
+        CustomHadoopCatalog.class.getName());
     conf.set(InputFormatConfig.CATALOG_NAME, catalogName);
   }
 }
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
index 6f3cbee..bdc1014 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
@@ -351,10 +352,10 @@ public class TestIcebergInputFormats {
     String warehouseLocation = 
temp.newFolder("hadoop_catalog").getAbsolutePath();
     conf.set("warehouse.location", warehouseLocation);
     conf.set(InputFormatConfig.CATALOG_NAME, 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);
-    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME),
-            CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
-    conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME),
-            warehouseLocation);
+    
conf.set(InputFormatConfig.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME,
+        CatalogUtil.ICEBERG_CATALOG_TYPE), 
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    
conf.set(InputFormatConfig.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME,
+        CatalogProperties.WAREHOUSE_LOCATION), warehouseLocation);
 
     Catalog catalog = new HadoopCatalog(conf, conf.get("warehouse.location"));
     TableIdentifier identifier = TableIdentifier.of("db", "t");
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
index ef964f8..2ba4e50 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.data.DeleteReadTests;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
@@ -104,6 +105,7 @@ public class TestInputFormatReaderDeletes extends 
DeleteReadTests {
         .filter(recordFactory -> recordFactory.name().equals(inputFormat))
         .map(recordFactory -> 
recordFactory.create(builder.project(projected).conf()).getRecords())
         .flatMap(List::stream)
+        .map(record -> new 
InternalRecordWrapper(projected.asStruct()).wrap(record))
         .collect(Collectors.toList())
     );
 
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
index 72a04a4..14760ee 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -45,9 +46,10 @@ public class TestHiveIcebergCTAS extends 
HiveIcebergStorageHandlerWithEngineBase
         HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 
TableIdentifier.of("default", "source"), false));
 
     shell.executeStatement(String.format(
-        "CREATE TABLE target STORED BY ICEBERG %s TBLPROPERTIES ('%s'='%s') AS 
SELECT * FROM source",
+        "CREATE TABLE target STORED BY ICEBERG %s %s AS SELECT * FROM source",
         testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
"target")),
-        TableProperties.DEFAULT_FILE_FORMAT, fileFormat));
+        testTables.propertiesForCreateTableSQL(
+            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, 
fileFormat.toString()))));
 
     List<Object[]> objects = shell.executeStatement("SELECT * FROM target 
ORDER BY id");
     
HiveIcebergTestUtils.validateData(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS,
@@ -64,8 +66,9 @@ public class TestHiveIcebergCTAS extends 
HiveIcebergStorageHandlerWithEngineBase
 
     shell.executeStatement(String.format(
         "CREATE TABLE target PARTITIONED BY (dept, name) " +
-            "STORED BY ICEBERG TBLPROPERTIES ('%s'='%s') AS SELECT * FROM 
source",
-        TableProperties.DEFAULT_FILE_FORMAT, fileFormat));
+            "STORED BY ICEBERG %s AS SELECT * FROM source",
+        testTables.propertiesForCreateTableSQL(
+            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, 
fileFormat.toString()))));
 
     // check table can be read back correctly
     List<Object[]> objects = shell.executeStatement("SELECT id, name, dept 
FROM target ORDER BY id");
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 3143d32..5222361 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.mr.TestHelper;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -305,10 +306,11 @@ public class TestHiveIcebergInserts extends 
HiveIcebergStorageHandlerWithEngineB
 
     // create Iceberg table without specifying a write format in the tbl 
properties
     // it should fall back to using the default file format
-    shell.executeStatement(String.format("CREATE EXTERNAL TABLE %s (id bigint, 
name string) STORED BY '%s' %s",
+    shell.executeStatement(String.format("CREATE EXTERNAL TABLE %s (id bigint, 
name string) STORED BY '%s' %s %s",
         identifier,
         HiveIcebergStorageHandler.class.getName(),
-        testTables.locationForCreateTableSQL(identifier)));
+        testTables.locationForCreateTableSQL(identifier),
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of())));
 
     shell.executeStatement(String.format("INSERT INTO %s VALUES (10, 
'Linda')", identifier));
     List<Object[]> results = shell.executeStatement(String.format("SELECT * 
FROM %s", identifier));
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergMigration.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergMigration.java
index 205f9c0..c8de0d1 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergMigration.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergMigration.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -51,7 +52,8 @@ public class TestHiveIcebergMigration extends 
HiveIcebergStorageHandlerWithEngin
     Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == 
FileFormat.PARQUET);
     String tableName = "tbl";
     String createQuery = "CREATE EXTERNAL TABLE " +  tableName + " (a int) 
STORED AS " + fileFormat.name() + " " +
-        testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName));
+        testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of());
     shell.executeStatement(createQuery);
     shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2), 
(3)");
     validateMigration(tableName);
@@ -62,7 +64,8 @@ public class TestHiveIcebergMigration extends 
HiveIcebergStorageHandlerWithEngin
     Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == 
FileFormat.PARQUET);
     String tableName = "tbl_part";
     shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) 
PARTITIONED BY (b string) STORED AS " +
-        fileFormat.name() + " " + 
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)));
+        fileFormat.name() + " " + 
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa') 
VALUES (1), (2), (3)");
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb') 
VALUES (4), (5)");
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc') 
VALUES (6)");
@@ -76,7 +79,8 @@ public class TestHiveIcebergMigration extends 
HiveIcebergStorageHandlerWithEngin
     String tableName = "tbl_part_bucketed";
     shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) 
PARTITIONED BY (b string) clustered by " +
         "(a) INTO 2 BUCKETS STORED AS " + fileFormat.name() + " " +
-        testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)));
+        testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa') 
VALUES (1), (2), (3)");
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb') 
VALUES (4), (5)");
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc') 
VALUES (6)");
@@ -89,7 +93,8 @@ public class TestHiveIcebergMigration extends 
HiveIcebergStorageHandlerWithEngin
     Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == 
FileFormat.PARQUET);
     String tableName = "tbl_rollback";
     shell.executeStatement("CREATE EXTERNAL TABLE " +  tableName + " (a int) 
STORED AS " + fileFormat.name() + " " +
-        testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)));
+        testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
     shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2), 
(3)");
     validateMigrationRollback(tableName);
   }
@@ -99,7 +104,8 @@ public class TestHiveIcebergMigration extends 
HiveIcebergStorageHandlerWithEngin
     Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == 
FileFormat.PARQUET);
     String tableName = "tbl_rollback";
     shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) 
PARTITIONED BY (b string) STORED AS " +
-        fileFormat.name() + " " + 
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)));
+        fileFormat.name() + " " + 
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa') 
VALUES (1), (2), (3)");
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb') 
VALUES (4), (5)");
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc') 
VALUES (6)");
@@ -113,7 +119,8 @@ public class TestHiveIcebergMigration extends 
HiveIcebergStorageHandlerWithEngin
     String tableName = "tbl_rollback";
     shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) 
PARTITIONED BY (b string, c int) " +
         "STORED AS " + fileFormat.name() + " " +
-        testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)));
+        testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa', 
c='111') VALUES (1), (2), (3)");
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb', 
c='111') VALUES (4), (5)");
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa', 
c='222') VALUES (6)");
@@ -127,7 +134,8 @@ public class TestHiveIcebergMigration extends 
HiveIcebergStorageHandlerWithEngin
     String tableName = "tbl_part_bucketed";
     shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) 
PARTITIONED BY (b string) clustered by " +
         "(a) INTO 2 BUCKETS STORED AS " + fileFormat.name() + " " +
-        testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)));
+        testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa') 
VALUES (1), (2), (3)");
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb') 
VALUES (4), (5)");
     shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc') 
VALUES (6)");
@@ -144,7 +152,8 @@ public class TestHiveIcebergMigration extends 
HiveIcebergStorageHandlerWithEngin
     List<String> formats = ImmutableList.of("TEXTFILE", "JSONFILE", "RCFILE", 
"SEQUENCEFILE");
     formats.forEach(format -> {
       shell.executeStatement("CREATE EXTERNAL TABLE " +  tableName + " (a int) 
STORED AS " + format + " " +
-          testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)));
+          testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)) +
+          testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
       shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2), 
(3)");
       AssertHelpers.assertThrows("Migrating a " + format + " table to Iceberg 
should have thrown an exception.",
           IllegalArgumentException.class, "Cannot convert hive table to 
iceberg with input format: ",
@@ -161,7 +170,8 @@ public class TestHiveIcebergMigration extends 
HiveIcebergStorageHandlerWithEngin
         testTableType == TestTables.TestTableType.HIVE_CATALOG);
     String tableName = "tbl_unsupported";
     shell.executeStatement("CREATE MANAGED TABLE " +  tableName + " (a int) 
STORED AS " + fileFormat + " " +
-        testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)));
+        testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
tableName)) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
     shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2), 
(3)");
     AssertHelpers.assertThrows("Migrating a managed table to Iceberg should 
have thrown an exception.",
         IllegalArgumentException.class, "Converting non-external, temporary or 
transactional hive table to iceberg",
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index af8c383..7b4dce5 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -41,10 +41,8 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
@@ -266,18 +264,20 @@ public class TestHiveIcebergOutputCommitter {
 
     Table table = HiveIcebergStorageHandler.table(conf, name);
     FileIO io = table.io();
-    LocationProvider location = table.locationProvider();
-    EncryptionManager encryption = table.encryption();
     Schema schema = HiveIcebergStorageHandler.schema(conf);
     PartitionSpec spec = table.spec();
 
     for (int i = 0; i < taskNum; ++i) {
       List<Record> records = TestHelper.generateRandomRecords(schema, 
RECORD_NUM, i + attemptNum);
       TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), 
JOB_ID.getId(), TaskType.MAP, i, attemptNum);
-      OutputFileFactory outputFileFactory =
-          new OutputFileFactory(spec, FileFormat.PARQUET, location, io, 
encryption, taskId.getTaskID().getId(),
-              attemptNum, QUERY_ID + "-" + JOB_ID);
-      HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, 
spec, FileFormat.PARQUET,
+      int partitionId = taskId.getTaskID().getId();
+      String operationId = QUERY_ID + "-" + JOB_ID;
+      FileFormat fileFormat = FileFormat.PARQUET;
+      OutputFileFactory outputFileFactory = 
OutputFileFactory.builderFor(table, partitionId, attemptNum)
+          .format(fileFormat)
+          .operationId(operationId)
+          .build();
+      HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, 
spec, fileFormat,
           new GenericAppenderFactory(schema), outputFileFactory, io, 
TARGET_FILE_SIZE,
           TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME));
 
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
index da21af3..84e8b57 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
@@ -26,9 +26,8 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.mr.Catalogs;
-import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
@@ -171,9 +170,9 @@ public class TestHiveIcebergSelects extends 
HiveIcebergStorageHandlerWithEngineB
     // note: the Chinese character seems to be accepted in the column name, 
but not
     // in the table name - this is the case for both Iceberg and standard Hive 
tables.
     shell.executeStatement(String.format(
-        "CREATE TABLE `%s` (id bigint, `dep,! 是,t` string) STORED BY ICEBERG 
STORED AS %s %s TBLPROPERTIES ('%s'='%s')",
+        "CREATE TABLE `%s` (id bigint, `dep,! 是,t` string) STORED BY ICEBERG 
STORED AS %s %s %s",
         table.name(), fileFormat, testTables.locationForCreateTableSQL(table),
-        InputFormatConfig.CATALOG_NAME, 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME));
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of())));
     shell.executeStatement(String.format("INSERT INTO `%s` VALUES (1, 'moon'), 
(2, 'star')", table.name()));
 
     List<Object[]> result = shell.executeStatement(String.format(
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
index d601100..4dca587 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hive.HiveTableOperations;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -147,9 +148,10 @@ public class TestHiveIcebergStatistics extends 
HiveIcebergStorageHandlerWithEngi
 
     shell.setHiveSessionValue(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname, 
true);
     shell.executeStatement(String.format(
-        "CREATE TABLE target STORED BY ICEBERG %s TBLPROPERTIES ('%s'='%s') AS 
SELECT * FROM source",
+        "CREATE TABLE target STORED BY ICEBERG %s %s AS SELECT * FROM source",
         testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
"target")),
-        TableProperties.DEFAULT_FILE_FORMAT, fileFormat));
+        testTables.propertiesForCreateTableSQL(
+            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, 
fileFormat.toString()))));
 
     checkColStat("target", "id", true);
     checkColStatMinMaxValue("target", "id", 0, 2);
@@ -165,9 +167,9 @@ public class TestHiveIcebergStatistics extends 
HiveIcebergStorageHandlerWithEngi
 
     shell.setHiveSessionValue(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname, 
true);
     shell.executeStatement(String.format(
-        "CREATE TABLE target PARTITIONED BY (dept, name) " +
-            "STORED BY ICEBERG TBLPROPERTIES ('%s'='%s') AS SELECT * FROM 
source s",
-        TableProperties.DEFAULT_FILE_FORMAT, fileFormat));
+        "CREATE TABLE target PARTITIONED BY (dept, name) STORED BY ICEBERG %s 
AS SELECT * FROM source s",
+        testTables.propertiesForCreateTableSQL(
+            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, 
fileFormat.toString()))));
 
     checkColStat("target", "id", true);
     checkColStat("target", "dept", true);
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
index dcadacf..218114b 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
@@ -36,7 +36,6 @@ import org.apache.iceberg.TestHelpers.Row;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -233,7 +232,7 @@ public class TestHiveIcebergStorageHandlerLocalScan {
         "TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + 
PartitionSpecParser.toJson(spec) + "', " +
         "'" + InputFormatConfig.TABLE_SCHEMA + "'='" +
         
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " 
 +
-        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')";
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
testTables.catalogName() + "')";
     runCreateAndReadTest(identifier, createSql, 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, data);
   }
 
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index ec0cd4e..fc90722 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -175,7 +175,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
         testTables.locationForCreateTableSQL(identifier) +
         " TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
         SchemaParser.toJson(schema) + "', " +
-        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
testTables.catalogName() + "')");
 
     shell.executeStatement("ALTER TABLE " + identifier + " SET PARTITION SPEC 
(month(ts))");
 
@@ -219,7 +219,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
         testTables.locationForCreateTableSQL(identifier) +
         "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
         SchemaParser.toJson(schema) + "', " +
-        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
testTables.catalogName() + "')");
 
     PartitionSpec spec = PartitionSpec.builderFor(schema)
         .year("year_field")
@@ -280,7 +280,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
         testTables.locationForCreateTableSQL(identifier) +
         " TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
         SchemaParser.toJson(schema) + "', " +
-        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
testTables.catalogName() + "')");
     Table table = testTables.loadTable(identifier);
     Assert.assertEquals(spec, table.spec());
   }
@@ -297,7 +297,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
         "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
         PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "', " +
         "'dummy'='test', " +
-        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
testTables.catalogName() + "')");
 
     // Check the Iceberg table data
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -377,7 +377,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
         "STRING) STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')",
         testTables.locationForCreateTableSQL(identifier),
         InputFormatConfig.CATALOG_NAME,
-        Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);
+        testTables.catalogName());
     shell.executeStatement(query);
     Assert.assertNotNull(testTables.loadTable(identifier));
   }
@@ -391,7 +391,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
         "orc",
         testTables.locationForCreateTableSQL(identifier),
         InputFormatConfig.CATALOG_NAME,
-        Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);
+        testTables.catalogName());
     shell.executeStatement(query);
     Table table = testTables.loadTable(identifier);
     Assert.assertNotNull(table);
@@ -407,7 +407,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
         testTables.locationForCreateTableSQL(identifier) +
         "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
         
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','" 
+
-        InputFormatConfig.CATALOG_NAME + "'='" + 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+        InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + 
"')");
 
     // Check the Iceberg table partition data
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -425,7 +425,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
         
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " 
+
         "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
         PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "', " +
-        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
testTables.catalogName() + "')");
 
     // Check the Iceberg table partition data
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -442,7 +442,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
         "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
         
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " 
+
         "'" + InputFormatConfig.EXTERNAL_TABLE_PURGE + "'='FALSE', " +
-        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + 
testTables.catalogName() + "')");
 
     org.apache.hadoop.hive.metastore.api.Table hmsTable = 
shell.metastore().getTable("default", "customers");
     Properties tableProperties = new Properties();
@@ -514,7 +514,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
               "STORED BY ICEBERG " +
               testTables.locationForCreateTableSQL(identifier) +
               "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + 
"'='WrongSchema'" +
-              ",'" + InputFormatConfig.CATALOG_NAME + "'='" + 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+              ",'" + InputFormatConfig.CATALOG_NAME + "'='" + 
testTables.catalogName() + "')");
         }
     );
 
@@ -536,7 +536,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
                 "STORED BY ICEBERG " +
                 "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
                 
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','" 
+
-                InputFormatConfig.CATALOG_NAME + "'='" + 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+                InputFormatConfig.CATALOG_NAME + "'='" + 
testTables.catalogName() + "')");
           }
       );
     }
@@ -556,7 +556,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
                 "STORED BY ICEBERG " +
                 "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
                 
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "',' 
" +
-                InputFormatConfig.CATALOG_NAME + "'='" + 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+                InputFormatConfig.CATALOG_NAME + "'='" + 
testTables.catalogName() + "')");
           }
       );
     } else {
@@ -579,8 +579,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
               "PARTITIONED BY (first_name STRING) " +
               "STORED BY ICEBERG " +
               
testTables.locationForCreateTableSQL(TableIdentifier.of("default", 
"customers")) +
-              " TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" +
-              PartitionSpecParser.toJson(spec) + "')");
+              testTables.propertiesForCreateTableSQL(
+                      ImmutableMap.of(InputFormatConfig.PARTITION_SPEC, 
PartitionSpecParser.toJson(spec))));
         }
     );
   }
@@ -791,7 +791,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
         InputFormatConfig.TABLE_SCHEMA, 
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA),
         InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(SPEC),
         "custom_property", "initial_val",
-        InputFormatConfig.CATALOG_NAME, 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME));
+        InputFormatConfig.CATALOG_NAME, testTables.catalogName()));
 
 
     // Check the Iceberg table parameters
@@ -985,7 +985,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
     TableIdentifier identifier = TableIdentifier.of("default", "customers");
     shell.executeStatement("CREATE EXTERNAL TABLE customers (id int, name 
string) STORED BY ICEBERG " +
         testTables.locationForCreateTableSQL(identifier) + " TBLPROPERTIES ('" 
+
-        InputFormatConfig.CATALOG_NAME + "'='" + 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "', " +
+        InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + 
"', " +
         "'" + TableProperties.FORMAT_VERSION + "'='2')");
 
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
index 2a0e33a..f32fb10 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
@@ -118,9 +118,9 @@ public class 
TestHiveIcebergStorageHandlerWithMultipleCatalogs {
 
   @Test
   public void testJoinTablesFromDifferentCatalogs() throws IOException {
-    createAndAddRecords(testTables1, fileFormat1, 
TableIdentifier.of("default", "customers1"), table1CatalogName,
+    createAndAddRecords(testTables1, fileFormat1, 
TableIdentifier.of("default", "customers1"),
         HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
-    createAndAddRecords(testTables2, fileFormat2, 
TableIdentifier.of("default", "customers2"), table2CatalogName,
+    createAndAddRecords(testTables2, fileFormat2, 
TableIdentifier.of("default", "customers2"),
         HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
 
     List<Object[]> rows = shell.executeStatement("SELECT c2.customer_id, 
c2.first_name, c2.last_name " +
@@ -164,14 +164,14 @@ public class 
TestHiveIcebergStorageHandlerWithMultipleCatalogs {
   }
 
   private void createAndAddRecords(TestTables testTables, FileFormat 
fileFormat, TableIdentifier identifier,
-                                   String catalogName, List<Record> records) 
throws IOException {
+                                   List<Record> records) throws IOException {
     String createSql = String.format(
         "CREATE EXTERNAL TABLE %s (customer_id BIGINT, first_name STRING, 
last_name STRING)" +
         " STORED BY ICEBERG %s " +
         " TBLPROPERTIES ('%s'='%s', '%s'='%s')",
         identifier,
         testTables.locationForCreateTableSQL(identifier),
-        InputFormatConfig.CATALOG_NAME, catalogName,
+        InputFormatConfig.CATALOG_NAME, testTables.catalogName(),
         TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
     shell.executeStatement(createSql);
     Table icebergTable = testTables.loadTable(identifier);
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 7cc9848..5a6f38c 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
@@ -50,6 +51,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.TestCatalogs;
 import org.apache.iceberg.mr.TestHelper;
@@ -98,6 +100,10 @@ abstract class TestTables {
     return tables;
   }
 
+  public String catalogName() {
+    return catalog;
+  }
+
   /**
    * The location string needed to be provided for CREATE TABLE ... commands,
    * like "LOCATION 'file:///tmp/warehouse/default/tablename'. Empty ("") if 
LOCATION is not needed.
@@ -108,8 +114,8 @@ abstract class TestTables {
 
   /**
    * The table properties string needed for the CREATE TABLE ... commands,
-   * like "TBLPROPERTIES('iceberg.catalog'='mycatalog')
-   * @return
+   * like {@code TBLPROPERTIES('iceberg.catalog'='mycatalog')}
+   * @return the tables properties string, such as {@code 
TBLPROPERTIES('iceberg.catalog'='mycatalog')}
    */
   public String propertiesForCreateTableSQL(Map<String, String> 
tableProperties) {
     Map<String, String> properties = new HashMap<>(tableProperties);
@@ -371,10 +377,9 @@ abstract class TestTables {
     @Override
     public Map<String, String> properties() {
       return ImmutableMap.of(
-              String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog), 
"custom",
-              String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalog),
+              InputFormatConfig.catalogPropertyConfigKey(catalog, 
CatalogProperties.CATALOG_IMPL),
               TestCatalogs.CustomHadoopCatalog.class.getName(),
-              String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, 
catalog),
+              InputFormatConfig.catalogPropertyConfigKey(catalog, 
CatalogProperties.WAREHOUSE_LOCATION),
               warehouseLocation
       );
     }
@@ -403,8 +408,10 @@ abstract class TestTables {
     @Override
     public Map<String, String> properties() {
       return ImmutableMap.of(
-              String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog), 
"hadoop",
-              String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, 
catalog), warehouseLocation
+              InputFormatConfig.catalogPropertyConfigKey(catalog, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
+              CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
+              InputFormatConfig.catalogPropertyConfigKey(catalog, 
CatalogProperties.WAREHOUSE_LOCATION),
+              warehouseLocation
       );
     }
 
@@ -414,8 +421,8 @@ abstract class TestTables {
   }
 
   static class HadoopTestTables extends TestTables {
-    HadoopTestTables(Configuration conf, TemporaryFolder temp, String 
catalogName) {
-      super(new HadoopTables(conf), temp, catalogName);
+    HadoopTestTables(Configuration conf, TemporaryFolder temp) {
+      super(new HadoopTables(conf), temp, Catalogs.ICEBERG_HADOOP_TABLE_NAME);
     }
 
     @Override
@@ -454,7 +461,8 @@ abstract class TestTables {
 
     @Override
     public Map<String, String> properties() {
-      return 
ImmutableMap.of(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalog), "hive");
+      return 
ImmutableMap.of(InputFormatConfig.catalogPropertyConfigKey(catalog, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
+              CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
     }
 
     @Override
@@ -489,7 +497,7 @@ abstract class TestTables {
   enum TestTableType {
     HADOOP_TABLE {
       public TestTables instance(Configuration conf, TemporaryFolder 
temporaryFolder, String catalogName) {
-        return new HadoopTestTables(conf, temporaryFolder, catalogName);
+        return new HadoopTestTables(conf, temporaryFolder);
       }
     },
     HADOOP_CATALOG {
diff --git a/iceberg/patched-iceberg-core/pom.xml 
b/iceberg/patched-iceberg-core/pom.xml
index c2d2d38..507818c 100644
--- a/iceberg/patched-iceberg-core/pom.xml
+++ b/iceberg/patched-iceberg-core/pom.xml
@@ -48,6 +48,11 @@
             <groupId>org.apache.iceberg</groupId>
             <artifactId>patched-iceberg-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <optional>true</optional>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
@@ -71,14 +76,11 @@
                                     <overWrite>true</overWrite>
                                     
<outputDirectory>${project.build.directory}/classes</outputDirectory>
                                     <excludes>
-                                        **/SnapshotProducer.class
-                                        **/SnapshotSummary.class
-                                        **/BaseMeta.class
-                                        **/BaseMetadataTable.class
-                                        **/StaticTableOperations.class
-                                        **/CatalogProperties.class
-                                        **/BaseMetastoreCatalog.class
-                                        **/BaseMetastoreTableOperations.class
+                                        **/JdbcClientPool.class
+                                        **/JdbcUtil.class
+                                        **/CatalogUtil.class
+                                        **/ClientPool.class
+                                        **/ClientPoolImpl.class
                                     </excludes>
                                 </artifactItem>
                             </artifactItems>
diff --git 
a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/CatalogUtil.java
 
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/CatalogUtil.java
new file mode 100644
index 0000000..bada6fd
--- /dev/null
+++ 
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.GC_ENABLED;
+import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
+
+public class CatalogUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogUtil.class);
+
+  /**
+   * Shortcut catalog property to load a catalog implementation through a 
short type name,
+   * instead of specifying a full java class through {@link 
CatalogProperties#CATALOG_IMPL}.
+   * Currently the following type to implementation mappings are supported:
+   * <ul>
+   *   <li>hive: org.apache.iceberg.hive.HiveCatalog</li>
+   *   <li>hadoop: org.apache.iceberg.hadoop.HadoopCatalog</li>
+   * </ul>
+   */
+  public static final String ICEBERG_CATALOG_TYPE = "type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+  public static final String ICEBERG_CATALOG_HIVE = 
"org.apache.iceberg.hive.HiveCatalog";
+  public static final String ICEBERG_CATALOG_HADOOP = 
"org.apache.iceberg.hadoop.HadoopCatalog";
+
+  private CatalogUtil() {
+  }
+
+  /**
+   * Drops all data and metadata files referenced by TableMetadata.
+   * <p>
+   * This should be called by dropTable implementations to clean up table 
files once the table has been dropped in the
+   * metastore.
+   *
+   * @param io a FileIO to use for deletes
+   * @param metadata the last valid TableMetadata instance for a dropped table.
+   */
+  public static void dropTableData(FileIO io, TableMetadata metadata) {
+    // Reads and deletes are done using 
Tasks.foreach(...).suppressFailureWhenFinished to complete
+    // as much of the delete work as possible and avoid orphaned data or 
manifest files.
+
+    Set<String> manifestListsToDelete = Sets.newHashSet();
+    Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
+    for (Snapshot snapshot : metadata.snapshots()) {
+      // add all manifests to the delete set because both data and delete 
files should be removed
+      Iterables.addAll(manifestsToDelete, snapshot.allManifests());
+      // add the manifest list to the delete set, if present
+      if (snapshot.manifestListLocation() != null) {
+        manifestListsToDelete.add(snapshot.manifestListLocation());
+      }
+    }
+
+    LOG.info("Manifests to delete: {}", Joiner.on(", 
").join(manifestsToDelete));
+
+    // run all of the deletes
+
+    boolean gcEnabled = PropertyUtil.propertyAsBoolean(metadata.properties(), 
GC_ENABLED, GC_ENABLED_DEFAULT);
+
+    if (gcEnabled) {
+      // delete data files only if we are sure this won't corrupt other tables
+      deleteFiles(io, manifestsToDelete);
+    }
+
+    Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path))
+        .noRetry().suppressFailureWhenFinished()
+        .onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest: 
{}", manifest, exc))
+        .run(io::deleteFile);
+
+    Tasks.foreach(manifestListsToDelete)
+        .noRetry().suppressFailureWhenFinished()
+        .onFailure((list, exc) -> LOG.warn("Delete failed for manifest list: 
{}", list, exc))
+        .run(io::deleteFile);
+
+    Tasks.foreach(metadata.metadataFileLocation())
+        .noRetry().suppressFailureWhenFinished()
+        .onFailure((list, exc) -> LOG.warn("Delete failed for metadata file: 
{}", list, exc))
+        .run(io::deleteFile);
+  }
+
+  @SuppressWarnings("DangerousStringInternUsage")
+  private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
+    // keep track of deleted files in a map that can be cleaned up when memory 
runs low
+    Map<String, Boolean> deletedFiles = new MapMaker()
+        .concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE)
+        .weakKeys()
+        .makeMap();
+
+    Tasks.foreach(allManifests)
+        .noRetry().suppressFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this 
may cause orphaned data files", exc))
+        .run(manifest -> {
+          try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) {
+            for (ManifestEntry<?> entry : reader.entries()) {
+              // intern the file path because the weak key map uses identity 
(==) instead of equals
+              String path = entry.file().path().toString().intern();
+              Boolean alreadyDeleted = deletedFiles.putIfAbsent(path, true);
+              if (alreadyDeleted == null || !alreadyDeleted) {
+                try {
+                  io.deleteFile(path);
+                } catch (RuntimeException e) {
+                  // this may happen if the map of deleted files gets cleaned 
up by gc
+                  LOG.warn("Delete failed for data file: {}", path, e);
+                }
+              }
+            }
+          } catch (IOException e) {
+            throw new RuntimeIOException(e, "Failed to read manifest file: 
%s", manifest.path());
+          }
+        });
+  }
+
+  /**
+   * Load a custom catalog implementation.
+   * <p>
+   * The catalog must have a no-arg constructor.
+   * If the class implements {@link Configurable},
+   * a Hadoop config will be passed using {@link 
Configurable#setConf(Configuration)}.
+   * {@link Catalog#initialize(String catalogName, Map options)} is called to 
complete the initialization.
+   *
+   * @param impl catalog implementation full class name
+   * @param catalogName catalog name
+   * @param properties catalog properties
+   * @param hadoopConf hadoop configuration if needed
+   * @return initialized catalog object
+   * @throws IllegalArgumentException if no-arg constructor not found or error 
during initialization
+   */
+  public static Catalog loadCatalog(
+      String impl,
+      String catalogName,
+      Map<String, String> properties,
+      Configuration hadoopConf) {
+    Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog, impl 
class name is null");
+    DynConstructors.Ctor<Catalog> ctor;
+    try {
+      ctor = DynConstructors.builder(Catalog.class).impl(impl).buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Catalog implementation %s: %s", impl, 
e.getMessage()), e);
+    }
+
+    Catalog catalog;
+    try {
+      catalog = ctor.newInstance();
+
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format("Cannot initialize Catalog, %s does not implement 
Catalog.", impl), e);
+    }
+
+    if (catalog instanceof Configurable) {
+      ((Configurable) catalog).setConf(hadoopConf);
+    }
+
+    catalog.initialize(catalogName, properties);
+    return catalog;
+  }
+
+  /**
+   * Build an Iceberg {@link Catalog} based on a map of catalog properties and 
optional Hadoop configuration.
+   * <p>
+   * This method examines both the {@link #ICEBERG_CATALOG_TYPE} and {@link 
CatalogProperties#CATALOG_IMPL} properties
+   * to determine the catalog implementation to load.
+   * If nothing is specified for both properties, Hive catalog will be loaded 
by default.
+   *
+   * @param name catalog name
+   * @param options catalog properties
+   * @param conf Hadoop configuration
+   * @return initialized catalog
+   */
+  public static Catalog buildIcebergCatalog(String name, Map<String, String> 
options, Configuration conf) {
+    String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
+    if (catalogImpl == null) {
+      String catalogType = PropertyUtil.propertyAsString(options, 
ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
+      switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+        case ICEBERG_CATALOG_TYPE_HIVE:
+          catalogImpl = ICEBERG_CATALOG_HIVE;
+          break;
+        case ICEBERG_CATALOG_TYPE_HADOOP:
+          catalogImpl = ICEBERG_CATALOG_HADOOP;
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown catalog type: " + 
catalogType);
+      }
+    } else {
+      String catalogType = options.get(ICEBERG_CATALOG_TYPE);
+      Preconditions.checkArgument(catalogType == null,
+          "Cannot create catalog %s, both type and catalog-impl are set: 
type=%s, catalog-impl=%s",
+          name, catalogType, catalogImpl);
+    }
+
+    return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
+  }
+
+  /**
+   * Load a custom {@link FileIO} implementation.
+   * <p>
+   * The implementation must have a no-arg constructor.
+   * If the class implements {@link Configurable},
+   * a Hadoop config will be passed using {@link 
Configurable#setConf(Configuration)}.
+   * {@link FileIO#initialize(Map properties)} is called to complete the 
initialization.
+   *
+   * @param impl full class name of a custom FileIO implementation
+   * @param hadoopConf hadoop configuration
+   * @return FileIO class
+   * @throws IllegalArgumentException if class path not found or
+   *  right constructor not found or
+   *  the loaded class cannot be casted to the given interface type
+   */
+  public static FileIO loadFileIO(
+      String impl,
+      Map<String, String> properties,
+      Configuration hadoopConf) {
+    LOG.info("Loading custom FileIO implementation: {}", impl);
+    DynConstructors.Ctor<FileIO> ctor;
+    try {
+      ctor = DynConstructors.builder(FileIO.class).impl(impl).buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize FileIO, missing no-arg constructor: %s", impl), 
e);
+    }
+
+    FileIO fileIO;
+    try {
+      fileIO = ctor.newInstance();
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format("Cannot initialize FileIO, %s does not implement 
FileIO.", impl), e);
+    }
+
+    if (fileIO instanceof Configurable) {
+      ((Configurable) fileIO).setConf(hadoopConf);
+    }
+
+    fileIO.initialize(properties);
+    return fileIO;
+  }
+}
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPool.java 
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPool.java
similarity index 90%
rename from 
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPool.java
rename to 
iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPool.java
index 7f32cf9..117939a 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPool.java
+++ 
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPool.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.hive;
+package org.apache.iceberg;
 
 public interface ClientPool<C, E extends Exception> {
   interface Action<R, C, E extends Exception> {
@@ -25,4 +25,6 @@ public interface ClientPool<C, E extends Exception> {
   }
 
   <R> R run(Action<R, C, E> action) throws E, InterruptedException;
+
+  <R> R run(Action<R, C, E> action, boolean retry) throws E, 
InterruptedException;
 }
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
 
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
similarity index 90%
rename from 
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
rename to 
iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
index d1a44d3..18f7afa 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
+++ 
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
@@ -17,12 +17,11 @@
  * under the License.
  */
 
-package org.apache.iceberg.hive;
+package org.apache.iceberg;
 
 import java.io.Closeable;
 import java.util.ArrayDeque;
 import java.util.Deque;
-import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,25 +33,32 @@ public abstract class ClientPoolImpl<C, E extends 
Exception> implements Closeabl
   private final Deque<C> clients;
   private final Class<? extends E> reconnectExc;
   private final Object signal = new Object();
+  private final boolean retryByDefault;
   private volatile int currentSize;
   private boolean closed;
 
-  ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc) {
+  public ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc, boolean 
retryByDefault) {
     this.poolSize = poolSize;
     this.reconnectExc = reconnectExc;
     this.clients = new ArrayDeque<>(poolSize);
     this.currentSize = 0;
     this.closed = false;
+    this.retryByDefault = retryByDefault;
   }
 
   @Override
   public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
+    return run(action, retryByDefault);
+  }
+
+  @Override
+  public <R> R run(Action<R, C, E> action, boolean retry) throws E, 
InterruptedException {
     C client = get();
     try {
       return action.run(client);
 
     } catch (Exception exc) {
-      if (isConnectionException(exc)) {
+      if (retry && isConnectionException(exc)) {
         try {
           client = reconnect(client);
         } catch (Exception ignored) {
@@ -138,8 +144,7 @@ public abstract class ClientPoolImpl<C, E extends 
Exception> implements Closeabl
     }
   }
 
-  @VisibleForTesting
-  int poolSize() {
+  public int poolSize() {
     return poolSize;
   }
 }
diff --git 
a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
 
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
new file mode 100644
index 0000000..ba5edf5
--- /dev/null
+++ 
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ClientPoolImpl;
+
+class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
+
+  private final String dbUrl;
+  private final Map<String, String> properties;
+
+  JdbcClientPool(String dbUrl, Map<String, String> props) {
+    
this(Integer.parseInt(props.getOrDefault(CatalogProperties.CLIENT_POOL_SIZE,
+        String.valueOf(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT))), dbUrl, 
props);
+  }
+
+  JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
+    super(poolSize, SQLNonTransientConnectionException.class, true);
+    properties = props;
+    this.dbUrl = dbUrl;
+  }
+
+  @Override
+  protected Connection newClient() {
+    try {
+      Properties dbProps = JdbcUtil.filterAndRemovePrefix(properties, 
JdbcCatalog.PROPERTY_PREFIX);
+      return DriverManager.getConnection(dbUrl, dbProps);
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to connect: %s", dbUrl);
+    }
+  }
+
+  @Override
+  protected Connection reconnect(Connection client) {
+    close(client);
+    return newClient();
+  }
+
+  @Override
+  protected void close(Connection client) {
+    try {
+      client.close();
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to close connection");
+    }
+  }
+}
diff --git 
a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
 
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
new file mode 100644
index 0000000..0b55839
--- /dev/null
+++ 
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.jdbc;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+final class JdbcUtil {
+  protected static final String CATALOG_TABLE_NAME = "iceberg_tables";
+  protected static final String CATALOG_NAME = "catalog_name";
+  protected static final String TABLE_NAMESPACE = "table_namespace";
+  protected static final String TABLE_NAME = "table_name";
+  protected static final String METADATA_LOCATION = "metadata_location";
+  protected static final String PREVIOUS_METADATA_LOCATION = 
"previous_metadata_location";
+  public static final String DO_COMMIT_SQL = "UPDATE " + CATALOG_TABLE_NAME +
+      " SET " + METADATA_LOCATION + " = ? , " + PREVIOUS_METADATA_LOCATION + " 
= ? " +
+      " WHERE " + CATALOG_NAME + " = ? AND " +
+      TABLE_NAMESPACE + " = ? AND " +
+      TABLE_NAME + " = ? AND " +
+      METADATA_LOCATION + " = ?";
+  protected static final String CREATE_CATALOG_TABLE =
+      "CREATE TABLE " + CATALOG_TABLE_NAME +
+          "(" +
+          CATALOG_NAME + " VARCHAR(255) NOT NULL," +
+          TABLE_NAMESPACE + " VARCHAR(255) NOT NULL," +
+          TABLE_NAME + " VARCHAR(255) NOT NULL," +
+          METADATA_LOCATION + " VARCHAR(5500)," +
+          PREVIOUS_METADATA_LOCATION + " VARCHAR(5500)," +
+          "PRIMARY KEY (" + CATALOG_NAME + ", " + TABLE_NAMESPACE + ", " + 
TABLE_NAME + ")" +
+          ")";
+  protected static final String GET_TABLE_SQL = "SELECT * FROM " + 
CATALOG_TABLE_NAME +
+      " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + 
TABLE_NAME + " = ? ";
+  protected static final String LIST_TABLES_SQL = "SELECT * FROM " + 
CATALOG_TABLE_NAME +
+      " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ?";
+  protected static final String RENAME_TABLE_SQL = "UPDATE " + 
CATALOG_TABLE_NAME +
+      " SET " + TABLE_NAMESPACE + " = ? , " + TABLE_NAME + " = ? " +
+      " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + 
TABLE_NAME + " = ? ";
+  protected static final String DROP_TABLE_SQL = "DELETE FROM " + 
CATALOG_TABLE_NAME +
+      " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + 
TABLE_NAME + " = ? ";
+  protected static final String GET_NAMESPACE_SQL = "SELECT " + 
TABLE_NAMESPACE + " FROM " + CATALOG_TABLE_NAME +
+      " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " LIKE ? 
LIMIT 1";
+  protected static final String LIST_NAMESPACES_SQL = "SELECT DISTINCT " + 
TABLE_NAMESPACE +
+      " FROM " + CATALOG_TABLE_NAME +
+      " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " LIKE ?";
+  protected static final String DO_COMMIT_CREATE_TABLE_SQL = "INSERT INTO " + 
CATALOG_TABLE_NAME +
+      " (" + CATALOG_NAME + ", " + TABLE_NAMESPACE + ", " + TABLE_NAME +
+      ", " + METADATA_LOCATION + ", " + PREVIOUS_METADATA_LOCATION + ") " +
+      " VALUES (?,?,?,?,null)";
+  private static final Joiner JOINER_DOT = Joiner.on('.');
+  private static final Splitter SPLITTER_DOT = Splitter.on('.');
+
+  private JdbcUtil() {
+  }
+
+  public static Namespace stringToNamespace(String namespace) {
+    Preconditions.checkArgument(namespace != null, "Invalid namespace %s", 
namespace);
+    return Namespace.of(Iterables.toArray(SPLITTER_DOT.split(namespace), 
String.class));
+  }
+
+  public static String namespaceToString(Namespace namespace) {
+    return JOINER_DOT.join(namespace.levels());
+  }
+
+  public static TableIdentifier stringToTableIdentifier(String tableNamespace, 
String tableName) {
+    return TableIdentifier.of(JdbcUtil.stringToNamespace(tableNamespace), 
tableName);
+  }
+
+  public static Properties filterAndRemovePrefix(Map<String, String> 
properties,
+                                                 String prefix) {
+    Properties result = new Properties();
+    properties.forEach((key, value) -> {
+      if (key.startsWith(prefix)) {
+        result.put(key.substring(prefix.length()), value);
+      }
+    });
+
+    return result;
+  }
+}
diff --git a/iceberg/pom.xml b/iceberg/pom.xml
index 4a2f2f5..547baaf 100644
--- a/iceberg/pom.xml
+++ b/iceberg/pom.xml
@@ -38,6 +38,8 @@
         <spotless.maven.plugin.version>2.5.0</spotless.maven.plugin.version>
         
<google.errorprone.javac.version>9+181-r4173-1</google.errorprone.javac.version>
         <google.errorprone.version>2.5.1</google.errorprone.version>
+        <assertj.version>3.19.0</assertj.version>
+        <junit.jupiter.version>5.7.2</junit.jupiter.version>
     </properties>
 
     <modules>
@@ -190,6 +192,17 @@
                 <classifier>tests</classifier>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.assertj</groupId>
+                <artifactId>assertj-core</artifactId>
+                <version>${assertj.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.junit.jupiter</groupId>
+                <artifactId>junit-jupiter-api</artifactId>
+                <version>${junit.jupiter.version}</version>
+            </dependency>
+
 
             <dependency>
                 <groupId>com.esotericsoftware</groupId>

Reply via email to