This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new fede493d59 Hive: Lock hardening (#6451)
fede493d59 is described below
commit fede493d59f17ff2bfc0744b296d90bd36130386
Author: pvary <[email protected]>
AuthorDate: Wed Jan 11 07:43:07 2023 +0100
Hive: Lock hardening (#6451)
---
docs/configuration.md | 27 ++-
.../org/apache/iceberg/hive/HiveSchemaUtil.java | 2 +-
.../apache/iceberg/hive/HiveTableOperations.java | 242 ++++++++++++++++++---
.../java/org/apache/iceberg/hive/HiveVersion.java | 65 ++++++
.../org/apache/iceberg/hive/MetastoreUtil.java | 22 --
.../apache/iceberg/hive/TestHiveCommitLocks.java | 188 +++++++++++++++-
.../org/apache/iceberg/hive/TestHiveCommits.java | 17 +-
.../iceberg/mr/hive/HiveIcebergInputFormat.java | 6 +-
.../objectinspector/IcebergObjectInspector.java | 8 +-
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 8 +-
.../apache/iceberg/mr/hive/TestDeserializer.java | 4 +-
.../hive/TestHiveIcebergStorageHandlerNoScan.java | 4 +-
.../TestHiveIcebergStorageHandlerWithEngine.java | 4 +-
.../org/apache/iceberg/mr/hive/TestTables.java | 6 +-
.../TestIcebergObjectInspector.java | 8 +-
15 files changed, 512 insertions(+), 99 deletions(-)
diff --git a/docs/configuration.md b/docs/configuration.md
index 0cdcc0ac95..1f240d9ab4 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -159,15 +159,24 @@ Here are the catalog properties related to locking. They
are used by some catalo
## Hadoop configuration
The following properties from the Hadoop configuration are used by the Hive
Metastore connector.
-
-| Property | Default | Description
|
-| ------------------------------------- | ---------------- |
----------------------------------------------------------------------------------
|
-| iceberg.hive.client-pool-size | 5 | The size of the
Hive client pool when tracking tables in HMS |
-| iceberg.hive.lock-timeout-ms | 180000 (3 min) | Maximum time in
milliseconds to acquire a lock |
-| iceberg.hive.lock-check-min-wait-ms | 50 | Minimum time in
milliseconds to check back on the status of lock acquisition |
-| iceberg.hive.lock-check-max-wait-ms | 5000 | Maximum time in
milliseconds to check back on the status of lock acquisition |
-
-Note: `iceberg.hive.lock-check-max-wait-ms` should be less than the
[transaction
timeout](https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.txn.timeout)
+The HMS table locking is a 2-step process:
+1. Lock Creation: Create lock in HMS and queue for acquisition
+2. Lock Check: Check if lock successfully acquired
+
+| Property | Default | Description
|
+|-------------------------------------------|-----------------|------------------------------------------------------------------------------|
+| iceberg.hive.client-pool-size | 5 | The size of
the Hive client pool when tracking tables in HMS |
+| iceberg.hive.lock-creation-timeout-ms | 180000 (3 min) | Maximum time
in milliseconds to create a lock in the HMS |
+| iceberg.hive.lock-creation-min-wait-ms | 50 | Minimum time
in milliseconds between retries of creating the lock in the HMS |
+| iceberg.hive.lock-creation-max-wait-ms | 5000 | Maximum time
in milliseconds between retries of creating the lock in the HMS |
+| iceberg.hive.lock-timeout-ms | 180000 (3 min) | Maximum time
in milliseconds to acquire a lock |
+| iceberg.hive.lock-check-min-wait-ms | 50 | Minimum time
in milliseconds between checking the acquisition of the lock |
+| iceberg.hive.lock-check-max-wait-ms | 5000 | Maximum time
in milliseconds between checking the acquisition of the lock |
+| iceberg.hive.lock-heartbeat-interval-ms | 240000 (4 min) | The heartbeat
interval for the HMS locks. |
+| iceberg.hive.metadata-refresh-max-retries | 2 | Maximum number
of retries when the metadata file is missing |
+| iceberg.hive.table-level-lock-evict-ms  | 600000 (10 min) | The timeout
for the JVM table lock |
+
+Note: `iceberg.hive.lock-check-max-wait-ms` and
`iceberg.hive.lock-heartbeat-interval-ms` should be less than the [transaction
timeout](https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.txn.timeout)
of the Hive Metastore (`hive.txn.timeout` or `metastore.txn.timeout` in the
newer versions). Otherwise, the heartbeats on the lock (which happen during
the lock checks) would end up expiring in the
Hive Metastore before the lock is retried from Iceberg.
diff --git
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index 25d9a74a52..20f9eb7f61 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -160,7 +160,7 @@ public final class HiveSchemaUtil {
return "string";
case TIMESTAMP:
Types.TimestampType timestampType = (Types.TimestampType) type;
- if (MetastoreUtil.hive3PresentOnClasspath() &&
timestampType.shouldAdjustToUTC()) {
+ if (HiveVersion.min(HiveVersion.HIVE_3) &&
timestampType.shouldAdjustToUTC()) {
return "timestamp with local time zone";
}
return "timestamp";
diff --git
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index f91deb5724..e5321d2ab5 100644
---
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -31,11 +31,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
@@ -51,6 +51,9 @@ import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -72,6 +75,7 @@ import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.io.FileIO;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.BiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
@@ -95,6 +99,12 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS =
"iceberg.hive.lock-timeout-ms";
private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS =
"iceberg.hive.lock-check-min-wait-ms";
private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS =
"iceberg.hive.lock-check-max-wait-ms";
+ private static final String HIVE_LOCK_CREATION_TIMEOUT_MS =
+ "iceberg.hive.lock-creation-timeout-ms";
+ private static final String HIVE_LOCK_CREATION_MIN_WAIT_MS =
+ "iceberg.hive.lock-creation-min-wait-ms";
+ private static final String HIVE_LOCK_CREATION_MAX_WAIT_MS =
+ "iceberg.hive.lock-creation-max-wait-ms";
private static final String HIVE_LOCK_HEARTBEAT_INTERVAL_MS =
"iceberg.hive.lock-heartbeat-interval-ms";
private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES =
@@ -111,6 +121,9 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 *
1000; // 3 minutes
private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50
milliseconds
private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; //
5 seconds
+ private static final long HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT = 3 * 60 *
1000; // 3 minutes
+ private static final long HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT = 50; // 50
milliseconds
+ private static final long HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT = 5 * 1000;
// 5 seconds
private static final long HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT = 4 * 60 *
1000; // 4 minutes
private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT =
2;
private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(10);
@@ -161,6 +174,9 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
private final long lockAcquireTimeout;
private final long lockCheckMinWaitTime;
private final long lockCheckMaxWaitTime;
+ private final long lockCreationTimeout;
+ private final long lockCreationMinWaitTime;
+ private final long lockCreationMaxWaitTime;
private final long lockHeartbeatIntervalTime;
private final long maxHiveTablePropertySize;
private final int metadataRefreshMaxRetries;
@@ -187,6 +203,12 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS,
HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
this.lockCheckMaxWaitTime =
conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS,
HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+ this.lockCreationTimeout =
+ conf.getLong(HIVE_LOCK_CREATION_TIMEOUT_MS,
HIVE_LOCK_CREATION_TIMEOUT_MS_DEFAULT);
+ this.lockCreationMinWaitTime =
+ conf.getLong(HIVE_LOCK_CREATION_MIN_WAIT_MS,
HIVE_LOCK_CREATION_MIN_WAIT_MS_DEFAULT);
+ this.lockCreationMaxWaitTime =
+ conf.getLong(HIVE_LOCK_CREATION_MAX_WAIT_MS,
HIVE_LOCK_CREATION_MAX_WAIT_MS_DEFAULT);
this.lockHeartbeatIntervalTime =
conf.getLong(HIVE_LOCK_HEARTBEAT_INTERVAL_MS,
HIVE_LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
this.metadataRefreshMaxRetries =
@@ -255,6 +277,7 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
CommitStatus commitStatus = CommitStatus.FAILURE;
boolean updateHiveTable = false;
+ String agentInfo = "Iceberg-" + UUID.randomUUID();
Optional<Long> lockId = Optional.empty();
// getting a process-level lock per table to avoid concurrent commit
attempts to the same table
// from the same
@@ -263,7 +286,7 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
tableLevelMutex.lock();
HiveLockHeartbeat hiveLockHeartbeat = null;
try {
- lockId = Optional.of(acquireLock());
+ lockId = Optional.of(acquireLock(agentInfo));
hiveLockHeartbeat =
new HiveLockHeartbeat(metaClients, lockId.get(),
lockHeartbeatIntervalTime);
hiveLockHeartbeat.schedule(exitingScheduledExecutorService);
@@ -383,7 +406,8 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
hiveLockHeartbeat.cancel();
}
- cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId,
tableLevelMutex);
+ cleanupMetadataAndUnlock(
+ commitStatus, newMetadataLocation, lockId, tableLevelMutex,
agentInfo);
}
LOG.info(
@@ -606,27 +630,16 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
return storageDescriptor;
}
- @SuppressWarnings("ReverseDnsLookup")
@VisibleForTesting
- long acquireLock() throws UnknownHostException, TException,
InterruptedException {
- final LockComponent lockComponent =
- new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
- lockComponent.setTablename(tableName);
- final LockRequest lockRequest =
- new LockRequest(
- Lists.newArrayList(lockComponent),
- System.getProperty("user.name"),
- InetAddress.getLocalHost().getHostName());
- LockResponse lockResponse = metaClients.run(client ->
client.lock(lockRequest));
- AtomicReference<LockState> state = new
AtomicReference<>(lockResponse.getState());
- long lockId = lockResponse.getLockid();
+ long acquireLock(String agentInfo) throws UnknownHostException, TException,
InterruptedException {
+ LockInfo lockInfo = tryLock(agentInfo);
final long start = System.currentTimeMillis();
long duration = 0;
boolean timeout = false;
try {
- if (state.get().equals(LockState.WAITING)) {
+ if (lockInfo.lockState.equals(LockState.WAITING)) {
// Retry count is the typical "upper bound of retries" for Tasks.run()
function. In fact,
// the maximum number of
// attempts the Tasks.run() would try is `retries + 1`. Here, for
checking locks, we use
@@ -637,7 +650,7 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
// Integer.MIN_VALUE. Hence,
// the retry is set conservatively as `Integer.MAX_VALUE - 100` so it
doesn't hit any
// boundary issues.
- Tasks.foreach(lockId)
+ Tasks.foreach(lockInfo.lockId)
.retry(Integer.MAX_VALUE - 100)
.exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime,
lockAcquireTimeout, 1.5)
.throwFailureWhenFinished()
@@ -647,7 +660,7 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
try {
LockResponse response = metaClients.run(client ->
client.checkLock(id));
LockState newState = response.getState();
- state.set(newState);
+ lockInfo.lockState = newState;
if (newState.equals(LockState.WAITING)) {
throw new WaitingForLockException(
String.format("Waiting for lock on table %s.%s",
database, tableName));
@@ -667,30 +680,32 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
timeout = true;
duration = System.currentTimeMillis() - start;
} finally {
- if (!state.get().equals(LockState.ACQUIRED)) {
- unlock(Optional.of(lockId));
+ if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
+ unlock(Optional.of(lockInfo.lockId), agentInfo);
}
}
// timeout and do not have lock acquired
- if (timeout && !state.get().equals(LockState.ACQUIRED)) {
+ if (timeout && !lockInfo.lockState.equals(LockState.ACQUIRED)) {
throw new CommitFailedException(
"Timed out after %s ms waiting for lock on %s.%s", duration,
database, tableName);
}
- if (!state.get().equals(LockState.ACQUIRED)) {
+ if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
throw new CommitFailedException(
- "Could not acquire the lock on %s.%s, " + "lock request ended in
state %s",
- database, tableName, state);
+ "Could not acquire the lock on %s.%s, lock request ended in state
%s",
+ database, tableName, lockInfo.lockState);
}
- return lockId;
+
+ return lockInfo.lockId;
}
private void cleanupMetadataAndUnlock(
CommitStatus commitStatus,
String metadataLocation,
Optional<Long> lockId,
- ReentrantLock tableLevelMutex) {
+ ReentrantLock tableLevelMutex,
+ String agentInfo) {
try {
if (commitStatus == CommitStatus.FAILURE) {
// If we are sure the commit failed, clean up the uncommitted metadata
file
@@ -699,18 +714,52 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
} catch (RuntimeException e) {
LOG.error("Failed to cleanup metadata file at {}", metadataLocation, e);
} finally {
- unlock(lockId);
+ unlock(lockId, agentInfo);
tableLevelMutex.unlock();
}
}
- private void unlock(Optional<Long> lockId) {
- if (lockId.isPresent()) {
- try {
- doUnlock(lockId.get());
- } catch (Exception e) {
- LOG.warn("Failed to unlock {}.{}", database, tableName, e);
+ private void unlock(Optional<Long> lockId, String agentInfo) {
+ Long id = null;
+ try {
+ if (!lockId.isPresent()) {
+ // Try to find the lock based on agentInfo. Only works with Hive 2 or
later.
+ if (HiveVersion.min(HiveVersion.HIVE_2)) {
+ LockInfo lockInfo = findLock(agentInfo);
+ if (lockInfo == null) {
+ // No lock found
+ LOG.info("No lock found with {} agentInfo", agentInfo);
+ return;
+ }
+
+ id = lockInfo.lockId;
+ } else {
+ LOG.warn("Could not find lock with HMSClient {}",
HiveVersion.current());
+ return;
+ }
+ } else {
+ id = lockId.get();
}
+
+ doUnlock(id);
+ } catch (InterruptedException ie) {
+ if (id != null) {
+ // Interrupted unlock. We try to unlock one more time if we have a
lockId
+ try {
+ Thread.interrupted(); // Clear the interrupt status flag for now, so
we can retry unlock
+ LOG.warn("Interrupted unlock we try one more time {}.{}", database,
tableName, ie);
+ doUnlock(id);
+ } catch (Exception e) {
+ LOG.warn("Failed to unlock even on 2nd attempt {}.{}", database,
tableName, e);
+ } finally {
+ Thread.currentThread().interrupt(); // Set back the interrupt status
+ }
+ } else {
+ Thread.currentThread().interrupt(); // Set back the interrupt status
+ LOG.warn("Interrupted finding locks to unlock {}.{}", database,
tableName, ie);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to unlock {}.{}", database, tableName, e);
}
}
@@ -759,6 +808,108 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
ConfigProperties.ENGINE_HIVE_ENABLED,
TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);
}
+ /**
+ * Tries to create a lock. If the lock creation fails, and it is possible
then retries the lock
+ * creation a few times. If the lock creation is successful then a {@link
LockInfo} is returned,
+ * otherwise an appropriate exception is thrown.
+ *
+ * @param agentInfo The agentInfo which should be used during lock creation
+ * @return The created lock
+ * @throws UnknownHostException When we are not able to fill the hostname
for lock creation
+ * @throws TException When there is an error during lock creation
+ */
+ @SuppressWarnings("ReverseDnsLookup")
+ private LockInfo tryLock(String agentInfo) throws UnknownHostException,
TException {
+ LockInfo lockInfo = new LockInfo();
+
+ final LockComponent lockComponent =
+ new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
+ lockComponent.setTablename(tableName);
+ final LockRequest lockRequest =
+ new LockRequest(
+ Lists.newArrayList(lockComponent),
+ System.getProperty("user.name"),
+ InetAddress.getLocalHost().getHostName());
+
+ // Only works in Hive 2 or later.
+ if (HiveVersion.min(HiveVersion.HIVE_2)) {
+ lockRequest.setAgentInfo(agentInfo);
+ }
+
+ Tasks.foreach(lockRequest)
+ .retry(Integer.MAX_VALUE - 100)
+ .exponentialBackoff(
+ lockCreationMinWaitTime, lockCreationMaxWaitTime,
lockCreationTimeout, 2.0)
+ .shouldRetryTest(e -> e instanceof TException &&
HiveVersion.min(HiveVersion.HIVE_2))
+ .throwFailureWhenFinished()
+ .run(
+ request -> {
+ try {
+ LockResponse lockResponse = metaClients.run(client ->
client.lock(request));
+ lockInfo.lockId = lockResponse.getLockid();
+ lockInfo.lockState = lockResponse.getState();
+ } catch (TException te) {
+ LOG.warn("Failed to acquire lock {}", request, te);
+ try {
+ // If we can not check for lock, or we do not find it, then
rethrow the exception
+ // Otherwise we are happy as the findLock sets the lockId
and the state correctly
+ if (!HiveVersion.min(HiveVersion.HIVE_2)) {
+ LockInfo lockFound = findLock(agentInfo);
+ if (lockFound != null) {
+ lockInfo.lockId = lockFound.lockId;
+ lockInfo.lockState = lockFound.lockState;
+ LOG.info("Found lock {} by agentInfo {}", lockInfo,
agentInfo);
+ return;
+ }
+ }
+
+ throw te;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn(
+ "Interrupted while checking for lock on table {}.{}",
database, tableName, e);
+ throw new RuntimeException("Interrupted while checking for
lock", e);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while acquiring lock on table {}.{}",
database, tableName, e);
+ throw new RuntimeException("Interrupted while acquiring lock",
e);
+ }
+ },
+ TException.class);
+
+ // This should be initialized always, or exception should be thrown.
+ LOG.debug("Lock {} created for table {}.{}", lockInfo, database,
tableName);
+ return lockInfo;
+ }
+
+ /**
+ * Search for the locks using HMSClient.showLocks identified by the
agentInfo. If the lock is
+ * there, then a {@link LockInfo} object is returned. If the lock is not
found <code>null</code>
+ * is returned.
+ *
+ * @param agentInfo The key for searching the locks
+ * @return The {@link LockInfo} for the found lock, or <code>null</code> if
nothing found
+ */
+ private LockInfo findLock(String agentInfo) throws TException,
InterruptedException {
+ Preconditions.checkArgument(
+ HiveVersion.min(HiveVersion.HIVE_2),
+ "Minimally Hive 2 HMS client is needed to find the Lock using the
showLocks API call");
+ ShowLocksRequest showLocksRequest = new ShowLocksRequest();
+ showLocksRequest.setDbname(database);
+ showLocksRequest.setTablename(tableName);
+ ShowLocksResponse response = metaClients.run(client ->
client.showLocks(showLocksRequest));
+ for (ShowLocksResponseElement lock : response.getLocks()) {
+ if (lock.getAgentInfo().equals(agentInfo)) {
+ // We found our lock
+ return new LockInfo(lock.getLockid(), lock.getState());
+ }
+ }
+
+ // Not found anything
+ return null;
+ }
+
private static class HiveLockHeartbeat implements Runnable {
private final ClientPool<IMetaStoreClient, TException> hmsClients;
private final long lockId;
@@ -799,4 +950,27 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
}
}
}
+
+ private static class LockInfo {
+ private long lockId;
+ private LockState lockState;
+
+ private LockInfo() {
+ this.lockId = -1;
+ this.lockState = null;
+ }
+
+ private LockInfo(long lockId, LockState lockState) {
+ this.lockId = lockId;
+ this.lockState = lockState;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("lockId", lockId)
+ .add("lockState", lockState)
+ .toString();
+ }
+ }
}
diff --git
a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveVersion.java
b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveVersion.java
new file mode 100644
index 0000000000..288c77fcbd
--- /dev/null
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveVersion.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hive;
+
+import org.apache.hive.common.util.HiveVersionInfo;
+
+public enum HiveVersion {
+ HIVE_4(4),
+ HIVE_3(3),
+ HIVE_2(2),
+ HIVE_1_2(1),
+ NOT_SUPPORTED(0);
+
+ private final int order;
+ private static final HiveVersion current = calculate();
+
+ HiveVersion(int order) {
+ this.order = order;
+ }
+
+ public static HiveVersion current() {
+ return current;
+ }
+
+ public static boolean min(HiveVersion other) {
+ return current.order >= other.order;
+ }
+
+ private static HiveVersion calculate() {
+ String version = HiveVersionInfo.getShortVersion();
+ String[] versions = version.split("\\.");
+ switch (versions[0]) {
+ case "4":
+ return HIVE_4;
+ case "3":
+ return HIVE_3;
+ case "2":
+ return HIVE_2;
+ case "1":
+ if (versions[1].equals("2")) {
+ return HIVE_1_2;
+ } else {
+ return NOT_SUPPORTED;
+ }
+ default:
+ return NOT_SUPPORTED;
+ }
+ }
+}
diff --git
a/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
b/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
index 5c3c485d50..7d9dd32e4c 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
@@ -26,12 +26,6 @@ import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
public class MetastoreUtil {
-
- // this class is unique to Hive3 and cannot be found in Hive2, therefore a
good proxy to see if
- // we are working against Hive3 dependencies
- private static final String HIVE3_UNIQUE_CLASS =
- "org.apache.hadoop.hive.serde2.io.DateWritableV2";
-
private static final DynMethods.UnboundMethod ALTER_TABLE =
DynMethods.builder("alter_table")
.impl(
@@ -51,15 +45,8 @@ public class MetastoreUtil {
.impl(IMetaStoreClient.class, "alter_table", String.class,
String.class, Table.class)
.build();
- private static final boolean HIVE3_PRESENT_ON_CLASSPATH = detectHive3();
-
private MetastoreUtil() {}
- /** Returns true if Hive3 dependencies are found on classpath, false
otherwise. */
- public static boolean hive3PresentOnClasspath() {
- return HIVE3_PRESENT_ON_CLASSPATH;
- }
-
/**
* Calls alter_table method using the metastore client. If possible, an
environmental context will
* be used that turns off stats updates to avoid recursive listing.
@@ -71,13 +58,4 @@ public class MetastoreUtil {
ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS,
StatsSetupConst.TRUE));
ALTER_TABLE.invoke(client, databaseName, tblName, table, envContext);
}
-
- private static boolean detectHive3() {
- try {
- Class.forName(HIVE3_UNIQUE_CLASS);
- return true;
- } catch (ClassNotFoundException e) {
- return false;
- }
- }
}
diff --git
a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 8b98eb897a..8b439047ca 100644
---
a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++
b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -45,12 +45,15 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.thrift.TException;
import org.junit.AfterClass;
@@ -59,6 +62,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.AdditionalAnswers;
+import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
public class TestHiveCommitLocks extends HiveTableBaseTest {
@@ -76,6 +80,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
LockResponse waitLockResponse = new LockResponse(dummyLockId,
LockState.WAITING);
LockResponse acquiredLockResponse = new LockResponse(dummyLockId,
LockState.ACQUIRED);
LockResponse notAcquiredLockResponse = new LockResponse(dummyLockId,
LockState.NOT_ACQUIRED);
+ ShowLocksResponse emptyLocks = new ShowLocksResponse(Lists.newArrayList());
@BeforeClass
public static void startMetastore() throws Exception {
@@ -137,6 +142,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
catalog.name(),
dbName,
tableName));
+ reset(spyClient);
}
@AfterClass
@@ -178,6 +184,182 @@ public class TestHiveCommitLocks extends
HiveTableBaseTest {
Assert.assertEquals(1, spyOps.current().schema().columns().size()); //
should be 1 again
}
+ @Test
+ public void testLockAcquisitionAfterFailedNotFoundLock() throws TException,
InterruptedException {
+ doReturn(emptyLocks).when(spyClient).showLocks(any());
+ doThrow(new TException("Failed to connect to HMS"))
+ .doReturn(waitLockResponse)
+ .when(spyClient)
+ .lock(any());
+ doReturn(waitLockResponse)
+ .doReturn(acquiredLockResponse)
+ .when(spyClient)
+ .checkLock(eq(dummyLockId));
+ doNothing().when(spyOps).doUnlock(eq(dummyLockId));
+ doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
+
+ spyOps.doCommit(metadataV2, metadataV1);
+
+ Assert.assertEquals(1, spyOps.current().schema().columns().size()); //
should be 1 again
+ }
+
+ @Test
+ public void testLockAcquisitionAfterFailedAndFoundLock() throws TException,
InterruptedException {
+ ArgumentCaptor<LockRequest> lockRequestCaptor =
ArgumentCaptor.forClass(LockRequest.class);
+ doReturn(emptyLocks).when(spyClient).showLocks(any());
+ doThrow(new TException("Failed to connect to HMS"))
+ .doReturn(waitLockResponse)
+ .when(spyClient)
+ .lock(lockRequestCaptor.capture());
+
+ // Capture the lockRequest, and generate a response simulating that we
have a lock
+ ShowLocksResponse showLocksResponse = new
ShowLocksResponse(Lists.newArrayList());
+ ShowLocksResponseElement showLocksElement =
+ new ShowLocksResponseElementWrapper(lockRequestCaptor);
+ showLocksResponse.getLocks().add(showLocksElement);
+
+ doReturn(showLocksResponse).when(spyClient).showLocks(any());
+ doReturn(acquiredLockResponse).when(spyClient).checkLock(eq(dummyLockId));
+ doNothing().when(spyOps).doUnlock(eq(dummyLockId));
+ doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
+
+ spyOps.doCommit(metadataV2, metadataV1);
+
+ Assert.assertEquals(1, spyOps.current().schema().columns().size()); //
should be 1 again
+ }
+
+ @Test
+ public void testUnLock() throws TException {
+ doReturn(waitLockResponse).when(spyClient).lock(any());
+ doReturn(acquiredLockResponse).when(spyClient).checkLock(eq(dummyLockId));
+ doNothing().when(spyClient).unlock(eq(dummyLockId));
+ doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
+
+ spyOps.doCommit(metadataV2, metadataV1);
+
+ verify(spyClient, times(1)).unlock(eq(dummyLockId));
+ }
+
+ @Test
+ public void testUnLockInterruptedUnLock() throws TException {
+ doReturn(waitLockResponse).when(spyClient).lock(any());
+ doReturn(acquiredLockResponse).when(spyClient).checkLock(eq(dummyLockId));
+ doAnswer(
+ invocation -> {
+ throw new InterruptedException("Interrupt test");
+ })
+ .doNothing()
+ .when(spyClient)
+ .unlock(eq(dummyLockId));
+ doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
+
+ spyOps.doCommit(metadataV2, metadataV1);
+
+ verify(spyClient, times(2)).unlock(eq(dummyLockId));
+ }
+
+ @Test
+ public void testUnLockAfterInterruptedLock() throws TException {
+ ArgumentCaptor<LockRequest> lockRequestCaptor =
ArgumentCaptor.forClass(LockRequest.class);
+ doAnswer(
+ invocation -> {
+ throw new InterruptedException("Interrupt test");
+ })
+ .when(spyClient)
+ .lock(lockRequestCaptor.capture());
+
+ // Capture the lockRequest, and generate a response simulating that we
have a lock
+ ShowLocksResponse showLocksResponse = new
ShowLocksResponse(Lists.newArrayList());
+ ShowLocksResponseElement showLocksElement =
+ new ShowLocksResponseElementWrapper(lockRequestCaptor);
+ showLocksResponse.getLocks().add(showLocksElement);
+
+ doReturn(showLocksResponse).when(spyClient).showLocks(any());
+ doReturn(acquiredLockResponse).when(spyClient).checkLock(eq(dummyLockId));
+ doNothing().when(spyClient).unlock(eq(dummyLockId));
+ doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
+
+ AssertHelpers.assertThrows(
+ "Expected an exception",
+ RuntimeException.class,
+ "Interrupted while acquiring lock",
+ () -> spyOps.doCommit(metadataV2, metadataV1));
+
+ verify(spyClient, times(1)).unlock(eq(dummyLockId));
+ // Make sure that we exit the lock loop on InterruptedException
+ verify(spyClient, times(1)).lock(any());
+ }
+
+ @Test
+ public void testUnLockAfterInterruptedLockCheck() throws TException {
+ doReturn(waitLockResponse).when(spyClient).lock(any());
+ doAnswer(
+ invocation -> {
+ throw new InterruptedException("Interrupt test");
+ })
+ .when(spyClient)
+ .checkLock(eq(dummyLockId));
+
+ doNothing().when(spyClient).unlock(eq(dummyLockId));
+ doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
+
+ AssertHelpers.assertThrows(
+ "Expected an exception",
+ RuntimeException.class,
+ "Could not acquire the lock on",
+ () -> spyOps.doCommit(metadataV2, metadataV1));
+
+ verify(spyClient, times(1)).unlock(eq(dummyLockId));
+ // Make sure that we exit the checkLock loop on InterruptedException
+ verify(spyClient, times(1)).checkLock(eq(dummyLockId));
+ }
+
+ @Test
+ public void testUnLockAfterInterruptedGetTable() throws TException {
+ doReturn(acquiredLockResponse).when(spyClient).lock(any());
+ doAnswer(
+ invocation -> {
+ throw new InterruptedException("Interrupt test");
+ })
+ .when(spyClient)
+ .getTable(any(), any());
+
+ doNothing().when(spyClient).unlock(eq(dummyLockId));
+ doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
+
+ AssertHelpers.assertThrows(
+ "Expected an exception",
+ RuntimeException.class,
+ "Interrupted during commit",
+ () -> spyOps.doCommit(metadataV2, metadataV1));
+
+ verify(spyClient, times(1)).unlock(eq(dummyLockId));
+ }
+
+ /** Wraps an ArgumentCaptor to provide data based on the request */
+ private class ShowLocksResponseElementWrapper extends
ShowLocksResponseElement {
+ private ArgumentCaptor<LockRequest> wrapped;
+
+ private ShowLocksResponseElementWrapper(ArgumentCaptor<LockRequest>
wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public String getAgentInfo() {
+ return wrapped.getValue().getAgentInfo();
+ }
+
+ @Override
+ public LockState getState() {
+ return LockState.WAITING;
+ }
+
+ @Override
+ public long getLockid() {
+ return dummyLockId;
+ }
+ }
+
@Test
public void testLockFailureAtFirstTime() throws TException {
doReturn(notAcquiredLockResponse).when(spyClient).lock(any());
@@ -286,11 +468,11 @@ public class TestHiveCommitLocks extends
HiveTableBaseTest {
}
@Test
- public void testLockHeartbeat() throws TException {
+ public void testLockHeartbeat() throws TException, InterruptedException {
doReturn(acquiredLockResponse).when(spyClient).lock(any());
doAnswer(AdditionalAnswers.answersWithDelay(2000,
InvocationOnMock::callRealMethod))
- .when(spyClient)
- .getTable(any(), any());
+ .when(spyOps)
+ .loadHmsTable();
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));
spyOps.doCommit(metadataV2, metadataV1);
diff --git
a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
index dedec50f0a..869cb9fe44 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
@@ -237,10 +237,15 @@ public class TestHiveCommits extends HiveTableBaseTest {
* second committer placed a commit on top of ours before the first
committer was able to check if
* their commit succeeded or not
*
- * <p>Timeline: Client 1 commits which throws an exception but suceeded
Client 1's lock expires
- * while waiting to do the recheck for commit success Client 2 acquires a
lock, commits
- * successfully on top of client 1's commit and release lock Client 1
check's to see if their
- * commit was successful
+ * <p>Timeline:
+ *
+ * <ul>
+ * <li>Client 1 commits which throws an exception but succeeded
+ * <li>Client 1's lock expires while waiting to do the recheck for commit
success
+ * <li>Client 2 acquires a lock, commits successfully on top of client 1's
commit and release
+ * lock
+ * <li>Client 1 check's to see if their commit was successful
+ * </ul>
*
* <p>This tests to make sure a disconnected client 1 doesn't think their
commit failed just
* because it isn't the current one during the recheck phase.
@@ -266,11 +271,11 @@ public class TestHiveCommits extends HiveTableBaseTest {
AtomicLong lockId = new AtomicLong();
doAnswer(
i -> {
- lockId.set(ops.acquireLock());
+ lockId.set(ops.acquireLock("agentInfo"));
return lockId.get();
})
.when(spyOps)
- .acquireLock();
+ .acquireLock(any());
concurrentCommitAndThrowException(ops, spyOps, table, lockId);
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index add9e07c54..5f2eb9834b 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.hive.HiveVersion;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader;
import org.apache.iceberg.mr.mapred.Container;
@@ -63,7 +63,7 @@ public class HiveIcebergInputFormat extends
MapredIcebergInputFormat<Record>
HIVE_VECTORIZED_RECORDREADER_CTOR;
static {
- if (MetastoreUtil.hive3PresentOnClasspath()) {
+ if (HiveVersion.min(HiveVersion.HIVE_3)) {
HIVE_VECTORIZED_RECORDREADER_CTOR =
DynConstructors.builder(AbstractMapredIcebergRecordReader.class)
.impl(
@@ -114,7 +114,7 @@ public class HiveIcebergInputFormat extends
MapredIcebergInputFormat<Record>
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
&& Utilities.getVectorizedRowBatchCtx(job) != null) {
Preconditions.checkArgument(
- MetastoreUtil.hive3PresentOnClasspath(), "Vectorization only
supported for Hive 3+");
+ HiveVersion.min(HiveVersion.HIVE_3), "Vectorization only supported
for Hive 3+");
job.setEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.HIVE);
job.setBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, true);
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java
b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java
index f4014ed6f2..8be9a586d5 100644
---
a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java
+++
b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergObjectInspector.java
@@ -27,7 +27,7 @@ import
org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynMethods;
-import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.hive.HiveVersion;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
@@ -38,7 +38,7 @@ public final class IcebergObjectInspector extends
TypeUtil.SchemaVisitor<ObjectI
// we need to do this because there is a breaking API change in
Date/TimestampObjectInspector
// between Hive2 and Hive3
private static final String DATE_INSPECTOR_CLASS =
- MetastoreUtil.hive3PresentOnClasspath()
+ HiveVersion.min(HiveVersion.HIVE_3)
?
"org.apache.iceberg.mr.hive.serde.objectinspector.IcebergDateObjectInspectorHive3"
:
"org.apache.iceberg.mr.hive.serde.objectinspector.IcebergDateObjectInspector";
@@ -46,12 +46,12 @@ public final class IcebergObjectInspector extends
TypeUtil.SchemaVisitor<ObjectI
DynMethods.builder("get").impl(DATE_INSPECTOR_CLASS).buildStatic().invoke();
private static final String TIMESTAMP_INSPECTOR_CLASS =
- MetastoreUtil.hive3PresentOnClasspath()
+ HiveVersion.min(HiveVersion.HIVE_3)
?
"org.apache.iceberg.mr.hive.serde.objectinspector.IcebergTimestampObjectInspectorHive3"
:
"org.apache.iceberg.mr.hive.serde.objectinspector.IcebergTimestampObjectInspector";
private static final String TIMESTAMPTZ_INSPECTOR_CLASS =
- MetastoreUtil.hive3PresentOnClasspath()
+ HiveVersion.min(HiveVersion.HIVE_3)
?
"org.apache.iceberg.mr.hive.serde.objectinspector.IcebergTimestampWithZoneObjectInspectorHive3"
:
"org.apache.iceberg.mr.hive.serde.objectinspector.IcebergTimestampWithZoneObjectInspector";
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 8c31723da5..a95454b8b0 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -61,7 +61,7 @@ import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.hive.HiveVersion;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
@@ -200,7 +200,7 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
private static final DynMethods.StaticMethod
HIVE_VECTORIZED_READER_BUILDER;
static {
- if (MetastoreUtil.hive3PresentOnClasspath()) {
+ if (HiveVersion.min(HiveVersion.HIVE_3)) {
HIVE_VECTORIZED_READER_BUILDER =
DynMethods.builder("reader")
.impl(
@@ -398,7 +398,7 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
case PIG:
throw new UnsupportedOperationException("Parquet support not yet
supported for Pig");
case HIVE:
- if (MetastoreUtil.hive3PresentOnClasspath()) {
+ if (HiveVersion.min(HiveVersion.HIVE_3)) {
parquetIterator =
HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile, task,
idToConstant, context);
} else {
@@ -445,7 +445,7 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
// TODO: implement value readers for Pig
throw new UnsupportedOperationException("ORC support not yet
supported for Pig");
case HIVE:
- if (MetastoreUtil.hive3PresentOnClasspath()) {
+ if (HiveVersion.min(HiveVersion.HIVE_3)) {
orcIterator =
HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile, task,
idToConstant, context);
} else {
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java
index 464f7ccc0a..4a5d819279 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.Text;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.hive.HiveVersion;
import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
@@ -161,7 +161,7 @@ public class TestDeserializer {
@Test
public void testDeserializeEverySupportedType() {
Assume.assumeFalse(
- "No test yet for Hive3 (Date/Timestamp creation)",
MetastoreUtil.hive3PresentOnClasspath());
+ "No test yet for Hive3 (Date/Timestamp creation)",
HiveVersion.min(HiveVersion.HIVE_3));
Deserializer deserializer =
new Deserializer.Builder()
diff --git
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 0326ef4e7e..002e222932 100644
---
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -51,7 +51,7 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.hive.HiveSchemaUtil;
-import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.hive.HiveVersion;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -798,7 +798,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
expectedIcebergProperties.put(TableProperties.ENGINE_HIVE_ENABLED,
"true");
}
- if (MetastoreUtil.hive3PresentOnClasspath()) {
+ if (HiveVersion.min(HiveVersion.HIVE_3)) {
expectedIcebergProperties.put("bucketing_version", "2");
}
Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());
diff --git
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 4b74ad4fc6..09242ead09 100644
---
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -42,7 +42,7 @@ import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.hive.HiveSchemaUtil;
-import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.hive.HiveVersion;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -127,7 +127,7 @@ public class TestHiveIcebergStorageHandlerWithEngine {
// test for vectorization=ON in case of ORC format and Tez engine
if ((fileFormat == FileFormat.PARQUET || fileFormat ==
FileFormat.ORC)
&& "tez".equals(engine)
- && MetastoreUtil.hive3PresentOnClasspath()) {
+ && HiveVersion.min(HiveVersion.HIVE_3)) {
testParams.add(
new Object[] {fileFormat, engine,
TestTables.TestTableType.HIVE_CATALOG, true});
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index ec166b6dc6..faeb7d0df7 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -48,7 +48,7 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HiveCatalog;
-import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.hive.HiveVersion;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestCatalogs;
@@ -422,7 +422,7 @@ abstract class TestTables {
this(
conf,
temp,
- (MetastoreUtil.hive3PresentOnClasspath() ? "file:" : "")
+ (HiveVersion.min(HiveVersion.HIVE_3) ? "file:" : "")
+ temp.newFolder("custom", "warehouse").toString(),
catalogName);
}
@@ -457,7 +457,7 @@ abstract class TestTables {
this(
conf,
temp,
- (MetastoreUtil.hive3PresentOnClasspath() ? "file:" : "")
+ (HiveVersion.min(HiveVersion.HIVE_3) ? "file:" : "")
+ temp.newFolder("hadoop", "warehouse").toString(),
catalogName);
}
diff --git
a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergObjectInspector.java
b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergObjectInspector.java
index 0641ee2b63..c4d3cea4b0 100644
---
a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergObjectInspector.java
+++
b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergObjectInspector.java
@@ -28,7 +28,7 @@ import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.hive.HiveVersion;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
@@ -99,7 +99,7 @@ public class TestIcebergObjectInspector {
Assert.assertEquals(3, dateField.getFieldID());
Assert.assertEquals("date_field", dateField.getFieldName());
Assert.assertEquals("date comment", dateField.getFieldComment());
- if (MetastoreUtil.hive3PresentOnClasspath()) {
+ if (HiveVersion.min(HiveVersion.HIVE_3)) {
Assert.assertEquals(
"org.apache.iceberg.mr.hive.serde.objectinspector.IcebergDateObjectInspectorHive3",
dateField.getFieldObjectInspector().getClass().getName());
@@ -169,7 +169,7 @@ public class TestIcebergObjectInspector {
Assert.assertEquals(11, timestampField.getFieldID());
Assert.assertEquals("timestamp_field", timestampField.getFieldName());
Assert.assertEquals("timestamp comment", timestampField.getFieldComment());
- if (MetastoreUtil.hive3PresentOnClasspath()) {
+ if (HiveVersion.min(HiveVersion.HIVE_3)) {
Assert.assertEquals(
"IcebergTimestampObjectInspectorHive3",
timestampField.getFieldObjectInspector().getClass().getSimpleName());
@@ -183,7 +183,7 @@ public class TestIcebergObjectInspector {
Assert.assertEquals(12, timestampTzField.getFieldID());
Assert.assertEquals("timestamptz_field", timestampTzField.getFieldName());
Assert.assertEquals("timestamptz comment",
timestampTzField.getFieldComment());
- if (MetastoreUtil.hive3PresentOnClasspath()) {
+ if (HiveVersion.min(HiveVersion.HIVE_3)) {
Assert.assertEquals(
"IcebergTimestampWithZoneObjectInspectorHive3",
timestampTzField.getFieldObjectInspector().getClass().getSimpleName());