This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 9a9dd3a82d3 [HUDI-6123] Auto adjust lock configs for single writer (#8542) 9a9dd3a82d3 is described below commit 9a9dd3a82d3e69f1d5eebe46c79c8fd0dc2355db Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Sun Apr 23 16:37:48 2023 +0800 [HUDI-6123] Auto adjust lock configs for single writer (#8542) Currently, the `hoodie.auto.adjust.lock.configs` option defaults to false for batch-mode ingestion and to true for the Spark streaming sink and delta_streamer, while the metadata table (MDT) is enabled by default. For multiple streaming writers with no explicitly configured lock provider, `InProcessLockProvider` should not be used. Change list: 1. Restrict the option `hoodie.auto.adjust.lock.configs` so that it only takes effect in single-writer scope, because with multiple writers the `InProcessLockProvider` cannot work as expected across hosts/processes; 2. `LockManager#lock` and `LockManager#unlock` are invoked from the `TransactionManager`, which already checks whether an explicit lock is required, so the redundant check in `LockManager` is removed. 
--- .../apache/hudi/client/BaseHoodieWriteClient.java | 2 +- .../DirectMarkerTransactionManager.java | 6 +- .../client/transaction/TransactionManager.java | 18 +++--- .../hudi/client/transaction/lock/LockManager.java | 62 ++++++++++----------- .../org/apache/hudi/config/HoodieWriteConfig.java | 54 +++++++++--------- .../action/commit/BaseCommitActionExecutor.java | 2 +- .../hudi/client/transaction/TestLockManager.java | 65 +++++++++++++--------- .../apache/hudi/client/HoodieFlinkWriteClient.java | 2 +- .../apache/hudi/configuration/OptionsResolver.java | 2 +- .../org/apache/hudi/util/FlinkWriteClients.java | 2 +- 10 files changed, 113 insertions(+), 102 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 2f129c0aab6..79a813567b9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -493,7 +493,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) { setOperationType(writeOperationType); - this.lastCompletedTxnAndMetadata = txnManager.isNeedsLockGuard() + this.lastCompletedTxnAndMetadata = txnManager.isLockRequired() ? 
TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient) : Option.empty(); this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient); this.pendingInflightAndRequestedInstants.remove(instantTime); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java index f27af9a2549..7ed6d51038c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java @@ -42,12 +42,12 @@ public class DirectMarkerTransactionManager extends TransactionManager { private final String filePath; public DirectMarkerTransactionManager(HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) { - super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)), config.needsLockGuard()); + super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)), config.isLockRequired()); this.filePath = partitionPath + "/" + fileId; } public void beginTransaction(String newTxnOwnerInstantTime) { - if (needsLockGuard) { + if (isLockRequired) { LOG.info("Transaction starting for " + newTxnOwnerInstantTime + " and " + filePath); lockManager.lock(); @@ -57,7 +57,7 @@ public class DirectMarkerTransactionManager extends TransactionManager { } public void endTransaction(String currentTxnOwnerInstantTime) { - if (needsLockGuard) { + if (isLockRequired) { LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstantTime + " for " + filePath); if (reset(Option.of(getInstant(currentTxnOwnerInstantTime)), Option.empty(), Option.empty())) { diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java index c257d46f594..b3e9abc7a3a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java @@ -37,22 +37,22 @@ public class TransactionManager implements Serializable { protected static final Logger LOG = LoggerFactory.getLogger(TransactionManager.class); protected final LockManager lockManager; - protected final boolean needsLockGuard; + protected final boolean isLockRequired; protected Option<HoodieInstant> currentTxnOwnerInstant = Option.empty(); private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty(); public TransactionManager(HoodieWriteConfig config, FileSystem fs) { - this(new LockManager(config, fs), config.needsLockGuard()); + this(new LockManager(config, fs), config.isLockRequired()); } - protected TransactionManager(LockManager lockManager, boolean needsLockGuard) { + protected TransactionManager(LockManager lockManager, boolean isLockRequired) { this.lockManager = lockManager; - this.needsLockGuard = needsLockGuard; + this.isLockRequired = isLockRequired; } public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant, Option<HoodieInstant> lastCompletedTxnOwnerInstant) { - if (needsLockGuard) { + if (isLockRequired) { LOG.info("Transaction starting for " + newTxnOwnerInstant + " with latest completed transaction instant " + lastCompletedTxnOwnerInstant); lockManager.lock(); @@ -63,7 +63,7 @@ public class TransactionManager implements Serializable { } public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) { - if (needsLockGuard) { + if (isLockRequired) { LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant); if 
(reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) { lockManager.unlock(); @@ -84,7 +84,7 @@ public class TransactionManager implements Serializable { } public void close() { - if (needsLockGuard) { + if (isLockRequired) { lockManager.close(); LOG.info("Transaction manager closed"); } @@ -102,7 +102,7 @@ public class TransactionManager implements Serializable { return currentTxnOwnerInstant; } - public boolean isNeedsLockGuard() { - return needsLockGuard; + public boolean isLockRequired() { + return isLockRequired; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java index d7c7905c793..598f7cd7072 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java @@ -68,39 +68,37 @@ public class LockManager implements Serializable, AutoCloseable { } public void lock() { - if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { - LockProvider lockProvider = getLockProvider(); - int retryCount = 0; - boolean acquired = false; - while (retryCount <= maxRetries) { + LockProvider lockProvider = getLockProvider(); + int retryCount = 0; + boolean acquired = false; + while (retryCount <= maxRetries) { + try { + metrics.startLockApiTimerContext(); + acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS); + if (acquired) { + metrics.updateLockAcquiredMetric(); + break; + } + metrics.updateLockNotAcquiredMetric(); + LOG.info("Retrying to acquire lock. 
Current lock owner information : " + lockProvider.getCurrentOwnerLockInfo()); + Thread.sleep(maxWaitTimeInMs); + } catch (HoodieLockException | InterruptedException e) { + metrics.updateLockNotAcquiredMetric(); + if (retryCount >= maxRetries) { + throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock(), e); + } try { - metrics.startLockApiTimerContext(); - acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS); - if (acquired) { - metrics.updateLockAcquiredMetric(); - break; - } - metrics.updateLockNotAcquiredMetric(); - LOG.info("Retrying to acquire lock. Current lock owner information : " + lockProvider.getCurrentOwnerLockInfo()); Thread.sleep(maxWaitTimeInMs); - } catch (HoodieLockException | InterruptedException e) { - metrics.updateLockNotAcquiredMetric(); - if (retryCount >= maxRetries) { - throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock(), e); - } - try { - Thread.sleep(maxWaitTimeInMs); - } catch (InterruptedException ex) { - // ignore InterruptedException here - } - } finally { - retryCount++; + } catch (InterruptedException ex) { + // ignore InterruptedException here } - } - if (!acquired) { - throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock()); + } finally { + retryCount++; } } + if (!acquired) { + throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock()); + } } /** @@ -108,11 +106,9 @@ public class LockManager implements Serializable, AutoCloseable { * and tries to call unlock() */ public void unlock() { - if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { - getLockProvider().unlock(); - metrics.updateLockHeldTimerMetrics(); - close(); - } + getLockProvider().unlock(); + metrics.updateLockHeldTimerMetrics(); + close(); } public synchronized LockProvider getLockProvider() { diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 0475619944c..b5e93c90e1f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2344,7 +2344,7 @@ public class HoodieWriteConfig extends HoodieConfig { * File listing metadata configs. */ public boolean isMetadataTableEnabled() { - return metadataConfig.enabled(); + return getBooleanOrDefault(HoodieMetadataConfig.ENABLE); } public int getMetadataInsertParallelism() { @@ -2483,8 +2483,15 @@ public class HoodieWriteConfig extends HoodieConfig { /** * Returns whether the explicit guard of lock is required. */ - public boolean needsLockGuard() { - return isMetadataTableEnabled() || getWriteConcurrencyMode().supportsOptimisticConcurrencyControl(); + public boolean isLockRequired() { + return !isDefaultLockProvider() || getWriteConcurrencyMode().supportsOptimisticConcurrencyControl(); + } + + /** + * Returns whether the lock provider is default. + */ + private boolean isDefaultLockProvider() { + return HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue().equals(getLockProviderClass()); } /** @@ -3035,30 +3042,27 @@ public class HoodieWriteConfig extends HoodieConfig { autoAdjustConfigsForConcurrencyMode(isLockProviderPropertySet); } + private boolean isLockRequiredForSingleWriter() { + // When metadata table is enabled, lock provider must be used for + // single writer with async table services. + // Async table services can update the metadata table and a lock provider is + // needed to guard against any concurrent table write operations. If user has + // not configured any lock provider, let's use the InProcess lock provider. 
+ return writeConfig.isMetadataTableEnabled() && writeConfig.areAnyTableServicesAsync() + && !writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl(); + } + private void autoAdjustConfigsForConcurrencyMode(boolean isLockProviderPropertySet) { - if (writeConfig.isAutoAdjustLockConfigs()) { + if (!isLockProviderPropertySet && writeConfig.isAutoAdjustLockConfigs() && isLockRequiredForSingleWriter()) { // auto adjustment is required only for deltastreamer and spark streaming where async table services can be executed in the same JVM. - boolean isMetadataTableEnabled = writeConfig.getBoolean(HoodieMetadataConfig.ENABLE); - - if (isMetadataTableEnabled) { - // When metadata table is enabled, optimistic concurrency control must be used for - // single writer with async table services. - // Async table services can update the metadata table and a lock provider is - // needed to guard against any concurrent table write operations. If user has - // not configured any lock provider, let's use the InProcess lock provider. 
- boolean areTableServicesEnabled = writeConfig.areTableServicesEnabled(); - boolean areAsyncTableServicesEnabled = writeConfig.areAnyTableServicesAsync(); - if (!isLockProviderPropertySet && areTableServicesEnabled && areAsyncTableServicesEnabled) { - // This is targeted at Single writer with async table services - // If user does not set the lock provider, likely that the concurrency mode is not set either - // Override the configs for metadata table - writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), - InProcessLockProvider.class.getName()); - LOG.info(String.format("Automatically set %s=%s since user has not set the " - + "lock provider for single writer with async table services", - HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName())); - } - } + // This is targeted at Single writer with async table services + // If user does not set the lock provider, likely that the concurrency mode is not set either + // Override the configs for metadata table + writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + InProcessLockProvider.class.getName()); + LOG.info(String.format("Automatically set %s=%s since user has not set the " + + "lock provider for single writer with async table services", + HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName())); } // We check if "hoodie.cleaner.policy.failed.writes" diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index 1a79904ef5c..318f6155235 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -94,7 +94,7 @@ public abstract class 
BaseCommitActionExecutor<T, I, K, O, R> this.taskContextSupplier = context.getTaskContextSupplier(); // TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link BaseHoodieWriteClient}. this.txnManagerOption = config.shouldAutoCommit() ? Option.of(new TransactionManager(config, table.getMetaClient().getFs())) : Option.empty(); - if (this.txnManagerOption.isPresent() && this.txnManagerOption.get().isNeedsLockGuard()) { + if (this.txnManagerOption.isPresent() && this.txnManagerOption.get().isLockRequired()) { // these txn metadata are only needed for auto commit when optimistic concurrent control is also enabled this.lastCompletedTxn = TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient()); this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient()); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java index 96d8ae65d98..1b4c08c5329 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestLockManager.java @@ -31,7 +31,8 @@ import org.apache.curator.test.TestingServer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,11 +46,8 @@ public class TestLockManager extends HoodieCommonTestHarness { private static final Logger LOG = LoggerFactory.getLogger(TestLockManager.class); private static TestingServer server; - private static 
String zk_basePath = "/hudi/test/lock"; - private static String key = "table1"; - - HoodieWriteConfig writeConfig; - LockManager lockManager; + private static final String ZK_BASE_PATH = "/hudi/test/lock"; + private static final String KEY = "table1"; @BeforeAll public static void setup() { @@ -70,32 +68,17 @@ public class TestLockManager extends HoodieCommonTestHarness { } } - private HoodieWriteConfig getWriteConfig() { - return HoodieWriteConfig.newBuilder() - .withPath(basePath) - .withCleanConfig(HoodieCleanConfig.newBuilder() - .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) - .build()) - .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) - .withLockConfig(HoodieLockConfig.newBuilder() - .withLockProvider(ZookeeperBasedLockProvider.class) - .withZkBasePath(zk_basePath) - .withZkLockKey(key) - .withZkQuorum(server.getConnectString()) - .build()) - .build(); - } - @BeforeEach - private void init() throws IOException { + void init() throws IOException { initPath(); initMetaClient(); - this.writeConfig = getWriteConfig(); - this.lockManager = new LockManager(this.writeConfig, this.metaClient.getFs()); } - @Test - public void testLockAndUnlock() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testLockAndUnlock(boolean multiWriter) { + HoodieWriteConfig writeConfig = multiWriter ? 
getMultiWriterWriteConfig() : getSingleWriterWriteConfig(); + LockManager lockManager = new LockManager(writeConfig, this.metaClient.getFs()); LockManager mockLockManager = Mockito.spy(lockManager); assertDoesNotThrow(() -> { @@ -108,4 +91,32 @@ public class TestLockManager extends HoodieCommonTestHarness { Mockito.verify(mockLockManager).close(); } + + private HoodieWriteConfig getMultiWriterWriteConfig() { + return HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(ZookeeperBasedLockProvider.class) + .withZkBasePath(ZK_BASE_PATH) + .withZkLockKey(KEY) + .withZkQuorum(server.getConnectString()) + .build()) + .build(); + } + + private HoodieWriteConfig getSingleWriterWriteConfig() { + return HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(ZookeeperBasedLockProvider.class) + .withZkBasePath(ZK_BASE_PATH) + .withZkLockKey(KEY) + .withZkQuorum(server.getConnectString()) + .build()) + .build(); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 0498d190fc8..3eeffc3943e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -280,7 +280,7 @@ public class HoodieFlinkWriteClient<T> extends * should be called before the Driver starts a new transaction. 
*/ public void preTxn(HoodieTableMetaClient metaClient) { - if (txnManager.isNeedsLockGuard()) { + if (txnManager.isLockRequired()) { // refresh the meta client which is reused metaClient.reloadActiveTimeline(); this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index 6da87e66089..fd1e5b5f32b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -243,7 +243,7 @@ public class OptionsResolver { /** * Returns whether the writer txn should be guarded by lock. */ - public static boolean needsGuardByLock(Configuration conf) { + public static boolean isLockRequired(Configuration conf) { return conf.getBoolean(FlinkOptions.METADATA_ENABLED) || conf.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue()) .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java index 34d326f843f..c039cbaacd8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java @@ -224,7 +224,7 @@ public class FlinkWriteClients { .withProps(flinkConf2TypedProperties(conf)) .withSchema(getSourceSchema(conf).toString()); - if (OptionsResolver.needsGuardByLock(conf) && !conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) { + if 
(OptionsResolver.isLockRequired(conf) && !conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) { // configure the fs lock provider by default builder.withLockConfig(HoodieLockConfig.newBuilder() .withConflictResolutionStrategy(OptionsResolver.getConflictResolutionStrategy(conf))