This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 2e916a3ec26  HIVE-26576: Alter table calls on Iceberg tables can inadvertently change metadata_location (#3612) (Adam Szita, reviewed by Ayush Saxena)
2e916a3ec26 is described below

commit 2e916a3ec26a6d12653c89ad445fed91e1ba0cde
Author: Adam Szita <40628386+sz...@users.noreply.github.com>
AuthorDate: Sun Oct 2 18:50:26 2022 +0200

    HIVE-26576: Alter table calls on Iceberg tables can inadvertently change metadata_location (#3612) (Adam Szita, reviewed by Ayush Saxena)
---
 .../org/apache/iceberg/hive/HiveCommitLock.java    | 225 +++++++++++++++++++++
 .../apache/iceberg/hive/HiveTableOperations.java   | 154 ++------------
 .../apache/iceberg/hive/TestHiveCommitLocks.java   |   4 +-
 .../org/apache/iceberg/hive/TestHiveCommits.java   |  33 +--
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       |  59 +++++-
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  14 ++
 .../apache/iceberg/mr/hive/IcebergTableUtil.java   |  37 +++-
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  |  80 ++++++++
 .../org/apache/iceberg/mr/hive/TestHiveShell.java  |   4 +
 .../alter_acid_table_to_iceberg_failure.q.out      |   4 +-
 .../alter_managed_table_to_iceberg_failure.q.out   |   4 +-
 .../ql/ddl/table/AbstractAlterTableOperation.java  |   9 +-
 .../hive/ql/metadata/HiveStorageHandler.java       |  12 ++
 13 files changed, 464 insertions(+), 175 deletions(-)

diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java
new file mode 100644
index 00000000000..63d5d40d19f
--- /dev/null
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCommitLock.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.hive; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockLevel; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.Tasks; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HiveCommitLock { + + private static final Logger LOG = LoggerFactory.getLogger(HiveCommitLock.class); + + private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms"; + private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms"; + private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms"; + private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = "iceberg.hive.table-level-lock-evict-ms"; + private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes + private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds + private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds + + private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10); + + private static Cache<String, ReentrantLock> commitLockCache; + + private static synchronized void initTableLevelLockCache(long evictionTimeout) { + if (commitLockCache == null) { + commitLockCache = Caffeine.newBuilder() + .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS) + .build(); + } + } + + private final String fullName; + private final String databaseName; + private final String tableName; + private final ClientPool<IMetaStoreClient, TException> metaClients; + + private final long lockAcquireTimeout; + private final long lockCheckMinWaitTime; + private final long lockCheckMaxWaitTime; + + private Optional<Long> hmsLockId = Optional.empty(); + private Optional<ReentrantLock> jvmLock = Optional.empty(); + + public HiveCommitLock(Configuration conf, ClientPool<IMetaStoreClient, TException> metaClients, + String catalogName, String databaseName, String tableName) { + this.metaClients = metaClients; + this.databaseName = databaseName; + this.tableName = tableName; + this.fullName = catalogName + "." + databaseName + "." 
+ tableName; + + this.lockAcquireTimeout = + conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT); + this.lockCheckMinWaitTime = + conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT); + this.lockCheckMaxWaitTime = + conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT); + long tableLevelLockCacheEvictionTimeout = + conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT); + initTableLevelLockCache(tableLevelLockCacheEvictionTimeout); + } + + public void acquire() throws UnknownHostException, TException, InterruptedException { + // getting a process-level lock per table to avoid concurrent commit attempts to the same table from the same + // JVM process, which would result in unnecessary and costly HMS lock acquisition requests + acquireJvmLock(); + acquireLockFromHms(); + } + + public void release() { + releaseHmsLock(); + releaseJvmLock(); + } + + // TODO add lock heart beating for cases where default lock timeout is too low. + private void acquireLockFromHms() throws UnknownHostException, TException, InterruptedException { + if (hmsLockId.isPresent()) { + throw new IllegalArgumentException(String.format("HMS lock ID=%s already acquired for table %s.%s", + hmsLockId.get(), databaseName, tableName)); + } + final LockComponent lockComponent = new LockComponent(LockType.EXCL_WRITE, LockLevel.TABLE, databaseName); + lockComponent.setTablename(tableName); + final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent), + System.getProperty("user.name"), + InetAddress.getLocalHost().getHostName()); + LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest)); + AtomicReference<LockState> state = new AtomicReference<>(lockResponse.getState()); + long lockId = lockResponse.getLockid(); + this.hmsLockId = Optional.of(lockId); + + final long start = System.currentTimeMillis(); + long duration = 0; + boolean timeout = false; + + try { + if (state.get().equals(LockState.WAITING)) { + // Retry count is the typical "upper bound of retries" for Tasks.run() function. In fact, the maximum number of + // attempts the Tasks.run() would try is `retries + 1`. Here, for checking locks, we use timeout as the + // upper bound of retries. So it is just reasonable to set a large retry count. However, if we set + // Integer.MAX_VALUE, the above logic of `retries + 1` would overflow into Integer.MIN_VALUE. Hence, + // the retry is set conservatively as `Integer.MAX_VALUE - 100` so it doesn't hit any boundary issues. 
+ Tasks.foreach(lockId) + .retry(Integer.MAX_VALUE - 100) + .exponentialBackoff( + lockCheckMinWaitTime, + lockCheckMaxWaitTime, + lockAcquireTimeout, + 1.5) + .throwFailureWhenFinished() + .onlyRetryOn(WaitingForHmsLockException.class) + .run(id -> { + try { + LockResponse response = metaClients.run(client -> client.checkLock(id)); + LockState newState = response.getState(); + state.set(newState); + if (newState.equals(LockState.WAITING)) { + throw new WaitingForHmsLockException("Waiting for lock."); + } + } catch (InterruptedException e) { + Thread.interrupted(); // Clear the interrupt status flag + LOG.warn("Interrupted while waiting for lock.", e); + } + }, TException.class); + } + } catch (WaitingForHmsLockException waitingForLockException) { + timeout = true; + duration = System.currentTimeMillis() - start; + } finally { + if (!state.get().equals(LockState.ACQUIRED)) { + releaseHmsLock(); + } + } + + // timeout and do not have lock acquired + if (timeout && !state.get().equals(LockState.ACQUIRED)) { + throw new CommitFailedException("Timed out after %s ms waiting for lock on %s.%s", + duration, databaseName, tableName); + } + + if (!state.get().equals(LockState.ACQUIRED)) { + throw new CommitFailedException("Could not acquire the lock on %s.%s, " + + "lock request ended in state %s", databaseName, tableName, state); + } + } + + private void releaseHmsLock() { + if (hmsLockId.isPresent()) { + try { + metaClients.run(client -> { + client.unlock(hmsLockId.get()); + return null; + }); + hmsLockId = Optional.empty(); + } catch (Exception e) { + LOG.warn("Failed to unlock {}.{}", databaseName, tableName, e); + } + } + } + + private void acquireJvmLock() { + if (jvmLock.isPresent()) { + throw new IllegalStateException(String.format("JVM lock already acquired for table %s", fullName)); + } + jvmLock = Optional.of(commitLockCache.get(fullName, t -> new ReentrantLock(true))); + jvmLock.get().lock(); + } + + private void releaseJvmLock() { + if (jvmLock.isPresent()) { + jvmLock.get().unlock(); + jvmLock = Optional.empty(); + } + } + + public String getDatabaseName() { + return databaseName; + } + + public String getTableName() { + return tableName; + } + + private static class WaitingForHmsLockException extends RuntimeException { + WaitingForHmsLockException(String message) { + super(message); + } + } +} diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 8fdcb6b2e5a..12ade76a685 100644 --- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -19,9 +19,6 @@ package org.apache.iceberg.hive; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; import java.util.Locale; @@ -29,21 +26,12 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import 
org.apache.hadoop.hive.metastore.api.InvalidObjectException; -import org.apache.hadoop.hive.metastore.api.LockComponent; -import org.apache.hadoop.hive.metastore.api.LockLevel; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.LockState; -import org.apache.hadoop.hive.metastore.api.LockType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -67,9 +55,7 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTest import org.apache.iceberg.relocated.com.google.common.collect.BiMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.Tasks; import org.apache.parquet.hadoop.ParquetOutputFormat; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -83,33 +69,14 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED; */ public class HiveTableOperations extends BaseMetastoreTableOperations { private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class); - - private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms"; - private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms"; - private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms"; private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = "iceberg.hive.metadata-refresh-max-retries"; - private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = "iceberg.hive.table-level-lock-evict-ms"; - private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes - private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds - private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2; - private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10); private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of( // gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things but with different names GC_ENABLED, "external.table.purge", TableProperties.PARQUET_COMPRESSION, ParquetOutputFormat.COMPRESSION); - private static Cache<String, ReentrantLock> commitLockCache; - - private static synchronized void initTableLevelLockCache(long evictionTimeout) { - if (commitLockCache == null) { - commitLockCache = Caffeine.newBuilder() - .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS) - .build(); - } - } - /** * Provides key translation where necessary between Iceberg and HMS props. This translation is needed because some * properties control the same behaviour but are named differently in Iceberg and Hive. 
Therefore changes to these @@ -127,19 +94,11 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { return ICEBERG_TO_HMS_TRANSLATION.inverse().getOrDefault(hmsProp, hmsProp); } - private static class WaitingForLockException extends RuntimeException { - WaitingForLockException(String message) { - super(message); - } - } - private final String fullName; + private final String catalogName; private final String database; private final String tableName; private final Configuration conf; - private final long lockAcquireTimeout; - private final long lockCheckMinWaitTime; - private final long lockCheckMaxWaitTime; private final int metadataRefreshMaxRetries; private final FileIO fileIO; private final ClientPool<IMetaStoreClient, TException> metaClients; @@ -150,19 +109,11 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { this.metaClients = metaClients; this.fileIO = fileIO; this.fullName = catalogName + "." + database + "." + table; + this.catalogName = catalogName; this.database = database; this.tableName = table; - this.lockAcquireTimeout = - conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT); - this.lockCheckMinWaitTime = - conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT); - this.lockCheckMaxWaitTime = - conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT); this.metadataRefreshMaxRetries = conf.getInt(HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES, HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT); - long tableLevelLockCacheEvictionTimeout = - conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT); - initTableLevelLockCache(tableLevelLockCacheEvictionTimeout); } @Override @@ -211,14 +162,11 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { CommitStatus commitStatus = CommitStatus.FAILURE; boolean updateHiveTable = false; - Optional<Long> lockId = Optional.empty(); - // getting a process-level lock per table to avoid concurrent commit attempts to the same table from the same - // JVM process, which would result in unnecessary and costly HMS lock acquisition requests - ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true)); - tableLevelMutex.lock(); + HiveCommitLock commitLock = null; + try { - lockId = Optional.of(acquireLock()); - // TODO add lock heart beating for cases where default lock timeout is too low. 
+ commitLock = createLock(); + commitLock.acquire(); Table tbl = loadHmsTable(); @@ -299,7 +247,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { throw new RuntimeException("Interrupted during commit", e); } finally { - cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex); + cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, commitLock); } } @@ -416,74 +364,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { } @VisibleForTesting - long acquireLock() throws UnknownHostException, TException, InterruptedException { - final LockComponent lockComponent = new LockComponent(LockType.EXCL_WRITE, LockLevel.TABLE, database); - lockComponent.setTablename(tableName); - final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent), - System.getProperty("user.name"), - InetAddress.getLocalHost().getHostName()); - LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest)); - AtomicReference<LockState> state = new AtomicReference<>(lockResponse.getState()); - long lockId = lockResponse.getLockid(); - - final long start = System.currentTimeMillis(); - long duration = 0; - boolean timeout = false; - - try { - if (state.get().equals(LockState.WAITING)) { - // Retry count is the typical "upper bound of retries" for Tasks.run() function. In fact, the maximum number of - // attempts the Tasks.run() would try is `retries + 1`. Here, for checking locks, we use timeout as the - // upper bound of retries. So it is just reasonable to set a large retry count. However, if we set - // Integer.MAX_VALUE, the above logic of `retries + 1` would overflow into Integer.MIN_VALUE. Hence, - // the retry is set conservatively as `Integer.MAX_VALUE - 100` so it doesn't hit any boundary issues. 
- Tasks.foreach(lockId) - .retry(Integer.MAX_VALUE - 100) - .exponentialBackoff( - lockCheckMinWaitTime, - lockCheckMaxWaitTime, - lockAcquireTimeout, - 1.5) - .throwFailureWhenFinished() - .onlyRetryOn(WaitingForLockException.class) - .run(id -> { - try { - LockResponse response = metaClients.run(client -> client.checkLock(id)); - LockState newState = response.getState(); - state.set(newState); - if (newState.equals(LockState.WAITING)) { - throw new WaitingForLockException("Waiting for lock."); - } - } catch (InterruptedException e) { - Thread.interrupted(); // Clear the interrupt status flag - LOG.warn("Interrupted while waiting for lock.", e); - } - }, TException.class); - } - } catch (WaitingForLockException waitingForLockException) { - timeout = true; - duration = System.currentTimeMillis() - start; - } finally { - if (!state.get().equals(LockState.ACQUIRED)) { - unlock(Optional.of(lockId)); - } - } - - // timeout and do not have lock acquired - if (timeout && !state.get().equals(LockState.ACQUIRED)) { - throw new CommitFailedException("Timed out after %s ms waiting for lock on %s.%s", - duration, database, tableName); - } - - if (!state.get().equals(LockState.ACQUIRED)) { - throw new CommitFailedException("Could not acquire the lock on %s.%s, " + - "lock request ended in state %s", database, tableName, state); - } - return lockId; + HiveCommitLock createLock() throws UnknownHostException, TException, InterruptedException { + return new HiveCommitLock(conf, metaClients, catalogName, database, tableName); } - private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional<Long> lockId, - ReentrantLock tableLevelMutex) { + private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, + HiveCommitLock lock) { try { if (commitStatus == CommitStatus.FAILURE) { // If we are sure the commit failed, clean up the uncommitted metadata file @@ -493,29 +379,21 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { LOG.error("Fail to cleanup metadata file at {}", metadataLocation, e); throw e; } finally { - unlock(lockId); - tableLevelMutex.unlock(); + doUnlock(lock); } } - private void unlock(Optional<Long> lockId) { - if (lockId.isPresent()) { + @VisibleForTesting + void doUnlock(HiveCommitLock lock) { + if (lock != null) { try { - doUnlock(lockId.get()); + lock.release(); } catch (Exception e) { LOG.warn("Failed to unlock {}.{}", database, tableName, e); } } } - @VisibleForTesting - void doUnlock(long lockId) throws TException, InterruptedException { - metaClients.run(client -> { - client.unlock(lockId); - return null; - }); - } - static void validateTableIsIceberg(Table table, String fullName) { String tableType = table.getParameters().get(TABLE_TYPE_PROP); NoSuchIcebergTableException.check(tableType != null && tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE), diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java index 3b4bb159590..293dd5010cd 100644 --- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java +++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java @@ -132,7 +132,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest { @Test public void testLockAcquisitionAtFirstTime() throws TException, InterruptedException { doReturn(acquiredLockResponse).when(spyClient).lock(any()); - 
doNothing().when(spyOps).doUnlock(eq(dummyLockId)); + doNothing().when(spyClient).unlock(eq(dummyLockId)); spyOps.doCommit(metadataV2, metadataV1); @@ -150,7 +150,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest { .doReturn(acquiredLockResponse) .when(spyClient) .checkLock(eq(dummyLockId)); - doNothing().when(spyOps).doUnlock(eq(dummyLockId)); + doNothing().when(spyClient).unlock(eq(dummyLockId)); spyOps.doCommit(metadataV2, metadataV1); diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java index e22374b6e97..1afe98d81b7 100644 --- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java +++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java @@ -21,7 +21,7 @@ package org.apache.iceberg.hive; import java.io.File; import java.net.UnknownHostException; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.PartitionSpec; @@ -35,7 +35,6 @@ import org.apache.iceberg.types.Types; import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Test; -import org.mockito.ArgumentCaptor; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; @@ -47,7 +46,7 @@ import static org.mockito.Mockito.when; public class TestHiveCommits extends HiveTableBaseTest { @Test - public void testSuppressUnlockExceptions() throws TException, InterruptedException { + public void testSuppressUnlockExceptions() throws TException, InterruptedException, UnknownHostException { Table table = catalog.loadTable(TABLE_IDENTIFIER); HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations(); @@ -65,13 +64,21 @@ public class TestHiveCommits extends HiveTableBaseTest { HiveTableOperations spyOps = spy(ops); - ArgumentCaptor<Long> lockId = ArgumentCaptor.forClass(Long.class); - doThrow(new RuntimeException()).when(spyOps).doUnlock(lockId.capture()); + AtomicReference<HiveCommitLock> lockRef = new AtomicReference<>(); + + when(spyOps.createLock()).thenAnswer(i -> { + HiveCommitLock lock = (HiveCommitLock) i.callRealMethod(); + lockRef.set(lock); + return lock; + } + ); try { spyOps.commit(metadataV2, metadataV1); + HiveCommitLock spyLock = spy(lockRef.get()); + doThrow(new RuntimeException()).when(spyLock).release(); } finally { - ops.doUnlock(lockId.getValue()); + ops.doUnlock(lockRef.get()); } ops.refresh(); @@ -258,13 +265,13 @@ public class TestHiveCommits extends HiveTableBaseTest { HiveTableOperations spyOps = spy(ops); - AtomicLong lockId = new AtomicLong(); - doAnswer(i -> { - lockId.set(ops.acquireLock()); - return lockId.get(); - }).when(spyOps).acquireLock(); + AtomicReference<HiveCommitLock> lock = new AtomicReference<>(); + doAnswer(l -> { + lock.set(ops.createLock()); + return lock.get(); + }).when(spyOps).createLock(); - concurrentCommitAndThrowException(ops, spyOps, table, lockId); + concurrentCommitAndThrowException(ops, spyOps, table, lock); /* This commit and our concurrent commit should succeed even though this commit throws an exception @@ -306,7 +313,7 @@ public class TestHiveCommits extends HiveTableBaseTest { } private void concurrentCommitAndThrowException(HiveTableOperations realOperations, HiveTableOperations spyOperations, - Table table, AtomicLong lockId) + Table table, 
AtomicReference<HiveCommitLock> lockId) throws TException, InterruptedException { // Simulate a communication error after a successful commit doAnswer(i -> { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 067057def75..c99f6747409 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -65,7 +65,6 @@ import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.DeleteFiles; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; @@ -81,6 +80,8 @@ import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hive.CachedClientPool; +import org.apache.iceberg.hive.HiveCommitLock; import org.apache.iceberg.hive.HiveSchemaUtil; import org.apache.iceberg.hive.HiveTableOperations; import org.apache.iceberg.io.FileIO; @@ -95,6 +96,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.util.Pair; import org.apache.thrift.TException; @@ -126,6 +128,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook { Lists.newArrayList(org.apache.commons.lang3.tuple.Pair.of(1, new byte[0])); static final String MIGRATED_TO_ICEBERG = "MIGRATED_TO_ICEBERG"; static final String ORC_FILES_ONLY = "iceberg.orc.files.only"; + static final String MANUAL_ICEBERG_METADATA_LOCATION_CHANGE = "MANUAL_ICEBERG_METADATA_LOCATION_CHANGE"; private final Configuration conf; private Table icebergTable = null; @@ -140,6 +143,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook { private Transaction transaction; private AlterTableType currentAlterTableOp; private boolean createHMSTableInHook = false; + private HiveCommitLock commitLock; private enum FileFormat { ORC("orc"), PARQUET("parquet"), AVRO("avro"); @@ -175,7 +179,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook { // If not using HiveCatalog check for existing table try { - this.icebergTable = IcebergTableUtil.getTable(conf, catalogProperties); + this.icebergTable = IcebergTableUtil.getTable(conf, catalogProperties, true); Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null, "Iceberg table already created - can not use provided schema"); @@ -299,8 +303,24 @@ public class HiveIcebergMetaHook implements HiveMetaHook { throws MetaException { catalogProperties = getCatalogProperties(hmsTable); setupAlterOperationType(hmsTable, context); + if (commitLock == null) { + commitLock = new HiveCommitLock(conf, new CachedClientPool(conf, Maps.fromProperties(catalogProperties)), + catalogProperties.getProperty(Catalogs.NAME), hmsTable.getDbName(), hmsTable.getTableName()); + } + + try { + 
commitLock.acquire(); + doPreAlterTable(hmsTable, context); + } catch (Exception e) { + commitLock.release(); + throw new MetaException(StringUtils.stringifyException(e)); + } + } + + private void doPreAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context) + throws MetaException { try { - icebergTable = IcebergTableUtil.getTable(conf, catalogProperties); + icebergTable = IcebergTableUtil.getTable(conf, catalogProperties, true); } catch (NoSuchTableException nte) { context.getProperties().put(MIGRATE_HIVE_TO_ICEBERG, "true"); // If the iceberg table does not exist, then this is an ALTER command aimed at migrating the table to iceberg @@ -322,8 +342,8 @@ public class HiveIcebergMetaHook implements HiveMetaHook { // If there are partition keys specified remove them from the HMS table and add them to the column list try { Hive db = SessionState.get().getHiveDb(); - preAlterTableProperties.partitionSpecProxy = db.getMSC().listPartitionSpecs(hmsTable.getCatName(), - hmsTable.getDbName(), hmsTable.getTableName(), Integer.MAX_VALUE); + preAlterTableProperties.partitionSpecProxy = db.getMSC().listPartitionSpecs( + hmsTable.getCatName(), hmsTable.getDbName(), hmsTable.getTableName(), Integer.MAX_VALUE); if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) { db.dropPartitions(hmsTable.getDbName(), hmsTable.getTableName(), EMPTY_FILTER, DROP_OPTIONS); @@ -345,8 +365,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook { sd.setInputFormat(HiveIcebergInputFormat.class.getCanonicalName()); sd.setOutputFormat(HiveIcebergOutputFormat.class.getCanonicalName()); - sd.setSerdeInfo(new SerDeInfo("icebergSerde", HiveIcebergSerDe.class.getCanonicalName(), - Collections.emptyMap())); + sd.setSerdeInfo(new SerDeInfo("icebergSerde", HiveIcebergSerDe.class.getCanonicalName(), Collections.emptyMap())); setCommonHmsTablePropertiesForIceberg(hmsTable); // set an additional table prop to designate that this table has been migrated to Iceberg, i.e. // all or some of its data files have not been written out using the Iceberg writer, and therefore those data @@ -367,8 +386,8 @@ public class HiveIcebergMetaHook implements HiveMetaHook { // that users can change data types or reorder columns too with this alter op type, so its name is misleading..) assertNotMigratedTable(hmsTable.getParameters(), "CHANGE COLUMN"); handleChangeColumn(hmsTable); - } else if (AlterTableType.ADDPROPS.equals(currentAlterTableOp)) { - assertNotCrossTableMetadataLocationChange(hmsTable.getParameters()); + } else { + assertNotCrossTableMetadataLocationChange(hmsTable.getParameters(), context); } // Migration case is already handled above, in case of migration we don't have all the properties set till this @@ -385,7 +404,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook { * the current metadata uuid and the new metadata uuid matches. * @param tblParams hms table properties, must be non-null */ - private void assertNotCrossTableMetadataLocationChange(Map<String, String> tblParams) { + private void assertNotCrossTableMetadataLocationChange(Map<String, String> tblParams, EnvironmentContext context) { if (tblParams.containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)) { Preconditions.checkArgument(icebergTable != null, "Cannot perform table migration to Iceberg and setting the snapshot location in one step. 
" + @@ -400,6 +419,17 @@ public class HiveIcebergMetaHook implements HiveMetaHook { icebergTable.name(), newMetadataLocation) ); } + if (!currentMetadata.metadataFileLocation().equals(newMetadataLocation) && + !context.getProperties().containsKey(MANUAL_ICEBERG_METADATA_LOCATION_CHANGE)) { + // If we got here then this is an alter table operation where the table to be changed had an Iceberg commit + // meanwhile. The base metadata locations differ, while we know that this wasn't an intentional, manual + // metadata_location set by a user. To protect the interim commit we need to refresh the metadata file location. + tblParams.put(BaseMetastoreTableOperations.METADATA_LOCATION_PROP, currentMetadata.metadataFileLocation()); + LOG.warn("Detected an alter table operation attempting to do alterations on an Iceberg table with a stale " + + "metadata_location. Considered base metadata_location: {}, Actual metadata_location: {}. Will override " + + "this request with the refreshed metadata_location in order to preserve the concurrent commit.", + newMetadataLocation, currentMetadata.metadataFileLocation()); + } } } @@ -475,6 +505,10 @@ public class HiveIcebergMetaHook implements HiveMetaHook { @Override public void commitAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context) throws MetaException { + if (commitLock == null) { + throw new IllegalStateException("Hive commit lock should already be set"); + } + commitLock.release(); if (isTableMigration) { catalogProperties = getCatalogProperties(hmsTable); catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(preAlterTableProperties.schema)); @@ -507,6 +541,10 @@ public class HiveIcebergMetaHook implements HiveMetaHook { @Override public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context) { + if (commitLock == null) { + throw new IllegalStateException("Hive commit lock should already be set"); + } + commitLock.release(); if (Boolean.parseBoolean(context.getProperties().getOrDefault(MIGRATE_HIVE_TO_ICEBERG, "false"))) { LOG.debug("Initiating rollback for table {} at location {}", hmsTable.getTableName(), hmsTable.getSd().getLocation()); @@ -866,4 +904,5 @@ public class HiveIcebergMetaHook implements HiveMetaHook { private List<FieldSchema> partitionKeys; private PartitionSpecProxy partitionSpecProxy; } + } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index a050b0456a6..08443211851 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -42,11 +42,14 @@ import org.apache.hadoop.hive.common.type.SnapshotContext; import org.apache.hadoop.hive.common.type.Timestamp; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.LockType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.Context.Operation; +import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc; import org.apache.hadoop.hive.ql.ddl.table.AlterTableType; +import 
org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.WriteEntity; @@ -82,6 +85,7 @@ import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.JobContextImpl; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.OutputFormat; +import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.BaseTable; import org.apache.iceberg.FileFormat; import org.apache.iceberg.ManifestFile; @@ -999,4 +1003,14 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H } return new SnapshotContext(current.snapshotId()); } + + @Override + public void prepareAlterTableEnvironmentContext(AbstractAlterTableDesc alterTableDesc, + EnvironmentContext environmentContext) { + if (alterTableDesc instanceof AlterTableSetPropertiesDesc && + alterTableDesc.getProps().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)) { + // signal manual iceberg metadata location updated by user + environmentContext.putToProperties(HiveIcebergMetaHook.MANUAL_ICEBERG_METADATA_LOCATION_CHANGE, "true"); + } + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index 344834ec62e..60a9158ecab 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -21,6 +21,7 @@ package org.apache.iceberg.mr.hive; import java.util.List; import java.util.Properties; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.QueryState; @@ -52,9 +53,11 @@ public class IcebergTableUtil { * hmsTable. It then calls {@link IcebergTableUtil#getTable(Configuration, Properties)} with these properties. * @param configuration a Hadoop configuration * @param hmsTable the HMS table + * @param skipCache if set to true there won't be an attempt to retrieve the table from SessionState * @return the Iceberg table */ - static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable) { + static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable, + boolean skipCache) { Properties properties = new Properties(); properties.setProperty(Catalogs.NAME, TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()).toString()); properties.setProperty(Catalogs.LOCATION, hmsTable.getSd().getLocation()); @@ -63,7 +66,11 @@ public class IcebergTableUtil { properties.setProperty(k, v); return v; }); - return getTable(configuration, properties); + return getTable(configuration, properties, skipCache); + } + + static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable) { + return getTable(configuration, hmsTable, false); } /** @@ -72,9 +79,10 @@ public class IcebergTableUtil { * therefore we claim it through the Catalogs API and then store it in query state. 
* @param configuration a Hadoop configuration * @param properties controlling properties + * @param skipCache if set to true there won't be an attempt to retrieve the table from SessionState * @return an Iceberg table */ - static Table getTable(Configuration configuration, Properties properties) { + static Table getTable(Configuration configuration, Properties properties, boolean skipCache) { String metaTable = properties.getProperty("metaTable"); String tableName = properties.getProperty(Catalogs.NAME); String location = properties.getProperty(Catalogs.LOCATION); @@ -86,14 +94,27 @@ public class IcebergTableUtil { } String tableIdentifier = properties.getProperty(Catalogs.NAME); - return SessionStateUtil.getResource(configuration, tableIdentifier).filter(o -> o instanceof Table) - .map(o -> (Table) o).orElseGet(() -> { - LOG.debug("Iceberg table {} is not found in QueryState. Loading table from configured catalog", - tableIdentifier); + Function<Void, Table> tableLoadFunc = + unused -> { Table tab = Catalogs.loadTable(configuration, properties); SessionStateUtil.addResource(configuration, tableIdentifier, tab); return tab; - }); + }; + + if (skipCache) { + return tableLoadFunc.apply(null); + } else { + return SessionStateUtil.getResource(configuration, tableIdentifier).filter(o -> o instanceof Table) + .map(o -> (Table) o).orElseGet(() -> { + LOG.debug("Iceberg table {} is not found in QueryState. Loading table from configured catalog", + tableIdentifier); + return tableLoadFunc.apply(null); + }); + } + } + + static Table getTable(Configuration configuration, Properties properties) { + return getTable(configuration, properties, false); } /** diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java index bfb5997d2bd..97c2eb83ef9 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java @@ -29,12 +29,17 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -70,6 +75,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.parquet.hadoop.ParquetOutputFormat; @@ -92,6 +98,10 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED; import 
static org.apache.iceberg.types.Types.NestedField.optional; import static org.junit.runners.Parameterized.Parameter; import static org.junit.runners.Parameterized.Parameters; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; @RunWith(Parameterized.class) public class TestHiveIcebergStorageHandlerNoScan { @@ -1642,6 +1652,76 @@ public class TestHiveIcebergStorageHandlerNoScan { Assert.assertEquals("SNAPPY", icebergTable.properties().get(TableProperties.PARQUET_COMPRESSION).toUpperCase()); } + @Test + public void testConcurrentIcebergCommitsAndHiveAlterTableCalls() throws Exception { + Assume.assumeTrue(testTableType.equals(TestTables.TestTableType.HIVE_CATALOG)); + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + + testTables.createTable( + shell, + identifier.name(), + HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, + SPEC, + FileFormat.PARQUET, + ImmutableList.of()); + + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); + + // Avoid commit retry limits preventing any changes from being committed. + icebergTable.updateProperties().set("commit.retry.num-retries", "1000000").commit(); + + // Swap metastore client used by TestHiveShell with our test stub + IMetaStoreClient realMSC = shell.getSession().getMetaStoreClient(); + IMetaStoreClient spyMSC = spy(realMSC); + shell.getSession().getSessionHive().setMSC(spyMSC); + + // Simulate delay on alter table calls from Hive queries to ensure they will have worked on outdated Table objects + // by the time they intend to persist their changes into HMS + doAnswer(i -> { + Thread.sleep(3000); + return i.callRealMethod(); + }).when(spyMSC).alter_table(any(String.class), any(String.class), any(String.class), + any(org.apache.hadoop.hive.metastore.api.Table.class), any(EnvironmentContext.class), isNull()); + + ExecutorService executorService = + MoreExecutors.getExitingExecutorService( + (ThreadPoolExecutor) Executors.newFixedThreadPool(1)); + + // Concurrent Insert + executorService.submit( + () -> { + try { + testTables.appendIcebergTable( + shell.getHiveConf(), + icebergTable, + FileFormat.PARQUET, + null, + HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + // Concurrent ALTER TABLE properties change + shell.executeStatement("ALTER TABLE default.customers SET TBLPROPERTIES ('dummyKey'='dummyValue')"); + + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.MINUTES); + + // Verify that the insert was effective + Assert.assertEquals(((BaseTable) testTables.loadTable(identifier)).operations().current().metadataFileLocation(), + (long) HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.size(), + shell.executeStatement("select count(*) from customers").get(0)[0] + ); + + // Verify that the alter table call was effective + Assert.assertEquals("dummyValue", shell.metastore().getTable(identifier).getParameters().get("dummyKey")); + + // Should be the 4rd metadata version (1 empty base + 1 commit retry change + 1 insert + 1 property change) + Assert.assertEquals(3, + ((BaseTable) testTables.loadTable(identifier)).operations().current().previousFiles().size()); + } + /** * Checks that the new schema has newintcol and newstring col columns on both HMS and Iceberg sides diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java index f55324e8651..9d17d85ca77 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java @@ -176,6 +176,10 @@ public class TestHiveShell { } } + public HiveSession getSession() { + return session; + } + private HiveConf initializeConf() { HiveConf hiveConf = new HiveConf(); diff --git a/iceberg/iceberg-handler/src/test/results/negative/alter_acid_table_to_iceberg_failure.q.out b/iceberg/iceberg-handler/src/test/results/negative/alter_acid_table_to_iceberg_failure.q.out index 32ac650eee6..1262e0f75c0 100644 --- a/iceberg/iceberg-handler/src/test/results/negative/alter_acid_table_to_iceberg_failure.q.out +++ b/iceberg/iceberg-handler/src/test/results/negative/alter_acid_table_to_iceberg_failure.q.out @@ -14,4 +14,6 @@ PREHOOK: query: alter table tbl_orc set tblproperties ('storage_handler'='org.ap PREHOOK: type: ALTERTABLE_PROPERTIES PREHOOK: Input: default@tbl_orc PREHOOK: Output: default@tbl_orc -FAILED: Execution Error, return code 40013 from org.apache.hadoop.hive.ql.ddl.DDLTask. Unable to alter table. Converting non-external, temporary or transactional hive table to iceberg table is not allowed. +FAILED: Execution Error, return code 40013 from org.apache.hadoop.hive.ql.ddl.DDLTask. Unable to alter table. MetaException(message:Converting non-external, temporary or transactional hive table to iceberg table is not allowed.) +#### A masked pattern was here #### + diff --git a/iceberg/iceberg-handler/src/test/results/negative/alter_managed_table_to_iceberg_failure.q.out b/iceberg/iceberg-handler/src/test/results/negative/alter_managed_table_to_iceberg_failure.q.out index 2e16600d0df..82c80c3eff3 100644 --- a/iceberg/iceberg-handler/src/test/results/negative/alter_managed_table_to_iceberg_failure.q.out +++ b/iceberg/iceberg-handler/src/test/results/negative/alter_managed_table_to_iceberg_failure.q.out @@ -14,4 +14,6 @@ PREHOOK: query: alter table tbl_orc set tblproperties ('storage_handler'='org.ap PREHOOK: type: ALTERTABLE_PROPERTIES PREHOOK: Input: default@tbl_orc PREHOOK: Output: default@tbl_orc -FAILED: Execution Error, return code 40013 from org.apache.hadoop.hive.ql.ddl.DDLTask. Unable to alter table. Converting non-external, temporary or transactional hive table to iceberg table is not allowed. +FAILED: Execution Error, return code 40013 from org.apache.hadoop.hive.ql.ddl.DDLTask. Unable to alter table. MetaException(message:Converting non-external, temporary or transactional hive table to iceberg table is not allowed.) 
+#### A masked pattern was here #### + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java index 5b2b19bb79e..970cdd5b11a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/AbstractAlterTableOperation.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.ddl.table.constraint.add.AlterTableAddConstrain import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.session.SessionState; @@ -68,7 +69,7 @@ public abstract class AbstractAlterTableOperation<T extends AbstractAlterTableDe // Don't change the table object returned by the metastore, as we'll mess with it's caches. Table table = oldTable.copy(); - environmentContext = initializeEnvironmentContext(desc.getEnvironmentContext()); + environmentContext = initializeEnvironmentContext(oldTable, desc.getEnvironmentContext()); if (partitions == null) { doAlteration(table, null); @@ -106,13 +107,17 @@ public abstract class AbstractAlterTableOperation<T extends AbstractAlterTableDe return partitions; } - private EnvironmentContext initializeEnvironmentContext(EnvironmentContext environmentContext) { + private EnvironmentContext initializeEnvironmentContext(Table table, EnvironmentContext environmentContext) { EnvironmentContext result = environmentContext == null ? new EnvironmentContext() : environmentContext; // do not need update stats in alter table/partition operations if (result.getProperties() == null || result.getProperties().get(StatsSetupConst.DO_NOT_UPDATE_STATS) == null) { result.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); } + HiveStorageHandler storageHandler = table.getStorageHandler(); + if (storageHandler != null) { + storageHandler.prepareAlterTableEnvironmentContext(desc, result); + } return result; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java index bdfdf3fde36..b70ab657381 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java @@ -29,11 +29,13 @@ import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.common.type.SnapshotContext; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.LockType; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.Context.Operation; +import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc; import org.apache.hadoop.hive.ql.ddl.table.AlterTableType; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec; @@ -510,4 +512,14 @@ public interface HiveStorageHandler extends Configurable { default SnapshotContext 
getCurrentSnapshotContext(org.apache.hadoop.hive.ql.metadata.Table table) { return null; } + + /** + * Alter table operations can rely on this to customize the EnvironmentContext to be used during the alter table + * invocation (both on client and server side of HMS) + * @param alterTableDesc the alter table desc (e.g.: AlterTableSetPropertiesDesc) containing the work to do + * @param environmentContext an existing EnvironmentContext created prior, now to be filled/amended + */ + default void prepareAlterTableEnvironmentContext(AbstractAlterTableDesc alterTableDesc, + EnvironmentContext environmentContext) { + } }
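
For reference, a minimal caller-side sketch of the acquire/release contract that the new HiveCommitLock gives both HiveTableOperations.doCommit and HiveIcebergMetaHook in this commit. This is illustrative only and not part of the change itself; the Configuration, ClientPool and the catalog/database/table names are placeholders.

import java.net.UnknownHostException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.hive.HiveCommitLock;
import org.apache.thrift.TException;

/** Illustrative caller only, not code from this commit. */
class CommitLockUsageSketch {
  void updateTableUnderLock(Configuration conf, ClientPool<IMetaStoreClient, TException> metaClients)
      throws UnknownHostException, TException, InterruptedException {
    // Catalog, database and table names below are placeholders for the example.
    HiveCommitLock lock = new HiveCommitLock(conf, metaClients, "hive", "default", "customers");
    try {
      // Takes the per-table JVM ReentrantLock first, then an EXCL_WRITE table lock in the HMS.
      lock.acquire();
      // ... reload the HMS table, verify the expected base metadata_location, persist the change ...
    } finally {
      // Releases the HMS lock (if it was obtained) and then the JVM lock.
      lock.release();
    }
  }
}

Both call sites in the diff follow this shape: HiveTableOperations.doCommit releases through cleanupMetadataAndUnlock, while HiveIcebergMetaHook acquires in preAlterTable and releases in commitAlterTable/rollbackAlterTable (or immediately if preAlterTable fails). Holding the same lock across an Iceberg commit and a Hive ALTER TABLE is what prevents the two from silently overwriting each other's metadata_location.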