This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e916a3ec26 HIVE-26576: Alter table calls on Iceberg tables can inadvertently change metadata_location (#3612) (Adam Szita, reviewed by Ayush Saxena)
2e916a3ec26 is described below

commit 2e916a3ec26a6d12653c89ad445fed91e1ba0cde
Author: Adam Szita <40628386+sz...@users.noreply.github.com>
AuthorDate: Sun Oct 2 18:50:26 2022 +0200

    HIVE-26576: Alter table calls on Iceberg tables can inadvertently change metadata_location (#3612) (Adam Szita, reviewed by Ayush Saxena)
---
 .../org/apache/iceberg/hive/HiveCommitLock.java    | 225 +++++++++++++++++++++
 .../apache/iceberg/hive/HiveTableOperations.java   | 154 ++------------
 .../apache/iceberg/hive/TestHiveCommitLocks.java   |   4 +-
 .../org/apache/iceberg/hive/TestHiveCommits.java   |  33 +--
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       |  59 +++++-
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  14 ++
 .../apache/iceberg/mr/hive/IcebergTableUtil.java   |  37 +++-
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  |  80 ++++++++
 .../org/apache/iceberg/mr/hive/TestHiveShell.java  |   4 +
 .../alter_acid_table_to_iceberg_failure.q.out      |   4 +-
 .../alter_managed_table_to_iceberg_failure.q.out   |   4 +-
 .../ql/ddl/table/AbstractAlterTableOperation.java  |   9 +-
 .../hive/ql/metadata/HiveStorageHandler.java       |  12 ++
 13 files changed, 464 insertions(+), 175 deletions(-)
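
In short: alter table calls on Iceberg tables now take the same per-table commit lock that Iceberg commits take, so a concurrent Iceberg commit and an HMS alter_table can no longer clobber each other's metadata_location. A rough sketch of that pattern, reusing HiveCommitLock and CachedClientPool from the diff below (illustrative wiring only, not the committed code; the real hook acquires the lock in preAlterTable and releases it in commitAlterTable/rollbackAlterTable):

    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.hive.CachedClientPool;
    import org.apache.iceberg.hive.HiveCommitLock;

    class AlterTableLockingSketch {
      // Illustrative only: serialize an HMS alter_table with Iceberg commits on the same table.
      void alterUnderCommitLock(Configuration conf, String catalog, String db, String table) throws Exception {
        HiveCommitLock lock = new HiveCommitLock(
            conf, new CachedClientPool(conf, Collections.emptyMap()), catalog, db, table);
        lock.acquire();          // JVM-level fair lock plus an HMS EXCL_WRITE table lock
        try {
          // ... re-read the table and persist the ALTER TABLE change to HMS here ...
        } finally {
          lock.release();        // unlock HMS first, then release the JVM lock
        }
      }
    }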

diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java
new file mode 100644
index 00000000000..63d5d40d19f
--- /dev/null
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveCommitLock {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveCommitLock.class);
+
+  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = 
"iceberg.hive.lock-timeout-ms";
+  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = 
"iceberg.hive.lock-check-min-wait-ms";
+  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = 
"iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = 
"iceberg.hive.table-level-lock-evict-ms";
+  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 
1000; // 3 minutes
+  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 
milliseconds
+  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 
5 seconds
+
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = 
TimeUnit.MINUTES.toMillis(10);
+
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  private static synchronized void initTableLevelLockCache(long 
evictionTimeout) {
+    if (commitLockCache == null) {
+      commitLockCache = Caffeine.newBuilder()
+          .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+          .build();
+    }
+  }
+
+  private final String fullName;
+  private final String databaseName;
+  private final String tableName;
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
+
+  private final long lockAcquireTimeout;
+  private final long lockCheckMinWaitTime;
+  private final long lockCheckMaxWaitTime;
+
+  private Optional<Long> hmsLockId = Optional.empty();
+  private Optional<ReentrantLock> jvmLock = Optional.empty();
+
+  public HiveCommitLock(Configuration conf, ClientPool<IMetaStoreClient, 
TException> metaClients,
+      String catalogName, String databaseName, String tableName) {
+    this.metaClients = metaClients;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+    this.fullName = catalogName + "." + databaseName + "." + tableName;
+
+    this.lockAcquireTimeout =
+        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, 
HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+    this.lockCheckMinWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, 
HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+    this.lockCheckMaxWaitTime =
+        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, 
HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, 
HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+  }
+
+  public void acquire() throws UnknownHostException, TException, 
InterruptedException {
+    // getting a process-level lock per table to avoid concurrent commit 
attempts to the same table from the same
+    // JVM process, which would result in unnecessary and costly HMS lock 
acquisition requests
+    acquireJvmLock();
+    acquireLockFromHms();
+  }
+
+  public void release() {
+    releaseHmsLock();
+    releaseJvmLock();
+  }
+
+  // TODO add lock heart beating for cases where default lock timeout is too 
low.
+  private void acquireLockFromHms() throws UnknownHostException, TException, 
InterruptedException {
+    if (hmsLockId.isPresent()) {
+      throw new IllegalArgumentException(String.format("HMS lock ID=%s already 
acquired for table %s.%s",
+          hmsLockId.get(), databaseName, tableName));
+    }
+    final LockComponent lockComponent = new LockComponent(LockType.EXCL_WRITE, 
LockLevel.TABLE, databaseName);
+    lockComponent.setTablename(tableName);
+    final LockRequest lockRequest = new 
LockRequest(Lists.newArrayList(lockComponent),
+        System.getProperty("user.name"),
+        InetAddress.getLocalHost().getHostName());
+    LockResponse lockResponse = metaClients.run(client -> 
client.lock(lockRequest));
+    AtomicReference<LockState> state = new 
AtomicReference<>(lockResponse.getState());
+    long lockId = lockResponse.getLockid();
+    this.hmsLockId = Optional.of(lockId);
+
+    final long start = System.currentTimeMillis();
+    long duration = 0;
+    boolean timeout = false;
+
+    try {
+      if (state.get().equals(LockState.WAITING)) {
+        // Retry count is the typical "upper bound of retries" for Tasks.run() 
function. In fact, the maximum number of
+        // attempts the Tasks.run() would try is `retries + 1`. Here, for 
checking locks, we use timeout as the
+        // upper bound of retries. So it is just reasonable to set a large 
retry count. However, if we set
+        // Integer.MAX_VALUE, the above logic of `retries + 1` would overflow 
into Integer.MIN_VALUE. Hence,
+        // the retry is set conservatively as `Integer.MAX_VALUE - 100` so it 
doesn't hit any boundary issues.
+        Tasks.foreach(lockId)
+            .retry(Integer.MAX_VALUE - 100)
+            .exponentialBackoff(
+                lockCheckMinWaitTime,
+                lockCheckMaxWaitTime,
+                lockAcquireTimeout,
+                1.5)
+            .throwFailureWhenFinished()
+            .onlyRetryOn(WaitingForHmsLockException.class)
+            .run(id -> {
+              try {
+                LockResponse response = metaClients.run(client -> 
client.checkLock(id));
+                LockState newState = response.getState();
+                state.set(newState);
+                if (newState.equals(LockState.WAITING)) {
+                  throw new WaitingForHmsLockException("Waiting for lock.");
+                }
+              } catch (InterruptedException e) {
+                Thread.interrupted(); // Clear the interrupt status flag
+                LOG.warn("Interrupted while waiting for lock.", e);
+              }
+            }, TException.class);
+      }
+    } catch (WaitingForHmsLockException waitingForLockException) {
+      timeout = true;
+      duration = System.currentTimeMillis() - start;
+    } finally {
+      if (!state.get().equals(LockState.ACQUIRED)) {
+        releaseHmsLock();
+      }
+    }
+
+    // timeout and do not have lock acquired
+    if (timeout && !state.get().equals(LockState.ACQUIRED)) {
+      throw new CommitFailedException("Timed out after %s ms waiting for lock 
on %s.%s",
+          duration, databaseName, tableName);
+    }
+
+    if (!state.get().equals(LockState.ACQUIRED)) {
+      throw new CommitFailedException("Could not acquire the lock on %s.%s, " +
+          "lock request ended in state %s", databaseName, tableName, state);
+    }
+  }
+
+  private void releaseHmsLock() {
+    if (hmsLockId.isPresent()) {
+      try {
+        metaClients.run(client -> {
+          client.unlock(hmsLockId.get());
+          return null;
+        });
+        hmsLockId = Optional.empty();
+      } catch (Exception e) {
+        LOG.warn("Failed to unlock {}.{}", databaseName, tableName, e);
+      }
+    }
+  }
+
+  private void acquireJvmLock() {
+    if (jvmLock.isPresent()) {
+      throw new IllegalStateException(String.format("JVM lock already acquired 
for table %s", fullName));
+    }
+    jvmLock = Optional.of(commitLockCache.get(fullName, t -> new 
ReentrantLock(true)));
+    jvmLock.get().lock();
+  }
+
+  private void releaseJvmLock() {
+    if (jvmLock.isPresent()) {
+      jvmLock.get().unlock();
+      jvmLock = Optional.empty();
+    }
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  private static class WaitingForHmsLockException extends RuntimeException {
+    WaitingForHmsLockException(String message) {
+      super(message);
+    }
+  }
+}
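
The lock behaviour above is driven by the Hadoop Configuration read in the constructor. A hedged example of tuning it (the keys are the constants defined above; the values here are arbitrary examples):

    import org.apache.hadoop.conf.Configuration;

    class CommitLockTuningSketch {
      static Configuration tunedConf() {
        Configuration conf = new Configuration();
        // Same keys as the HIVE_* constants in HiveCommitLock; example values only.
        conf.setLong("iceberg.hive.lock-timeout-ms", 5 * 60 * 1000L);            // overall acquire timeout
        conf.setLong("iceberg.hive.lock-check-min-wait-ms", 100L);               // min backoff between checkLock calls
        conf.setLong("iceberg.hive.lock-check-max-wait-ms", 10 * 1000L);         // max backoff between checkLock calls
        conf.setLong("iceberg.hive.table-level-lock-evict-ms", 15 * 60 * 1000L); // JVM table-lock cache eviction
        return conf;
      }
    }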
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 8fdcb6b2e5a..12ade76a685 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -19,9 +19,6 @@
 
 package org.apache.iceberg.hive;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.Locale;
@@ -29,21 +26,12 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -67,9 +55,7 @@ import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTest
 import org.apache.iceberg.relocated.com.google.common.collect.BiMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.util.Tasks;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -83,33 +69,14 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED;
  */
 public class HiveTableOperations extends BaseMetastoreTableOperations {
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableOperations.class);
-
-  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = 
"iceberg.hive.lock-timeout-ms";
-  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = 
"iceberg.hive.lock-check-min-wait-ms";
-  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = 
"iceberg.hive.lock-check-max-wait-ms";
   private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = 
"iceberg.hive.metadata-refresh-max-retries";
-  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = 
"iceberg.hive.table-level-lock-evict-ms";
-  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 
1000; // 3 minutes
-  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 
milliseconds
-  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 
5 seconds
   private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 
2;
-  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = 
TimeUnit.MINUTES.toMillis(10);
 
   private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = 
ImmutableBiMap.of(
       // gc.enabled in Iceberg and external.table.purge in Hive are meant to 
do the same things but with different names
       GC_ENABLED, "external.table.purge",
       TableProperties.PARQUET_COMPRESSION, ParquetOutputFormat.COMPRESSION);
 
-  private static Cache<String, ReentrantLock> commitLockCache;
-
-  private static synchronized void initTableLevelLockCache(long 
evictionTimeout) {
-    if (commitLockCache == null) {
-      commitLockCache = Caffeine.newBuilder()
-              .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
-              .build();
-    }
-  }
-
   /**
    * Provides key translation where necessary between Iceberg and HMS props. 
This translation is needed because some
    * properties control the same behaviour but are named differently in 
Iceberg and Hive. Therefore changes to these
@@ -127,19 +94,11 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
     return ICEBERG_TO_HMS_TRANSLATION.inverse().getOrDefault(hmsProp, hmsProp);
   }
 
-  private static class WaitingForLockException extends RuntimeException {
-    WaitingForLockException(String message) {
-      super(message);
-    }
-  }
-
   private final String fullName;
+  private final String catalogName;
   private final String database;
   private final String tableName;
   private final Configuration conf;
-  private final long lockAcquireTimeout;
-  private final long lockCheckMinWaitTime;
-  private final long lockCheckMaxWaitTime;
   private final int metadataRefreshMaxRetries;
   private final FileIO fileIO;
   private final ClientPool<IMetaStoreClient, TException> metaClients;
@@ -150,19 +109,11 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
     this.metaClients = metaClients;
     this.fileIO = fileIO;
     this.fullName = catalogName + "." + database + "." + table;
+    this.catalogName = catalogName;
     this.database = database;
     this.tableName = table;
-    this.lockAcquireTimeout =
-        conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, 
HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
-    this.lockCheckMinWaitTime =
-        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, 
HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
-    this.lockCheckMaxWaitTime =
-        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, 
HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
     this.metadataRefreshMaxRetries =
         conf.getInt(HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES, 
HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
-    long tableLevelLockCacheEvictionTimeout =
-            conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, 
HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
-    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
   }
 
   @Override
@@ -211,14 +162,11 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
 
     CommitStatus commitStatus = CommitStatus.FAILURE;
     boolean updateHiveTable = false;
-    Optional<Long> lockId = Optional.empty();
-    // getting a process-level lock per table to avoid concurrent commit 
attempts to the same table from the same
-    // JVM process, which would result in unnecessary and costly HMS lock 
acquisition requests
-    ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new 
ReentrantLock(true));
-    tableLevelMutex.lock();
+    HiveCommitLock commitLock = null;
+
     try {
-      lockId = Optional.of(acquireLock());
-      // TODO add lock heart beating for cases where default lock timeout is 
too low.
+      commitLock = createLock();
+      commitLock.acquire();
 
       Table tbl = loadHmsTable();
 
@@ -299,7 +247,7 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
       throw new RuntimeException("Interrupted during commit", e);
 
     } finally {
-      cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, 
tableLevelMutex);
+      cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, commitLock);
     }
   }
 
@@ -416,74 +364,12 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
   }
 
   @VisibleForTesting
-  long acquireLock() throws UnknownHostException, TException, 
InterruptedException {
-    final LockComponent lockComponent = new LockComponent(LockType.EXCL_WRITE, 
LockLevel.TABLE, database);
-    lockComponent.setTablename(tableName);
-    final LockRequest lockRequest = new 
LockRequest(Lists.newArrayList(lockComponent),
-        System.getProperty("user.name"),
-        InetAddress.getLocalHost().getHostName());
-    LockResponse lockResponse = metaClients.run(client -> 
client.lock(lockRequest));
-    AtomicReference<LockState> state = new 
AtomicReference<>(lockResponse.getState());
-    long lockId = lockResponse.getLockid();
-
-    final long start = System.currentTimeMillis();
-    long duration = 0;
-    boolean timeout = false;
-
-    try {
-      if (state.get().equals(LockState.WAITING)) {
-        // Retry count is the typical "upper bound of retries" for Tasks.run() 
function. In fact, the maximum number of
-        // attempts the Tasks.run() would try is `retries + 1`. Here, for 
checking locks, we use timeout as the
-        // upper bound of retries. So it is just reasonable to set a large 
retry count. However, if we set
-        // Integer.MAX_VALUE, the above logic of `retries + 1` would overflow 
into Integer.MIN_VALUE. Hence,
-        // the retry is set conservatively as `Integer.MAX_VALUE - 100` so it 
doesn't hit any boundary issues.
-        Tasks.foreach(lockId)
-            .retry(Integer.MAX_VALUE - 100)
-            .exponentialBackoff(
-                lockCheckMinWaitTime,
-                lockCheckMaxWaitTime,
-                lockAcquireTimeout,
-                1.5)
-            .throwFailureWhenFinished()
-            .onlyRetryOn(WaitingForLockException.class)
-            .run(id -> {
-              try {
-                LockResponse response = metaClients.run(client -> 
client.checkLock(id));
-                LockState newState = response.getState();
-                state.set(newState);
-                if (newState.equals(LockState.WAITING)) {
-                  throw new WaitingForLockException("Waiting for lock.");
-                }
-              } catch (InterruptedException e) {
-                Thread.interrupted(); // Clear the interrupt status flag
-                LOG.warn("Interrupted while waiting for lock.", e);
-              }
-            }, TException.class);
-      }
-    } catch (WaitingForLockException waitingForLockException) {
-      timeout = true;
-      duration = System.currentTimeMillis() - start;
-    } finally {
-      if (!state.get().equals(LockState.ACQUIRED)) {
-        unlock(Optional.of(lockId));
-      }
-    }
-
-    // timeout and do not have lock acquired
-    if (timeout && !state.get().equals(LockState.ACQUIRED)) {
-      throw new CommitFailedException("Timed out after %s ms waiting for lock 
on %s.%s",
-          duration, database, tableName);
-    }
-
-    if (!state.get().equals(LockState.ACQUIRED)) {
-      throw new CommitFailedException("Could not acquire the lock on %s.%s, " +
-          "lock request ended in state %s", database, tableName, state);
-    }
-    return lockId;
+  HiveCommitLock createLock() throws UnknownHostException, TException, 
InterruptedException {
+    return new HiveCommitLock(conf, metaClients, catalogName, database, 
tableName);
   }
 
-  private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String 
metadataLocation, Optional<Long> lockId,
-      ReentrantLock tableLevelMutex) {
+  private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String 
metadataLocation,
+      HiveCommitLock lock) {
     try {
       if (commitStatus == CommitStatus.FAILURE) {
         // If we are sure the commit failed, clean up the uncommitted metadata 
file
@@ -493,29 +379,21 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
       LOG.error("Fail to cleanup metadata file at {}", metadataLocation, e);
       throw e;
     } finally {
-      unlock(lockId);
-      tableLevelMutex.unlock();
+      doUnlock(lock);
     }
   }
 
-  private void unlock(Optional<Long> lockId) {
-    if (lockId.isPresent()) {
+  @VisibleForTesting
+  void doUnlock(HiveCommitLock lock) {
+    if (lock != null) {
       try {
-        doUnlock(lockId.get());
+        lock.release();
       } catch (Exception e) {
         LOG.warn("Failed to unlock {}.{}", database, tableName, e);
       }
     }
   }
 
-  @VisibleForTesting
-  void doUnlock(long lockId) throws TException, InterruptedException {
-    metaClients.run(client -> {
-      client.unlock(lockId);
-      return null;
-    });
-  }
-
   static void validateTableIsIceberg(Table table, String fullName) {
     String tableType = table.getParameters().get(TABLE_TYPE_PROP);
     NoSuchIcebergTableException.check(tableType != null && 
tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE),
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 3b4bb159590..293dd5010cd 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -132,7 +132,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
   @Test
   public void testLockAcquisitionAtFirstTime() throws TException, 
InterruptedException {
     doReturn(acquiredLockResponse).when(spyClient).lock(any());
-    doNothing().when(spyOps).doUnlock(eq(dummyLockId));
+    doNothing().when(spyClient).unlock(eq(dummyLockId));
 
     spyOps.doCommit(metadataV2, metadataV1);
 
@@ -150,7 +150,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
         .doReturn(acquiredLockResponse)
         .when(spyClient)
         .checkLock(eq(dummyLockId));
-    doNothing().when(spyOps).doUnlock(eq(dummyLockId));
+    doNothing().when(spyClient).unlock(eq(dummyLockId));
 
     spyOps.doCommit(metadataV2, metadataV1);
 
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
index e22374b6e97..1afe98d81b7 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.hive;
 
 import java.io.File;
 import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.PartitionSpec;
@@ -35,7 +35,6 @@ import org.apache.iceberg.types.Types;
 import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
@@ -47,7 +46,7 @@ import static org.mockito.Mockito.when;
 public class TestHiveCommits extends HiveTableBaseTest {
 
   @Test
-  public void testSuppressUnlockExceptions() throws TException, 
InterruptedException {
+  public void testSuppressUnlockExceptions() throws TException, 
InterruptedException, UnknownHostException {
     Table table = catalog.loadTable(TABLE_IDENTIFIER);
     HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) 
table).operations();
 
@@ -65,13 +64,21 @@ public class TestHiveCommits extends HiveTableBaseTest {
 
     HiveTableOperations spyOps = spy(ops);
 
-    ArgumentCaptor<Long> lockId = ArgumentCaptor.forClass(Long.class);
-    doThrow(new RuntimeException()).when(spyOps).doUnlock(lockId.capture());
+    AtomicReference<HiveCommitLock> lockRef = new AtomicReference<>();
+
+    when(spyOps.createLock()).thenAnswer(i -> {
+          HiveCommitLock lock = (HiveCommitLock) i.callRealMethod();
+          lockRef.set(lock);
+          return lock;
+        }
+    );
 
     try {
       spyOps.commit(metadataV2, metadataV1);
+      HiveCommitLock spyLock = spy(lockRef.get());
+      doThrow(new RuntimeException()).when(spyLock).release();
     } finally {
-      ops.doUnlock(lockId.getValue());
+      ops.doUnlock(lockRef.get());
     }
 
     ops.refresh();
@@ -258,13 +265,13 @@ public class TestHiveCommits extends HiveTableBaseTest {
 
     HiveTableOperations spyOps = spy(ops);
 
-    AtomicLong lockId = new AtomicLong();
-    doAnswer(i -> {
-      lockId.set(ops.acquireLock());
-      return lockId.get();
-    }).when(spyOps).acquireLock();
+    AtomicReference<HiveCommitLock> lock = new AtomicReference<>();
+    doAnswer(l -> {
+      lock.set(ops.createLock());
+      return lock.get();
+    }).when(spyOps).createLock();
 
-    concurrentCommitAndThrowException(ops, spyOps, table, lockId);
+    concurrentCommitAndThrowException(ops, spyOps, table, lock);
 
     /*
     This commit and our concurrent commit should succeed even though this 
commit throws an exception
@@ -306,7 +313,7 @@ public class TestHiveCommits extends HiveTableBaseTest {
   }
 
   private void concurrentCommitAndThrowException(HiveTableOperations 
realOperations, HiveTableOperations spyOperations,
-                                                 Table table, AtomicLong 
lockId)
+                                                 Table table, 
AtomicReference<HiveCommitLock> lockId)
       throws TException, InterruptedException {
     // Simulate a communication error after a successful commit
     doAnswer(i -> {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 067057def75..c99f6747409 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -65,7 +65,6 @@ import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DeleteFiles;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
@@ -81,6 +80,8 @@ import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hive.CachedClientPool;
+import org.apache.iceberg.hive.HiveCommitLock;
 import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.hive.HiveTableOperations;
 import org.apache.iceberg.io.FileIO;
@@ -95,6 +96,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.util.Pair;
 import org.apache.thrift.TException;
@@ -126,6 +128,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
       Lists.newArrayList(org.apache.commons.lang3.tuple.Pair.of(1, new 
byte[0]));
   static final String MIGRATED_TO_ICEBERG = "MIGRATED_TO_ICEBERG";
   static final String ORC_FILES_ONLY = "iceberg.orc.files.only";
+  static final String MANUAL_ICEBERG_METADATA_LOCATION_CHANGE = 
"MANUAL_ICEBERG_METADATA_LOCATION_CHANGE";
 
   private final Configuration conf;
   private Table icebergTable = null;
@@ -140,6 +143,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
   private Transaction transaction;
   private AlterTableType currentAlterTableOp;
   private boolean createHMSTableInHook = false;
+  private HiveCommitLock commitLock;
 
   private enum FileFormat {
     ORC("orc"), PARQUET("parquet"), AVRO("avro");
@@ -175,7 +179,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
       // If not using HiveCatalog check for existing table
       try {
 
-        this.icebergTable = IcebergTableUtil.getTable(conf, catalogProperties);
+        this.icebergTable = IcebergTableUtil.getTable(conf, catalogProperties, 
true);
 
         
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA)
 == null,
             "Iceberg table already created - can not use provided schema");
@@ -299,8 +303,24 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
       throws MetaException {
     catalogProperties = getCatalogProperties(hmsTable);
     setupAlterOperationType(hmsTable, context);
+    if (commitLock == null) {
+      commitLock = new HiveCommitLock(conf, new CachedClientPool(conf, 
Maps.fromProperties(catalogProperties)),
+          catalogProperties.getProperty(Catalogs.NAME), hmsTable.getDbName(), 
hmsTable.getTableName());
+    }
+
+    try {
+      commitLock.acquire();
+      doPreAlterTable(hmsTable, context);
+    } catch (Exception e) {
+      commitLock.release();
+      throw new MetaException(StringUtils.stringifyException(e));
+    }
+  }
+
+  private void doPreAlterTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, EnvironmentContext context)
+      throws MetaException {
     try {
-      icebergTable = IcebergTableUtil.getTable(conf, catalogProperties);
+      icebergTable = IcebergTableUtil.getTable(conf, catalogProperties, true);
     } catch (NoSuchTableException nte) {
       context.getProperties().put(MIGRATE_HIVE_TO_ICEBERG, "true");
       // If the iceberg table does not exist, then this is an ALTER command 
aimed at migrating the table to iceberg
@@ -322,8 +342,8 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
       // If there are partition keys specified remove them from the HMS table 
and add them to the column list
       try {
         Hive db = SessionState.get().getHiveDb();
-        preAlterTableProperties.partitionSpecProxy = 
db.getMSC().listPartitionSpecs(hmsTable.getCatName(),
-            hmsTable.getDbName(), hmsTable.getTableName(), Integer.MAX_VALUE);
+        preAlterTableProperties.partitionSpecProxy = 
db.getMSC().listPartitionSpecs(
+            hmsTable.getCatName(), hmsTable.getDbName(), 
hmsTable.getTableName(), Integer.MAX_VALUE);
         if (hmsTable.isSetPartitionKeys() && 
!hmsTable.getPartitionKeys().isEmpty()) {
           db.dropPartitions(hmsTable.getDbName(), hmsTable.getTableName(), 
EMPTY_FILTER, DROP_OPTIONS);
 
@@ -345,8 +365,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
 
       sd.setInputFormat(HiveIcebergInputFormat.class.getCanonicalName());
       sd.setOutputFormat(HiveIcebergOutputFormat.class.getCanonicalName());
-      sd.setSerdeInfo(new SerDeInfo("icebergSerde", 
HiveIcebergSerDe.class.getCanonicalName(),
-          Collections.emptyMap()));
+      sd.setSerdeInfo(new SerDeInfo("icebergSerde", 
HiveIcebergSerDe.class.getCanonicalName(), Collections.emptyMap()));
       setCommonHmsTablePropertiesForIceberg(hmsTable);
       // set an additional table prop to designate that this table has been 
migrated to Iceberg, i.e.
       // all or some of its data files have not been written out using the 
Iceberg writer, and therefore those data
@@ -367,8 +386,8 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
       // that users can change data types or reorder columns too with this 
alter op type, so its name is misleading..)
       assertNotMigratedTable(hmsTable.getParameters(), "CHANGE COLUMN");
       handleChangeColumn(hmsTable);
-    } else if (AlterTableType.ADDPROPS.equals(currentAlterTableOp)) {
-      assertNotCrossTableMetadataLocationChange(hmsTable.getParameters());
+    } else {
+      assertNotCrossTableMetadataLocationChange(hmsTable.getParameters(), 
context);
     }
 
     // Migration case is already handled above, in case of migration we don't 
have all the properties set till this
@@ -385,7 +404,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
    * the current metadata uuid and the new metadata uuid matches.
    * @param tblParams hms table properties, must be non-null
    */
-  private void assertNotCrossTableMetadataLocationChange(Map<String, String> 
tblParams) {
+  private void assertNotCrossTableMetadataLocationChange(Map<String, String> 
tblParams, EnvironmentContext context) {
     if 
(tblParams.containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)) {
       Preconditions.checkArgument(icebergTable != null,
           "Cannot perform table migration to Iceberg and setting the snapshot 
location in one step. " +
@@ -400,6 +419,17 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
                 icebergTable.name(), newMetadataLocation)
         );
       }
+      if (!currentMetadata.metadataFileLocation().equals(newMetadataLocation) 
&&
+          
!context.getProperties().containsKey(MANUAL_ICEBERG_METADATA_LOCATION_CHANGE)) {
+        // If we got here then this is an alter table operation where the 
table to be changed had an Iceberg commit
+        // meanwhile. The base metadata locations differ, while we know that 
this wasn't an intentional, manual
+        // metadata_location set by a user. To protect the interim commit we 
need to refresh the metadata file location.
+        tblParams.put(BaseMetastoreTableOperations.METADATA_LOCATION_PROP, 
currentMetadata.metadataFileLocation());
+        LOG.warn("Detected an alter table operation attempting to do 
alterations on an Iceberg table with a stale " +
+            "metadata_location. Considered base metadata_location: {}, Actual 
metadata_location: {}. Will override " +
+            "this request with the refreshed metadata_location in order to 
preserve the concurrent commit.",
+            newMetadataLocation, currentMetadata.metadataFileLocation());
+      }
     }
   }
 
@@ -475,6 +505,10 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
   @Override
   public void commitAlterTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, EnvironmentContext context)
       throws MetaException {
+    if (commitLock == null) {
+      throw new IllegalStateException("Hive commit lock should already be 
set");
+    }
+    commitLock.release();
     if (isTableMigration) {
       catalogProperties = getCatalogProperties(hmsTable);
       catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, 
SchemaParser.toJson(preAlterTableProperties.schema));
@@ -507,6 +541,10 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
 
   @Override
   public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, EnvironmentContext context) {
+    if (commitLock == null) {
+      throw new IllegalStateException("Hive commit lock should already be 
set");
+    }
+    commitLock.release();
     if 
(Boolean.parseBoolean(context.getProperties().getOrDefault(MIGRATE_HIVE_TO_ICEBERG,
 "false"))) {
       LOG.debug("Initiating rollback for table {} at location {}",
           hmsTable.getTableName(), hmsTable.getSd().getLocation());
@@ -866,4 +904,5 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
     private List<FieldSchema> partitionKeys;
     private PartitionSpecProxy partitionSpecProxy;
   }
+
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index a050b0456a6..08443211851 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -42,11 +42,14 @@ import org.apache.hadoop.hive.common.type.SnapshotContext;
 import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context.Operation;
+import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
+import 
org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -82,6 +85,7 @@ import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobContextImpl;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
@@ -999,4 +1003,14 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
     }
     return new SnapshotContext(current.snapshotId());
   }
+
+  @Override
+  public void prepareAlterTableEnvironmentContext(AbstractAlterTableDesc 
alterTableDesc,
+      EnvironmentContext environmentContext) {
+    if (alterTableDesc instanceof AlterTableSetPropertiesDesc &&
+        
alterTableDesc.getProps().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP))
 {
+      // signal manual iceberg metadata location updated by user
+      
environmentContext.putToProperties(HiveIcebergMetaHook.MANUAL_ICEBERG_METADATA_LOCATION_CHANGE,
 "true");
+    }
+  }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 344834ec62e..60a9158ecab 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.mr.hive;
 
 import java.util.List;
 import java.util.Properties;
+import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -52,9 +53,11 @@ public class IcebergTableUtil {
    * hmsTable. It then calls {@link IcebergTableUtil#getTable(Configuration, 
Properties)} with these properties.
    * @param configuration a Hadoop configuration
    * @param hmsTable the HMS table
+   * @param skipCache if set to true there won't be an attempt to retrieve the 
table from SessionState
    * @return the Iceberg table
    */
-  static Table getTable(Configuration configuration, 
org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+  static Table getTable(Configuration configuration, 
org.apache.hadoop.hive.metastore.api.Table hmsTable,
+      boolean skipCache) {
     Properties properties = new Properties();
     properties.setProperty(Catalogs.NAME, 
TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()).toString());
     properties.setProperty(Catalogs.LOCATION, hmsTable.getSd().getLocation());
@@ -63,7 +66,11 @@ public class IcebergTableUtil {
           properties.setProperty(k, v);
           return v;
         });
-    return getTable(configuration, properties);
+    return getTable(configuration, properties, skipCache);
+  }
+
+  static Table getTable(Configuration configuration, 
org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    return getTable(configuration, hmsTable, false);
   }
 
   /**
@@ -72,9 +79,10 @@ public class IcebergTableUtil {
    * therefore we claim it through the Catalogs API and then store it in query 
state.
    * @param configuration a Hadoop configuration
    * @param properties controlling properties
+   * @param skipCache if set to true there won't be an attempt to retrieve the 
table from SessionState
    * @return an Iceberg table
    */
-  static Table getTable(Configuration configuration, Properties properties) {
+  static Table getTable(Configuration configuration, Properties properties, 
boolean skipCache) {
     String metaTable = properties.getProperty("metaTable");
     String tableName = properties.getProperty(Catalogs.NAME);
     String location = properties.getProperty(Catalogs.LOCATION);
@@ -86,14 +94,27 @@ public class IcebergTableUtil {
     }
 
     String tableIdentifier = properties.getProperty(Catalogs.NAME);
-    return SessionStateUtil.getResource(configuration, 
tableIdentifier).filter(o -> o instanceof Table)
-        .map(o -> (Table) o).orElseGet(() -> {
-          LOG.debug("Iceberg table {} is not found in QueryState. Loading 
table from configured catalog",
-              tableIdentifier);
+    Function<Void, Table> tableLoadFunc =
+        unused -> {
           Table tab = Catalogs.loadTable(configuration, properties);
           SessionStateUtil.addResource(configuration, tableIdentifier, tab);
           return tab;
-        });
+        };
+
+    if (skipCache) {
+      return tableLoadFunc.apply(null);
+    } else {
+      return SessionStateUtil.getResource(configuration, 
tableIdentifier).filter(o -> o instanceof Table)
+          .map(o -> (Table) o).orElseGet(() -> {
+            LOG.debug("Iceberg table {} is not found in QueryState. Loading 
table from configured catalog",
+                tableIdentifier);
+            return tableLoadFunc.apply(null);
+          });
+    }
+  }
+
+  static Table getTable(Configuration configuration, Properties properties) {
+    return getTable(configuration, properties, false);
   }
 
   /**
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index bfb5997d2bd..97c2eb83ef9 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -29,12 +29,17 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -70,6 +75,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -92,6 +98,10 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.junit.runners.Parameterized.Parameter;
 import static org.junit.runners.Parameterized.Parameters;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 @RunWith(Parameterized.class)
 public class TestHiveIcebergStorageHandlerNoScan {
@@ -1642,6 +1652,76 @@ public class TestHiveIcebergStorageHandlerNoScan {
     Assert.assertEquals("SNAPPY", 
icebergTable.properties().get(TableProperties.PARQUET_COMPRESSION).toUpperCase());
   }
 
+  @Test
+  public void testConcurrentIcebergCommitsAndHiveAlterTableCalls() throws 
Exception {
+    
Assume.assumeTrue(testTableType.equals(TestTables.TestTableType.HIVE_CATALOG));
+    TableIdentifier identifier = TableIdentifier.of("default", "customers");
+
+    testTables.createTable(
+        shell,
+        identifier.name(),
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        SPEC,
+        FileFormat.PARQUET,
+        ImmutableList.of());
+
+    org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
+
+    // Avoid commit retry limits preventing any changes from being committed.
+    icebergTable.updateProperties().set("commit.retry.num-retries", 
"1000000").commit();
+
+    // Swap metastore client used by TestHiveShell with our test stub
+    IMetaStoreClient realMSC = shell.getSession().getMetaStoreClient();
+    IMetaStoreClient spyMSC = spy(realMSC);
+    shell.getSession().getSessionHive().setMSC(spyMSC);
+
+    // Simulate delay on alter table calls from Hive queries to ensure they 
will have worked on outdated Table objects
+    // by the time they intend to persist their changes into HMS
+    doAnswer(i -> {
+      Thread.sleep(3000);
+      return i.callRealMethod();
+    }).when(spyMSC).alter_table(any(String.class), any(String.class), 
any(String.class),
+        any(org.apache.hadoop.hive.metastore.api.Table.class), 
any(EnvironmentContext.class), isNull());
+
+    ExecutorService executorService =
+        MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor) Executors.newFixedThreadPool(1));
+
+    // Concurrent Insert
+    executorService.submit(
+        () -> {
+          try {
+            testTables.appendIcebergTable(
+                shell.getHiveConf(),
+                icebergTable,
+                FileFormat.PARQUET,
+                null,
+                HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Concurrent ALTER TABLE properties change
+    shell.executeStatement("ALTER TABLE default.customers SET TBLPROPERTIES 
('dummyKey'='dummyValue')");
+
+    executorService.shutdown();
+    executorService.awaitTermination(1, TimeUnit.MINUTES);
+
+    // Verify that the insert was effective
+    Assert.assertEquals(((BaseTable) 
testTables.loadTable(identifier)).operations().current().metadataFileLocation(),
+        (long) HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.size(),
+        shell.executeStatement("select count(*) from customers").get(0)[0]
+    );
+
+    // Verify that the alter table call was effective
+    Assert.assertEquals("dummyValue", 
shell.metastore().getTable(identifier).getParameters().get("dummyKey"));
+
+    // Should be the 4th metadata version (1 empty base + 1 commit retry change + 1 insert + 1 property change)
+    Assert.assertEquals(3,
+        ((BaseTable) 
testTables.loadTable(identifier)).operations().current().previousFiles().size());
+  }
+
 
   /**
    * Checks that the new schema has newintcol and newstring col columns on 
both HMS and Iceberg sides
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
index f55324e8651..9d17d85ca77 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java
@@ -176,6 +176,10 @@ public class TestHiveShell {
     }
   }
 
+  public HiveSession getSession() {
+    return session;
+  }
+
   private HiveConf initializeConf() {
     HiveConf hiveConf = new HiveConf();
 
diff --git a/iceberg/iceberg-handler/src/test/results/negative/alter_acid_table_to_iceberg_failure.q.out b/iceberg/iceberg-handler/src/test/results/negative/alter_acid_table_to_iceberg_failure.q.out
index 32ac650eee6..1262e0f75c0 100644
--- a/iceberg/iceberg-handler/src/test/results/negative/alter_acid_table_to_iceberg_failure.q.out
+++ b/iceberg/iceberg-handler/src/test/results/negative/alter_acid_table_to_iceberg_failure.q.out
@@ -14,4 +14,6 @@ PREHOOK: query: alter table tbl_orc set tblproperties 
('storage_handler'='org.ap
 PREHOOK: type: ALTERTABLE_PROPERTIES
 PREHOOK: Input: default@tbl_orc
 PREHOOK: Output: default@tbl_orc
-FAILED: Execution Error, return code 40013 from 
org.apache.hadoop.hive.ql.ddl.DDLTask. Unable to alter table. Converting 
non-external, temporary or transactional hive table to iceberg table is not 
allowed.
+FAILED: Execution Error, return code 40013 from 
org.apache.hadoop.hive.ql.ddl.DDLTask. Unable to alter table. 
MetaException(message:Converting non-external, temporary or transactional hive 
table to iceberg table is not allowed.)
+#### A masked pattern was here ####
+
diff --git a/iceberg/iceberg-handler/src/test/results/negative/alter_managed_table_to_iceberg_failure.q.out b/iceberg/iceberg-handler/src/test/results/negative/alter_managed_table_to_iceberg_failure.q.out
index 2e16600d0df..82c80c3eff3 100644
--- a/iceberg/iceberg-handler/src/test/results/negative/alter_managed_table_to_iceberg_failure.q.out
+++ b/iceberg/iceberg-handler/src/test/results/negative/alter_managed_table_to_iceberg_failure.q.out
@@ -14,4 +14,6 @@ PREHOOK: query: alter table tbl_orc set tblproperties 
('storage_handler'='org.ap
 PREHOOK: type: ALTERTABLE_PROPERTIES
 PREHOOK: Input: default@tbl_orc
 PREHOOK: Output: default@tbl_orc
-FAILED: Execution Error, return code 40013 from 
org.apache.hadoop.hive.ql.ddl.DDLTask. Unable to alter table. Converting 
non-external, temporary or transactional hive table to iceberg table is not 
allowed.
+FAILED: Execution Error, return code 40013 from 
org.apache.hadoop.hive.ql.ddl.DDLTask. Unable to alter table. 
MetaException(message:Converting non-external, temporary or transactional hive 
table to iceberg table is not allowed.)
+#### A masked pattern was here ####
+
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java
index 5b2b19bb79e..970cdd5b11a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java
@@ -38,6 +38,7 @@ import 
org.apache.hadoop.hive.ql.ddl.table.constraint.add.AlterTableAddConstrain
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -68,7 +69,7 @@ public abstract class AbstractAlterTableOperation<T extends 
AbstractAlterTableDe
     // Don't change the table object returned by the metastore, as we'll mess 
with it's caches.
     Table table = oldTable.copy();
 
-    environmentContext = 
initializeEnvironmentContext(desc.getEnvironmentContext());
+    environmentContext = initializeEnvironmentContext(oldTable, 
desc.getEnvironmentContext());
 
     if (partitions == null) {
       doAlteration(table, null);
@@ -106,13 +107,17 @@ public abstract class AbstractAlterTableOperation<T 
extends AbstractAlterTableDe
     return partitions;
   }
 
-  private EnvironmentContext initializeEnvironmentContext(EnvironmentContext 
environmentContext) {
+  private EnvironmentContext initializeEnvironmentContext(Table table, 
EnvironmentContext environmentContext) {
     EnvironmentContext result = environmentContext == null ? new 
EnvironmentContext() : environmentContext;
     // do not need update stats in alter table/partition operations
     if (result.getProperties() == null ||
         result.getProperties().get(StatsSetupConst.DO_NOT_UPDATE_STATS) == 
null) {
       result.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, 
StatsSetupConst.TRUE);
     }
+    HiveStorageHandler storageHandler = table.getStorageHandler();
+    if (storageHandler != null) {
+      storageHandler.prepareAlterTableEnvironmentContext(desc, result);
+    }
     return result;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index bdfdf3fde36..b70ab657381 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -29,11 +29,13 @@ import 
org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.common.type.SnapshotContext;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.Context.Operation;
+import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
@@ -510,4 +512,14 @@ public interface HiveStorageHandler extends Configurable {
   default SnapshotContext 
getCurrentSnapshotContext(org.apache.hadoop.hive.ql.metadata.Table table) {
     return null;
   }
+
+  /**
+   * Alter table operations can rely on this to customize the 
EnvironmentContext to be used during the alter table
+   * invocation (both on client and server side of HMS)
+   * @param alterTableDesc the alter table desc (e.g.: 
AlterTableSetPropertiesDesc) containing the work to do
+   * @param environmentContext an existing EnvironmentContext created prior, 
now to be filled/amended
+   */
+  default void prepareAlterTableEnvironmentContext(AbstractAlterTableDesc 
alterTableDesc,
+      EnvironmentContext environmentContext) {
+  }
 }
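
For completeness, a sketch of what another storage handler could do with the new default hook; it is not part of this commit, and the marker property below is hypothetical (the Iceberg handler's real override above uses MANUAL_ICEBERG_METADATA_LOCATION_CHANGE):

    import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
    import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
    import org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc;
    import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;

    // Hypothetical handler: marks property-only alters so its metahook can distinguish them later.
    abstract class ExampleStorageHandler implements HiveStorageHandler {
      @Override
      public void prepareAlterTableEnvironmentContext(AbstractAlterTableDesc alterTableDesc,
          EnvironmentContext environmentContext) {
        if (alterTableDesc instanceof AlterTableSetPropertiesDesc) {
          // Marker key/value are illustrative only, not an existing Hive constant.
          environmentContext.putToProperties("example.props-only-alter", "true");
        }
      }
    }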
