This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 5d66a5e4e12 [HUDI-7728] Use StorageConfiguration in LockProvider constructors (#11173) 5d66a5e4e12 is described below commit 5d66a5e4e12e6245d3de602dbe7ac5de1e34a846 Author: Y Ethan Guo <ethan.guoyi...@gmail.com> AuthorDate: Wed May 8 14:33:26 2024 -0700 [HUDI-7728] Use StorageConfiguration in LockProvider constructors (#11173) --- .../lock/DynamoDBBasedLockProvider.java | 13 +++---- .../lock/FileSystemBasedLockProvider.java | 4 +- .../hudi/client/transaction/lock/LockManager.java | 5 ++- .../lock/ZookeeperBasedLockProvider.java | 4 +- .../FileSystemBasedLockProviderTestClass.java | 9 +++-- .../InProcessLockProviderWithRuntimeError.java | 7 ++-- .../hudi/client/TestFileBasedLockProvider.java | 16 ++++---- .../transaction/lock/InProcessLockProvider.java | 4 +- .../common/table/timeline/TimeGeneratorBase.java | 3 +- .../lock/TestInProcessLockProvider.java | 43 +++++++++++----------- .../table/timeline/TestWaitBasedTimeGenerator.java | 2 +- .../lock/HiveMetastoreBasedLockProvider.java | 5 ++- .../TestHiveMetastoreBasedLockProvider.java | 20 +++++----- .../testutils/HiveSyncFunctionalTestHarness.java | 5 +++ 14 files changed, 77 insertions(+), 63 deletions(-) diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java index a3e61924026..2b67a483f38 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java @@ -19,20 +19,22 @@ package org.apache.hudi.aws.transaction.lock; import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory; +import org.apache.hudi.aws.utils.DynamoTableUtils; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.lock.LockProvider; import 
org.apache.hudi.common.lock.LockState; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.DynamoDbBasedLockConfig; import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.storage.StorageConfiguration; import com.amazonaws.services.dynamodbv2.AcquireLockOptions; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions; import com.amazonaws.services.dynamodbv2.LockItem; import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException; - -import org.apache.hudi.aws.utils.DynamoTableUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; @@ -42,9 +44,6 @@ import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.concurrent.NotThreadSafe; @@ -71,11 +70,11 @@ public class DynamoDBBasedLockProvider implements LockProvider<LockItem> { protected final DynamoDbBasedLockConfig dynamoDBLockConfiguration; private volatile LockItem lock; - public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) { + public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration<?> conf) { this(lockConfiguration, conf, null); } - public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf, DynamoDbClient dynamoDB) { + public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf, DynamoDbClient dynamoDB) { this.dynamoDBLockConfiguration = DynamoDbBasedLockConfig.newBuilder() .fromProperties(lockConfiguration.getConfig()) .build(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java index cf8ffb7186a..73f14355ca9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java @@ -33,10 +33,10 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieLockException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StorageSchemes; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -72,7 +72,7 @@ public class FileSystemBasedLockProvider implements LockProvider<String>, Serial private LockInfo lockInfo; private String currentOwnerLockInfo; - public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration configuration) { + public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration<?> configuration) { checkRequiredProps(lockConfiguration); this.lockConfiguration = lockConfiguration; String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java index f1fd05bc02b..3b5b9c449a9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java @@ -29,6 +29,8 @@ import org.apache.hudi.config.HoodieLockConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; @@ -109,7 +111,8 @@ public class LockManager implements Serializable, AutoCloseable { if (lockProvider == null) { LOG.info("LockProvider " + writeConfig.getLockProviderClass()); lockProvider = (LockProvider) ReflectionUtils.loadClass(writeConfig.getLockProviderClass(), - lockConfiguration, hadoopConf.get()); + new Class<?>[] {LockConfiguration.class, StorageConfiguration.class}, + lockConfiguration, HadoopFSUtils.getStorageConf(hadoopConf.get())); } return lockProvider; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java index 4299a603ece..02f137b509a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java @@ -24,13 +24,13 @@ import org.apache.hudi.common.lock.LockState; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieLockException; +import 
org.apache.hudi.storage.StorageConfiguration; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.BoundedExponentialBackoffRetry; -import org.apache.hadoop.conf.Configuration; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +64,7 @@ public class ZookeeperBasedLockProvider implements LockProvider<InterProcessMute private volatile InterProcessMutex lock = null; protected LockConfiguration lockConfiguration; - public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) { + public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration<?> conf) { checkRequiredProps(lockConfiguration); this.lockConfiguration = lockConfiguration; this.curatorFrameworkClient = CuratorFrameworkFactory.builder() diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java index 9488d5bab6c..2df166c1c71 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java @@ -18,14 +18,15 @@ package org.apache.hudi.client.transaction; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.lock.LockProvider; import org.apache.hudi.exception.HoodieIOException; import 
org.apache.hudi.exception.HoodieLockException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.StorageConfiguration; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import java.io.IOException; import java.io.Serializable; @@ -50,7 +51,7 @@ public class FileSystemBasedLockProviderTestClass implements LockProvider<String private transient Path lockFile; protected LockConfiguration lockConfiguration; - public FileSystemBasedLockProviderTestClass(final LockConfiguration lockConfiguration, final Configuration configuration) { + public FileSystemBasedLockProviderTestClass(final LockConfiguration lockConfiguration, final StorageConfiguration<?> configuration) { this.lockConfiguration = lockConfiguration; final String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY); this.retryWaitTimeMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/InProcessLockProviderWithRuntimeError.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/InProcessLockProviderWithRuntimeError.java index f825012f131..2824e0dd47f 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/InProcessLockProviderWithRuntimeError.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/InProcessLockProviderWithRuntimeError.java @@ -18,16 +18,17 @@ package org.apache.hudi.client.transaction; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.storage.StorageConfiguration; + +import java.util.concurrent.TimeUnit; public class InProcessLockProviderWithRuntimeError extends InProcessLockProvider { 
public InProcessLockProviderWithRuntimeError( LockConfiguration lockConfiguration, - Configuration conf) { + StorageConfiguration<?> conf) { super(lockConfiguration, conf); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java index e81a85c5978..0fcc9dadea1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -22,6 +22,7 @@ package org.apache.hudi.client; import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.BeforeEach; @@ -38,6 +39,7 @@ import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PA import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY; +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -48,7 +50,7 @@ public class TestFileBasedLockProvider { Path tempDir; String basePath; LockConfiguration lockConfiguration; - Configuration hadoopConf; + StorageConfiguration<Configuration> storageConf; @BeforeEach public void setUp() throws IOException { @@ -60,12 +62,12 @@ public class 
TestFileBasedLockProvider { properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000"); properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3"); lockConfiguration = new LockConfiguration(properties); - hadoopConf = new Configuration(); + storageConf = getDefaultStorageConf(); } @Test public void testAcquireLock() { - FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, storageConf); assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); fileBasedLockProvider.unlock(); @@ -75,7 +77,7 @@ public class TestFileBasedLockProvider { public void testAcquireLockWithDefaultPath() { lockConfiguration.getConfig().remove(FILESYSTEM_LOCK_PATH_PROP_KEY); lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), basePath); - FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, storageConf); assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); fileBasedLockProvider.unlock(); @@ -84,7 +86,7 @@ public class TestFileBasedLockProvider { @Test public void testUnLock() { - FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, storageConf); assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); fileBasedLockProvider.unlock(); @@ -94,7 +96,7 @@ public class TestFileBasedLockProvider 
{ @Test public void testReentrantLock() { - FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, storageConf); assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); assertFalse(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() @@ -105,7 +107,7 @@ public class TestFileBasedLockProvider { @Test public void testUnlockWithoutLock() { assertDoesNotThrow(() -> { - FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, storageConf); fileBasedLockProvider.unlock(); }); } diff --git a/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java b/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java index c2edb1864b0..ab191bacab8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java +++ b/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java @@ -26,8 +26,8 @@ import org.apache.hudi.common.lock.LockProvider; import org.apache.hudi.common.lock.LockState; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.storage.StorageConfiguration; -import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +55,7 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc private final String basePath; private final long maxWaitTimeMillis; - public InProcessLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) { + public 
InProcessLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration<?> conf) { TypedProperties typedProperties = lockConfiguration.getConfig(); basePath = lockConfiguration.getConfig().getProperty(HoodieCommonConfig.BASE_PATH.key()); ValidationUtils.checkArgument(basePath != null); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGeneratorBase.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGeneratorBase.java index 672710135ef..c5d76c36119 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGeneratorBase.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGeneratorBase.java @@ -91,7 +91,8 @@ public abstract class TimeGeneratorBase implements TimeGenerator, Serializable { String lockProviderClass = lockConfiguration.getConfig().getString("hoodie.write.lock.provider"); LOG.info("LockProvider for TimeGenerator: " + lockProviderClass); lockProvider = (LockProvider<?>) ReflectionUtils.loadClass(lockProviderClass, - lockConfiguration, storageConf.unwrap()); + new Class<?>[] {LockConfiguration.class, StorageConfiguration.class}, + lockConfiguration, storageConf); } } } diff --git a/hudi-common/src/test/java/org/apache/hudi/client/transaction/lock/TestInProcessLockProvider.java b/hudi-common/src/test/java/org/apache/hudi/client/transaction/lock/TestInProcessLockProvider.java index 6e7dcd7e3fa..47c06e43b9e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/client/transaction/lock/TestInProcessLockProvider.java +++ b/hudi-common/src/test/java/org/apache/hudi/client/transaction/lock/TestInProcessLockProvider.java @@ -22,9 +22,9 @@ import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.storage.StorageConfiguration; import 
junit.framework.AssertionFailedError; -import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -36,13 +36,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; public class TestInProcessLockProvider { private static final Logger LOG = LoggerFactory.getLogger(TestInProcessLockProvider.class); - private final Configuration hadoopConfiguration = new Configuration(); + private final StorageConfiguration<?> storageConf = getDefaultStorageConf(); private final LockConfiguration lockConfiguration1; private final LockConfiguration lockConfiguration2; @@ -63,7 +64,7 @@ public class TestInProcessLockProvider { // Writer 2: try lock | ... lock |------| unlock and close // Writer 3: try lock | ... 
lock |------| unlock and close List<InProcessLockProvider> lockProviderList = new ArrayList<>(); - InProcessLockProvider lockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider lockProvider1 = new InProcessLockProvider(lockConfiguration1, storageConf); lockProviderList.add(lockProvider1); AtomicBoolean writer1Completed = new AtomicBoolean(false); AtomicBoolean writer2TryLock = new AtomicBoolean(false); @@ -81,7 +82,7 @@ public class TestInProcessLockProvider { // Writer 2 thread in parallel, should block // and later acquire the lock once it is released Thread writer2 = new Thread(() -> { - InProcessLockProvider lockProvider2 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider lockProvider2 = new InProcessLockProvider(lockConfiguration1, storageConf); lockProviderList.add(lockProvider2); assertDoesNotThrow(() -> { LOG.info("Writer 2 tries to acquire the lock."); @@ -117,7 +118,7 @@ public class TestInProcessLockProvider { } } // Lock instance of Writer 3 should be held by Writer 2 - InProcessLockProvider lockProvider3 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider lockProvider3 = new InProcessLockProvider(lockConfiguration1, storageConf); lockProviderList.add(lockProvider3); boolean isLocked = lockProvider3.getLock().isWriteLocked(); if (!isLocked) { @@ -173,7 +174,7 @@ public class TestInProcessLockProvider { @Test public void testLockAcquisition() { - InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, storageConf); assertDoesNotThrow(() -> { inProcessLockProvider.lock(); }); @@ -184,7 +185,7 @@ public class TestInProcessLockProvider { @Test public void testLockReAcquisitionBySameThread() { - InProcessLockProvider inProcessLockProvider = new 
InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, storageConf); assertDoesNotThrow(() -> { inProcessLockProvider.lock(); }); @@ -198,8 +199,8 @@ public class TestInProcessLockProvider { @Test public void testLockReAcquisitionBySameThreadWithTwoTables() { - InProcessLockProvider inProcessLockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); - InProcessLockProvider inProcessLockProvider2 = new InProcessLockProvider(lockConfiguration2, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider1 = new InProcessLockProvider(lockConfiguration1, storageConf); + InProcessLockProvider inProcessLockProvider2 = new InProcessLockProvider(lockConfiguration2, storageConf); assertDoesNotThrow(() -> { inProcessLockProvider1.lock(); @@ -223,7 +224,7 @@ public class TestInProcessLockProvider { @Test public void testLockReAcquisitionByDifferentThread() { - InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, storageConf); final AtomicBoolean writer2Completed = new AtomicBoolean(false); // Main test thread @@ -263,8 +264,8 @@ public class TestInProcessLockProvider { @Test public void testLockReAcquisitionByDifferentThreadWithTwoTables() { - InProcessLockProvider inProcessLockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); - InProcessLockProvider inProcessLockProvider2 = new InProcessLockProvider(lockConfiguration2, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider1 = new InProcessLockProvider(lockConfiguration1, storageConf); + InProcessLockProvider inProcessLockProvider2 = new InProcessLockProvider(lockConfiguration2, storageConf); final AtomicBoolean writer2Stream1Completed = new AtomicBoolean(false); final AtomicBoolean 
writer2Stream2Completed = new AtomicBoolean(false); @@ -329,7 +330,7 @@ public class TestInProcessLockProvider { @Test public void testTryLockAcquisition() { - InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, storageConf); Assertions.assertTrue(inProcessLockProvider.tryLock()); assertDoesNotThrow(() -> { inProcessLockProvider.unlock(); @@ -338,7 +339,7 @@ public class TestInProcessLockProvider { @Test public void testTryLockAcquisitionWithTimeout() { - InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, storageConf); Assertions.assertTrue(inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS)); assertDoesNotThrow(() -> { inProcessLockProvider.unlock(); @@ -347,7 +348,7 @@ public class TestInProcessLockProvider { @Test public void testTryLockReAcquisitionBySameThread() { - InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, storageConf); Assertions.assertTrue(inProcessLockProvider.tryLock()); assertThrows(HoodieLockException.class, () -> { inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS); @@ -359,7 +360,7 @@ public class TestInProcessLockProvider { @Test public void testTryLockReAcquisitionByDifferentThread() { - InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, storageConf); final AtomicBoolean writer2Completed = new AtomicBoolean(false); // Main test thread @@ -387,7 +388,7 @@ public class TestInProcessLockProvider { @Test public void 
testTryUnLockByDifferentThread() { - InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, storageConf); final AtomicBoolean writer3Completed = new AtomicBoolean(false); // Main test thread @@ -431,7 +432,7 @@ public class TestInProcessLockProvider { @Test public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() { - final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, storageConf); final int threadCount = 3; final long awaitMaxTimeoutMs = 2000L; final CountDownLatch latch = new CountDownLatch(threadCount); @@ -492,7 +493,7 @@ public class TestInProcessLockProvider { @Test public void testLockReleaseByClose() { - InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, storageConf); assertDoesNotThrow(() -> { inProcessLockProvider.lock(); }); @@ -503,7 +504,7 @@ public class TestInProcessLockProvider { @Test public void testRedundantUnlock() { - InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, storageConf); assertDoesNotThrow(() -> { inProcessLockProvider.lock(); }); @@ -517,7 +518,7 @@ public class TestInProcessLockProvider { @Test public void testUnlockWithoutLock() { - InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, storageConf); assertDoesNotThrow(() -> { 
inProcessLockProvider.unlock(); }); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java index dcc92b969df..681e62bdeef 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java @@ -50,7 +50,7 @@ public class TestWaitBasedTimeGenerator { NEED_TO_LOCK_LATER.set(lockLater); } - public MockInProcessLockProvider(LockConfiguration lockConfiguration, Configuration conf) { + public MockInProcessLockProvider(LockConfiguration lockConfiguration, StorageConfiguration<?> conf) { super(lockConfiguration, conf); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java index 4c5aa5cb4f7..b458df9a579 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieLockException; import org.apache.hudi.hive.util.IMetaStoreClientUtil; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -87,12 +88,12 @@ public class HiveMetastoreBasedLockProvider implements LockProvider<LockResponse private ScheduledFuture<?> future = null; private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); - public HiveMetastoreBasedLockProvider(final LockConfiguration 
lockConfiguration, final Configuration conf) { + public HiveMetastoreBasedLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration<?> conf) { this(lockConfiguration); try { HiveConf hiveConf = new HiveConf(); setHiveLockConfs(hiveConf); - hiveConf.addResource(conf); + hiveConf.addResource(conf.unwrapAs(Configuration.class)); this.hiveClient = IMetaStoreClientUtil.getMSC(hiveConf); } catch (MetaException | HiveException e) { throw new HoodieLockException("Failed to create HiveMetaStoreClient", e); diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/functional/TestHiveMetastoreBasedLockProvider.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/functional/TestHiveMetastoreBasedLockProvider.java index b01b4cdc058..6f456e0551b 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/functional/TestHiveMetastoreBasedLockProvider.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/functional/TestHiveMetastoreBasedLockProvider.java @@ -21,8 +21,8 @@ package org.apache.hudi.hive.functional; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider; import org.apache.hudi.hive.testutils.HiveSyncFunctionalTestHarness; +import org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.LockComponent; @@ -80,7 +80,7 @@ public class TestHiveMetastoreBasedLockProvider extends HiveSyncFunctionalTestHa @Test public void testAcquireLock() throws Exception { - HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf()); + HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, storageConf()); lockComponent.setOperationType(DataOperationType.NO_TXN); 
Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS, lockComponent)); @@ -100,7 +100,7 @@ public class TestHiveMetastoreBasedLockProvider extends HiveSyncFunctionalTestHa @Test public void testUnlock() throws Exception { - HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf()); + HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, storageConf()); lockComponent.setOperationType(DataOperationType.NO_TXN); Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS, lockComponent)); @@ -113,7 +113,7 @@ public class TestHiveMetastoreBasedLockProvider extends HiveSyncFunctionalTestHa @Test public void testReentrantLock() throws Exception { - HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf()); + HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, storageConf()); lockComponent.setOperationType(DataOperationType.NO_TXN); Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS, lockComponent)); @@ -127,8 +127,8 @@ public class TestHiveMetastoreBasedLockProvider extends HiveSyncFunctionalTestHa lockProvider.unlock(); // not acquired in the beginning - HiveMetastoreBasedLockProvider lockProvider1 = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf()); - HiveMetastoreBasedLockProvider lockProvider2 = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf()); + HiveMetastoreBasedLockProvider lockProvider1 = new HiveMetastoreBasedLockProvider(lockConfiguration, storageConf()); + HiveMetastoreBasedLockProvider lockProvider2 = new HiveMetastoreBasedLockProvider(lockConfiguration, 
storageConf()); lockComponent.setOperationType(DataOperationType.NO_TXN); Assertions.assertTrue(lockProvider1.acquireLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS, lockComponent)); @@ -152,8 +152,8 @@ public class TestHiveMetastoreBasedLockProvider extends HiveSyncFunctionalTestHa @Test public void testWaitingLock() throws Exception { // create different HiveMetastoreBasedLockProvider to simulate different applications - HiveMetastoreBasedLockProvider lockProvider1 = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf()); - HiveMetastoreBasedLockProvider lockProvider2 = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf()); + HiveMetastoreBasedLockProvider lockProvider1 = new HiveMetastoreBasedLockProvider(lockConfiguration, storageConf()); + HiveMetastoreBasedLockProvider lockProvider2 = new HiveMetastoreBasedLockProvider(lockConfiguration, storageConf()); lockComponent.setOperationType(DataOperationType.NO_TXN); Assertions.assertTrue(lockProvider1.acquireLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS, lockComponent)); @@ -166,7 +166,7 @@ public class TestHiveMetastoreBasedLockProvider extends HiveSyncFunctionalTestHa } lockProvider1.unlock(); // create the third HiveMetastoreBasedLockProvider to acquire lock - HiveMetastoreBasedLockProvider lockProvider3 = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf()); + HiveMetastoreBasedLockProvider lockProvider3 = new HiveMetastoreBasedLockProvider(lockConfiguration, storageConf()); boolean acquireStatus = lockProvider3.acquireLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS, lockComponent); // we should acquired lock, since lockProvider1 has already released lock @@ -180,7 +180,7 @@ public class TestHiveMetastoreBasedLockProvider extends HiveSyncFunctionalTestHa @Test public void testUnlockWithoutLock() { - 
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf()); + HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, storageConf()); lockComponent.setOperationType(DataOperationType.NO_TXN); lockProvider.unlock(); } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java index f84de8a8d34..70d77abd248 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java @@ -28,6 +28,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncClient; import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor; import org.apache.hudi.hive.util.IMetaStoreClientUtil; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -85,6 +86,10 @@ public class HiveSyncFunctionalTestHarness { return hiveTestService.getHiveServer().getHiveConf(); } + public StorageConfiguration<Configuration> storageConf() { + return HadoopFSUtils.getStorageConf(hiveConf()); + } + public ZookeeperTestService zkService() { return zookeeperTestService; }