This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3dc2df53f48 [HUDI-9158] Add storage-based lock provider abstract
implementation (#13103)
3dc2df53f48 is described below
commit 3dc2df53f48ee8032894ee3ab0f54f5f046e0c02
Author: Alex R <[email protected]>
AuthorDate: Fri Apr 11 03:17:03 2025 -0700
[HUDI-9158] Add storage-based lock provider abstract implementation (#13103)
* Add initial changes for storage-based lock provider abstract
implementation
---------
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../transaction/lock/StorageBasedLockProvider.java | 531 ++++++++++++++++++
.../lock/models/LockProviderHeartbeatManager.java | 2 +-
.../transaction/lock/models/StorageLockFile.java | 10 +-
.../apache/hudi/config/StorageBasedLockConfig.java | 100 ++++
.../lock/TestStorageBasedLockProvider.java | 624 +++++++++++++++++++++
.../hudi/config/TestStorageBasedLockConfig.java | 101 ++++
.../hudi/common/table/HoodieTableMetaClient.java | 2 +
.../apache/hudi/common/fs/TestStorageSchemes.java | 9 +
.../org/apache/hudi/storage/StorageSchemes.java | 94 ++--
9 files changed, 1433 insertions(+), 40 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
new file mode 100644
index 00000000000..ba9dfb9a3fa
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import
org.apache.hudi.client.transaction.lock.models.LockProviderHeartbeatManager;
+import org.apache.hudi.client.transaction.lock.models.LockUpdateResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.StorageBasedLockConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StorageSchemes;
+
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.net.URI;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.lock.LockState.ACQUIRED;
+import static org.apache.hudi.common.lock.LockState.ACQUIRING;
+import static org.apache.hudi.common.lock.LockState.FAILED_TO_ACQUIRE;
+import static org.apache.hudi.common.lock.LockState.FAILED_TO_RELEASE;
+import static org.apache.hudi.common.lock.LockState.RELEASED;
+import static org.apache.hudi.common.lock.LockState.RELEASING;
+import static
org.apache.hudi.common.table.HoodieTableMetaClient.LOCKS_FOLDER_NAME;
+
+/**
+ * A distributed filesystem storage based lock provider. This {@link
LockProvider} implementation
+ * leverages conditional writes to ensure transactional consistency for
multi-writer scenarios.
+ * The underlying storage client interface {@link StorageLock} is pluggable so
it can be implemented for any
+ * filesystem which supports conditional writes.
+ */
+@ThreadSafe
+public class StorageBasedLockProvider implements LockProvider<StorageLockFile>
{
+
+ // Default name for the table lock file.
+ // NOTE(review): not referenced in the visible part of this class - confirm where it is consumed.
+ public static final String DEFAULT_TABLE_LOCK_FILE_NAME = "table_lock";
+ // Sleep, in milliseconds, between attempts in the timed tryLock(long, TimeUnit) loop.
+ private static final long DEFAULT_LOCK_ACQUISITION_BUFFER_MS = 1000;
+ // Maximum expected clock drift between two nodes, in milliseconds.
+ // This is a similar idea to SkewAdjustingTimeGenerator.
+ // In reality, within a single cloud provider all nodes share the same NTP server,
+ // therefore we do not expect drift of more than a few ms.
+ // However, since our lock leases are pretty long, we can use a high buffer.
+ // It is added to a remote expiration timestamp before the local clock treats it as passed.
+ private static final long CLOCK_DRIFT_BUFFER_MS = 500;
+
+ // Retry count passed to StorageLock#tryCreateOrUpdateLockFileWithRetry when expiring or renewing the lock.
+ private static final long LOCK_UPSERT_RETRY_COUNT = 5;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StorageBasedLockProvider.class);
+
+ // Logger used by all instance methods. It is injectable through the @VisibleForTesting
+ // constructor; the public constructor passes LOGGER.
+ private final Logger logger;
+
+ // The lock service implementation which interacts with storage
+ private final StorageLock lockService;
+
+ // Lease length in seconds, read from StorageBasedLockConfig#getValiditySeconds().
+ private final long validitySeconds;
+ // Identifier compared against StorageLockFile#getOwner() before a lock object is cached.
+ private final String ownerId;
+ // Storage location handed to the lock service loader and included in log messages.
+ private final String lockFilePath;
+ // Runs renewLock() periodically while a lock is held.
+ private final HeartbeatManager heartbeatManager;
+ // Thread registered as a JVM shutdown hook by the constructor; it calls shutdown(true).
+ private final transient Thread shutdownThread;
+
+ // Lock file most recently written by this provider, or null when no lock is believed held.
+ @GuardedBy("this")
+ private StorageLockFile currentLockObj = null;
+ // Set to true at the end of shutdown(false); checked by assertUnclosed().
+ @GuardedBy("this")
+ private boolean isClosed = false;
+
+  /**
+   * Replaces the cached lock object after checking that it belongs to this provider.
+   *
+   * @param lockObj the lock file to cache, or {@code null} to clear the cached lock
+   * @throws HoodieLockException if {@code lockObj} is non-null and its owner differs from this provider's owner id
+   */
+  private synchronized void setLock(StorageLockFile lockObj) {
+    boolean ownedByThisProvider = lockObj == null || Objects.equals(lockObj.getOwner(), this.ownerId);
+    if (ownedByThisProvider) {
+      this.currentLockObj = lockObj;
+      return;
+    }
+    throw new HoodieLockException("Owners do not match. Current lock owner: " + this.ownerId + " lock path: "
+        + this.lockFilePath + " owner: " + lockObj.getOwner());
+  }
+
+ /**
+ * Public constructor for StorageBasedLockProvider, required by LockManager
+ * to instantiate it using reflection.
+ *
+ * <p>Delegates to the package-private constructor with a freshly generated random UUID
+ * as the owner id, {@code LockProviderHeartbeatManager::new} as the heartbeat manager
+ * factory, the reflective loader returned by {@code getLockService()} and the class logger.
+ *
+ * @param lockConfiguration The lock configuration; its properties are passed on and
+ * parsed into a StorageBasedLockConfig
+ * @param conf Storage config, ignored.
+ */
+ public StorageBasedLockProvider(final LockConfiguration lockConfiguration,
final StorageConfiguration<?> conf) {
+ this(
+ UUID.randomUUID().toString(),
+ lockConfiguration.getConfig(),
+ LockProviderHeartbeatManager::new,
+ getLockService(),
+ LOGGER);
+ }
+
+  /**
+   * Builds the default {@link StorageLock} loader used by the public constructor.
+   *
+   * <p>The returned function takes the URI scheme of the lock file path, resolves the matching
+   * implementation class name via {@code getLockServiceClassName} and instantiates it reflectively
+   * through its {@code (String ownerId, String lockFilePath, Properties config)} constructor.
+   *
+   * @return a function mapping (ownerId, lockFilePath, lockConfig) to a new {@link StorageLock}
+   * @throws HoodieLockException (from the returned function) wrapping any failure while parsing the
+   *                             path, resolving the class or invoking the constructor
+   */
+  private static Functions.Function3<String, String, TypedProperties, StorageLock> getLockService() {
+    return (ownerId, lockFilePath, lockConfig) -> {
+      try {
+        return (StorageLock) ReflectionUtils.loadClass(
+            getLockServiceClassName(new URI(lockFilePath).getScheme()),
+            // Exactly one parameter type per constructor argument below; a mismatch in
+            // arity makes the reflective constructor lookup/invocation fail every time.
+            new Class<?>[] {String.class, String.class, Properties.class},
+            new Object[] {ownerId, lockFilePath, lockConfig});
+      } catch (Throwable e) {
+        throw new HoodieLockException("Failed to load and initialize StorageLock", e);
+      }
+    };
+  }
+
+  /**
+   * Resolves the {@link StorageLock} implementation class name registered for a storage scheme.
+   *
+   * @param scheme URI scheme of the lock file path
+   * @return the implementation class name reported by the matching {@link StorageSchemes} entry
+   * @throws HoodieNotSupportedException if no entry provides a storage lock implementation for the scheme
+   */
+  private static @NotNull String getLockServiceClassName(String scheme) {
+    Option<StorageSchemes> lockCapableScheme = StorageSchemes.getStorageLockImplementationIfExists(scheme);
+    if (!lockCapableScheme.isPresent()) {
+      throw new HoodieNotSupportedException("No implementation of StorageLock supports this scheme: " + scheme);
+    }
+    return lockCapableScheme.get().getStorageLockClass();
+  }
+
+ /**
+ * Constructor with injectable collaborators, used by the public constructor and by tests.
+ *
+ * @param ownerId identifier written into lock files and used to verify lock ownership
+ * @param properties raw properties, parsed into a StorageBasedLockConfig and also handed to the lock service loader
+ * @param heartbeatManagerLoader factory receiving (ownerId, heartbeat interval in ms, renewal callback)
+ * @param lockServiceLoader factory receiving (ownerId, lockFilePath, properties)
+ * @param logger logger used by this instance
+ */
+ @VisibleForTesting
+ StorageBasedLockProvider(
+ String ownerId,
+ TypedProperties properties,
+ Functions.Function3<String, Long, Supplier<Boolean>, HeartbeatManager>
heartbeatManagerLoader,
+ Functions.Function3<String, String, TypedProperties, StorageLock>
lockServiceLoader,
+ Logger logger) {
+ StorageBasedLockConfig config = new
StorageBasedLockConfig.Builder().fromProperties(properties).build();
+ long heartbeatPollSeconds = config.getHeartbeatPollSeconds();
+ this.validitySeconds = config.getValiditySeconds();
+ // Resolves to "<table base path>/<LOCKS_FOLDER_NAME>".
+ // NOTE(review): no file-name component is appended here although the field is named
+ // lockFilePath and DEFAULT_TABLE_LOCK_FILE_NAME exists - confirm this is intended.
+ this.lockFilePath = String.format("%s%s%s", config.getHudiTableBasePath(),
StoragePath.SEPARATOR, LOCKS_FOLDER_NAME);
+ // The poll interval is configured in seconds; the loader receives milliseconds.
+ // The renewal callback is bound to this instance's renewLock().
+ this.heartbeatManager = heartbeatManagerLoader.apply(ownerId,
heartbeatPollSeconds * 1000, this::renewLock);
+ this.lockService = lockServiceLoader.apply(ownerId, lockFilePath,
properties);
+ this.ownerId = ownerId;
+ this.logger = logger;
+ // The shutdown hook runs shutdown(true), which only tries to expire a lock that is still held.
+ shutdownThread = new Thread(() -> shutdown(true));
+ Runtime.getRuntime().addShutdownHook(shutdownThread);
+ logger.info("Instantiated new storage-based lock provider, owner: {},
lockfilePath: {}", ownerId, lockFilePath);
+ }
+
+ // -----------------------------------------
+ // BASE METHODS
+ // -----------------------------------------
+
+ /**
+ * Returns the lock file object currently cached by this provider.
+ *
+ * @return the cached lock file, or {@code null} when none is cached; a non-null
+ * value does not by itself prove that the lease is still valid
+ */
+ @Override
+ public synchronized StorageLockFile getLock() {
+ return currentLockObj;
+ }
+
+  /**
+   * Attempts to acquire the lock, retrying until the given timeout has elapsed.
+   *
+   * <p>Each iteration makes one non-blocking {@link #tryLock()} attempt and, on failure, sleeps for
+   * {@code DEFAULT_LOCK_ACQUISITION_BUFFER_MS} before checking the timeout again.
+   *
+   * @param time maximum time to keep retrying; a value of zero or less makes no attempt
+   * @param unit unit of {@code time}
+   * @return {@code true} if the lock was acquired, {@code false} if the timeout elapsed first
+   * @throws HoodieLockException if the thread is interrupted while waiting (the interrupt flag is restored)
+   */
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) {
+    final long startNanos = System.nanoTime();
+    final long timeoutNanos = unit.toNanos(time);
+
+    // Compare elapsed time with the timeout instead of "now < start + timeout": nanoTime values
+    // may only be compared by subtraction, and toNanos saturates at Long.MAX_VALUE, so the sum
+    // could wrap negative and end the loop before a single attempt was made.
+    while (System.nanoTime() - startNanos < timeoutNanos) {
+      try {
+        logDebugLockState(ACQUIRING);
+        if (tryLock()) {
+          return true;
+        }
+        Thread.sleep(DEFAULT_LOCK_ACQUISITION_BUFFER_MS);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so callers further up the stack can observe it.
+        Thread.currentThread().interrupt();
+        throw new HoodieLockException(generateLockStateMessage(LockState.FAILED_TO_ACQUIRE), e);
+      }
+    }
+
+    return false;
+  }
+
+ /**
+ * Closes this provider by delegating to {@code shutdown(false)}.
+ */
+ @Override
+ public synchronized void close() {
+ shutdown(false);
+ }
+
+  /**
+   * Releases the lock and, for a regular close, all owned resources.
+   *
+   * <p>From the JVM shutdown hook only a single attempt to expire a still-held lock is made.
+   * For a regular close the shutdown hook is deregistered, the lock is released, the lock service
+   * and heartbeat manager are closed, and the provider is marked closed. Every step of the regular
+   * close is best effort: failures are logged and the remaining steps still run.
+   *
+   * @param fromShutdownHook {@code true} when invoked by the JVM shutdown hook thread
+   */
+  private synchronized void shutdown(boolean fromShutdownHook) {
+    if (fromShutdownHook) {
+      // Try to expire the lock from the shutdown hook, then do nothing else.
+      if (!isClosed && actuallyHoldsLock()) {
+        tryExpireCurrentLock(true);
+      }
+      return;
+    }
+
+    try {
+      Runtime.getRuntime().removeShutdownHook(shutdownThread);
+    } catch (IllegalStateException e) {
+      // The JVM is already shutting down, so hooks can no longer be removed. This must not
+      // prevent the unlock and resource cleanup below from running.
+      logger.debug("Owner {}: JVM is shutting down, leaving shutdown hook registered.", ownerId);
+    }
+    try {
+      this.unlock();
+    } catch (Exception e) {
+      logger.error("Owner {}: Failed to unlock current lock.", ownerId, e);
+    }
+    try {
+      this.lockService.close();
+    } catch (Exception e) {
+      logger.error("Owner {}: Lock service failed to close.", ownerId, e);
+    }
+    try {
+      this.heartbeatManager.close();
+    } catch (Exception e) {
+      logger.error("Owner {}: Heartbeat manager failed to close.", ownerId, e);
+    }
+
+    this.isClosed = true;
+  }
+
+  /**
+   * Tells whether the given lock file still represents a live lease.
+   *
+   * @param lock lock file to inspect
+   * @return {@code false} if the lock is flagged expired or its expiration time has certainly
+   *         passed (allowing for clock drift); {@code true} otherwise
+   */
+  private synchronized boolean isLockStillValid(StorageLockFile lock) {
+    if (lock.isExpired()) {
+      return false;
+    }
+    boolean leaseCertainlyOver = isCurrentTimeCertainlyOlderThanDistributedTime(lock.getValidUntil());
+    return !leaseCertainlyOver;
+  }
+
+ /**
+ * Attempts a single pass to acquire the lock (non-blocking).
+ *
+ * @return true if lock acquired, false otherwise
+ */
+ @Override
+ public synchronized boolean tryLock() {
+ assertHeartbeatManagerExists();
+ assertUnclosed();
+ logDebugLockState(ACQUIRING);
+ if (actuallyHoldsLock()) {
+ // Supports reentrant locks
+ return true;
+ }
+
+ if (this.heartbeatManager.hasActiveHeartbeat()) {
+ logger.error("Detected broken invariant: there is an active heartbeat
without a lock being held.");
+ // Breach of object invariant - we should never have an active heartbeat
without
+ // holding a lock.
+ throw new
HoodieLockException(generateLockStateMessage(FAILED_TO_ACQUIRE));
+ }
+
+ Pair<LockGetResult, StorageLockFile> latestLock =
this.lockService.readCurrentLockFile();
+ if (latestLock.getLeft() == LockGetResult.UNKNOWN_ERROR) {
+ logInfoLockState(FAILED_TO_ACQUIRE, "Failed to get the latest lock
status");
+ // We were not able to determine whether a lock was present.
+ return false;
+ }
+
+ if (latestLock.getLeft() == LockGetResult.SUCCESS &&
isLockStillValid(latestLock.getRight())) {
+ String msg = String.format("Lock already held by %s",
latestLock.getRight().getOwner());
+ // Lock held by others.
+ logInfoLockState(FAILED_TO_ACQUIRE, msg);
+ return false;
+ }
+
+ // Try to acquire the lock
+ StorageLockData newLockData = new StorageLockData(false,
System.currentTimeMillis() + validitySeconds, ownerId);
+ Pair<LockUpdateResult, StorageLockFile> lockUpdateStatus =
this.lockService.tryCreateOrUpdateLockFile(
+ newLockData, latestLock.getLeft() == LockGetResult.NOT_EXISTS ? null :
latestLock.getRight());
+ if (lockUpdateStatus.getLeft() != LockUpdateResult.SUCCESS) {
+ // failed to acquire the lock, indicates concurrent contention
+ logInfoLockState(FAILED_TO_ACQUIRE);
+ return false;
+ }
+ this.setLock(lockUpdateStatus.getRight());
+
+ // There is a remote chance that
+ // - after lock is acquired but before heartbeat starts the lock is
expired.
+ // - lock is acquired and heartbeat is up yet it does not run timely
before the
+ // lock is expired
+ // It is mitigated by setting the lock validity period to a reasonably long
+ // period to survive until heartbeat comes, plus
+ // set the heartbeat interval relatively small enough.
+ if
(!this.heartbeatManager.startHeartbeatForThread(Thread.currentThread())) {
+ // Precondition "no active heartbeat" is checked previously, so when
+ // startHeartbeatForThread returns false,
+ // we are confident no heartbeat thread is running.
+ logErrorLockState(RELEASING, "We were unable to start the heartbeat!");
+ tryExpireCurrentLock(false);
+ return false;
+ }
+
+ logInfoLockState(ACQUIRED);
+ return true;
+ }
+
+  /**
+   * Determines whether this provider currently holds a valid lock.
+   *
+   * <p>Both conditions must hold: a lock object is cached, and that lock object is still valid
+   * according to {@code isLockStillValid}.
+   *
+   * @return {@code true} if this provider holds a valid lock, {@code false} otherwise
+   */
+  private boolean actuallyHoldsLock() {
+    if (!believesLockMightBeHeld()) {
+      return false;
+    }
+    return isLockStillValid(getLock());
+  }
+
+ /**
+ * Checks if this provider has a non-null lock object reference.
+ *
+ * <p>
+ * A non-null lock object indicates that this provider has previously
+ * <b>successfully</b> acquired a lock via
+ * {@link StorageBasedLockProvider#tryLock()} and has not yet <b>successfully</b>
+ * released it via {@link StorageBasedLockProvider#unlock()}.
+ * It is merely an indicator that the lock might be held by this provider. To
+ * truly certify we are the owner of the lock,
+ * {@link StorageBasedLockProvider#actuallyHoldsLock()} should be used.
+ *
+ * @return {@code true} if this provider has a non-null lock object,
+ * {@code false} otherwise
+ * @see StorageBasedLockProvider#actuallyHoldsLock()
+ */
+ private boolean believesLockMightBeHeld() {
+ return this.getLock() != null;
+ }
+
+  /**
+   * Releases the lock by stopping the heartbeat and rewriting the lock file as expired.
+   *
+   * <p>Does nothing when no lock object is cached. The expiry attempt is made even when stopping
+   * the heartbeat reported failure.
+   *
+   * @throws HoodieLockException if the heartbeat could not be stopped or the lock could not be expired
+   */
+  @Override
+  public synchronized void unlock() {
+    assertHeartbeatManagerExists();
+    if (believesLockMightBeHeld()) {
+      // Stop the heartbeat first so that it cannot extend the lease while we expire it.
+      boolean heartbeatStopped = true;
+      if (heartbeatManager.hasActiveHeartbeat()) {
+        logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId);
+        heartbeatStopped = heartbeatManager.stopHeartbeat(true);
+      }
+
+      // Then expire the current lock.
+      boolean lockExpired = tryExpireCurrentLock(false);
+      if (!(heartbeatStopped && lockExpired)) {
+        throw new HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
+      }
+    }
+  }
+
+ /**
+ * Precondition check: fails when no heartbeat manager is present.
+ *
+ * @throws HoodieLockException if {@code heartbeatManager} is null
+ */
+ private void assertHeartbeatManagerExists() {
+ if (heartbeatManager == null) {
+ // broken function precondition.
+ throw new HoodieLockException("Unexpected null heartbeatManager");
+ }
+ }
+
+ /**
+ * Precondition check: fails when this provider has already been closed.
+ *
+ * @throws HoodieLockException if {@code isClosed} is true
+ */
+ private void assertUnclosed() {
+ if (this.isClosed) {
+ throw new HoodieLockException("Lock provider already closed");
+ }
+ }
+
+ /**
+ * Tries to expire the currently held lock by writing a lock file flagged as expired
+ * that keeps the current expiration time and owner.
+ *
+ * @param fromShutdownHook Whether this is the best-effort quick unlock from the shutdown
+ * hook: a single write attempt is made and the active-heartbeat precondition is skipped.
+ * @return True if the cached lock was cleared, i.e. the expired lock file was written or the
+ * lock turned out to be acquired by others; false if the lock state is unknown.
+ * @throws HoodieLockException if a heartbeat is still active (outside the shutdown hook) or
+ * the lock service returns an unexpected result
+ */
+ private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) {
+ // It does not make sense to have the heartbeat alive, extending the lock lease,
+ // while we are trying to expire the lock.
+ if (!fromShutdownHook && heartbeatManager.hasActiveHeartbeat()) {
+ // broken function precondition.
+ throw new HoodieLockException("Must stop heartbeat before expire lock
file");
+ }
+ logDebugLockState(RELEASING);
+ // Upload metadata that will unlock this lock.
+ StorageLockData expiredLockData = new StorageLockData(true,
this.getLock().getValidUntil(), ownerId);
+ Pair<LockUpdateResult, StorageLockFile> result;
+ if (fromShutdownHook) {
+ // Only try once for shutdown hook, then return immediately
+ result = this.lockService.tryCreateOrUpdateLockFile(expiredLockData,
this.getLock());
+ } else {
+ result = this.lockService.tryCreateOrUpdateLockFileWithRetry(
+ () -> expiredLockData,
+ this.getLock(),
+ // Number of retries for the upsert.
+ LOCK_UPSERT_RETRY_COUNT);
+ }
+ switch (result.getLeft()) {
+ case UNKNOWN_ERROR:
+ // Here we do not know the state of the lock, so the cached lock object is kept.
+ logErrorLockState(FAILED_TO_RELEASE, "Lock state is unknown.");
+ return false;
+ case SUCCESS:
+ logInfoLockState(RELEASED);
+ setLock(null);
+ return true;
+ case ACQUIRED_BY_OTHERS:
+ // As we are confident the lock is no longer held by this provider, clean up the cached
+ // lock object. However this is an edge case, so warn.
+ logWarnLockState(RELEASED, "lock should not have been acquired by
others.");
+ setLock(null);
+ return true;
+ default:
+ throw new HoodieLockException("Unexpected lock update result: " +
result.getLeft());
+ }
+ }
+
+  /**
+   * Renews (heartbeats) the current lock if this provider is the holder: the expiration flag is
+   * forced to false and the expiration time is moved to a later point in the future.
+   *
+   * <p>Any exception raised while renewing is logged and reported as {@code false}.
+   *
+   * @return {@code true} if the heartbeat should keep running (renewal succeeded, or failed with
+   *         an unknown and possibly transient error); {@code false} if no lock is held, the lock
+   *         was acquired by others, or an exception occurred
+   */
+  @VisibleForTesting
+  protected synchronized boolean renewLock() {
+    try {
+      // If we don't hold the lock, no-op.
+      if (!believesLockMightBeHeld()) {
+        logger.warn("Owner {}: Cannot renew, no lock held by this process", ownerId);
+        // No need to extend lock lease.
+        return false;
+      }
+
+      long oldExpiration = getLock().getValidUntil();
+      // Attempt conditional update, extend lock. There are 3 cases:
+      // 1. Happy case: lock has not expired yet, we extend the lease to a longer
+      //    period.
+      // 2. Corner case 1: lock is expired and is acquired by others, lock renewal
+      //    failed with ACQUIRED_BY_OTHERS.
+      // 3. Corner case 2: lock is expired but no one has acquired it yet, lock
+      //    renewal "revived" the expired lock.
+      // Please note we expect the corner cases to almost never happen.
+      // Action taken for corner case 2 is just a best effort mitigation. At least it
+      // prevents further data corruption by
+      // letting someone else acquire the lock.
+      Pair<LockUpdateResult, StorageLockFile> currentLock = this.lockService.tryCreateOrUpdateLockFileWithRetry(
+          // The expiration is epoch milliseconds while the configured validity is in seconds,
+          // so convert before adding. Evaluated per attempt so retries get a fresh expiration.
+          () -> new StorageLockData(
+              false, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(validitySeconds), ownerId),
+          getLock(),
+          LOCK_UPSERT_RETRY_COUNT);
+      switch (currentLock.getLeft()) {
+        case ACQUIRED_BY_OTHERS:
+          logger.error("Owner {}: Unable to renew lock as it is acquired by others.", ownerId);
+          // No need to extend lock lease anymore.
+          return false;
+        case UNKNOWN_ERROR:
+          // This could be transient, but unclear, we will let the heartbeat continue
+          // normally.
+          // If the next heartbeat run identifies our lock has expired we will error out.
+          logger.warn("Owner {}: Unable to renew lock due to unknown error, could be transient.", ownerId);
+          // Let heartbeat retry later.
+          return true;
+        case SUCCESS:
+          // Only positive outcome
+          this.setLock(currentLock.getRight());
+          logger.info("Owner {}: Lock renewal successful. The renewal completes {} ms before expiration for lock {}.",
+              ownerId, oldExpiration - System.currentTimeMillis(), lockFilePath);
+          // Let heartbeat continue to renew lock lease again later.
+          return true;
+        default:
+          throw new HoodieLockException("Unexpected lock update result: " + currentLock.getLeft());
+      }
+    } catch (Exception e) {
+      logger.error("Owner {}: Exception occurred while renewing lock", ownerId, e);
+      return false;
+    }
+  }
+
+ // ---------
+ // Utilities
+ // ---------
+
+ /**
+ * Method to calculate whether a timestamp from a distributed source has
+ * definitively occurred yet.
+ */
+ protected boolean isCurrentTimeCertainlyOlderThanDistributedTime(long epoch)
{
+ return System.currentTimeMillis() > epoch + CLOCK_DRIFT_BUFFER_MS;
+ }
+
+ private String generateLockStateMessage(LockState state) {
+ String threadName = Thread.currentThread().getName();
+ return String.format("Owner %s: Lock file path %s, Thread %s, Storage
based lock state %s", ownerId,
+ lockFilePath, threadName, state.toString());
+ }
+
+ // SLF4J templates shared by the helpers below. Placeholders are, in order: owner id,
+ // lock file path, current thread, lock state and (second template only) an extra message.
+ private static final String LOCK_STATE_LOGGER_MSG = "Owner {}: Lock file
path {}, Thread {}, Storage based lock state {}";
+ private static final String LOCK_STATE_LOGGER_MSG_WITH_INFO = "Owner {}:
Lock file path {}, Thread {}, Storage based lock state {}, {}";
+
+ /** Logs the given lock state at DEBUG level. */
+ private void logDebugLockState(LockState state) {
+ logger.debug(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath,
Thread.currentThread(), state);
+ }
+
+ /** Logs the given lock state at INFO level. */
+ private void logInfoLockState(LockState state) {
+ logger.info(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath,
Thread.currentThread(), state);
+ }
+
+ /** Logs the given lock state at INFO level together with an extra message. */
+ private void logInfoLockState(LockState state, String msg) {
+ logger.info(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath,
Thread.currentThread(), state, msg);
+ }
+
+ /** Logs the given lock state at WARN level together with an extra message. */
+ private void logWarnLockState(LockState state, String msg) {
+ logger.warn(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath,
Thread.currentThread(), state, msg);
+ }
+
+ /** Logs the given lock state at ERROR level together with an extra message. */
+ private void logErrorLockState(LockState state, String msg) {
+ logger.error(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath,
Thread.currentThread(), state, msg);
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
index 4beba39dc95..be73bd7cb2c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
@@ -35,7 +35,7 @@ import java.util.function.Supplier;
/**
* LockProviderHeartbeatManager is a helper class which handles the scheduling
and stopping of heartbeat
- * tasks. This is intended for use with the conditional write lock provider,
which requires
+ * tasks. This is intended for use with the storage based lock provider, which
requires
* a separate thread to spawn and renew the lock repeatedly.
* It should be responsible for the entire lifecycle of the heartbeat task.
* Importantly, a new instance should be created for each lock provider.
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
index 2a045f31aef..dfccf23d67e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
@@ -35,7 +35,7 @@ public class StorageLockFile {
private final StorageLockData data;
private final String versionId;
- // Get a custom object mapper. See ConditionalWriteLockData for required
properties.
+ // Get a custom object mapper. See StorageLockData for required properties.
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
// This allows us to add new properties down the line.
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
@@ -43,7 +43,7 @@ public class StorageLockFile {
.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
/**
- * Initializes a ConditionalWriteLockFile using the data and unique
versionId.
+ * Initializes a StorageLockFile using the data and unique versionId.
*
* @param data The data in the lock file.
* @param versionId The version of this lock file. Used to ensure
consistency through conditional writes.
@@ -56,11 +56,11 @@ public class StorageLockFile {
}
/**
- * Factory method to load an input stream into a ConditionalWriteLockFile.
+ * Factory method to load an input stream into a StorageLockFile.
*
* @param inputStream The input stream of the JSON content.
* @param versionId The unique version identifier for the lock file.
- * @return A new instance of ConditionalWriteLockFile.
+ * @return A new instance of StorageLockFile.
* @throws HoodieIOException If deserialization fails.
*/
public static StorageLockFile createFromStream(InputStream inputStream,
String versionId) {
@@ -68,7 +68,7 @@ public class StorageLockFile {
StorageLockData data = OBJECT_MAPPER.readValue(inputStream,
StorageLockData.class);
return new StorageLockFile(data, versionId);
} catch (IOException e) {
- throw new HoodieIOException("Failed to deserialize JSON content into
ConditionalWriteLockData", e);
+ throw new HoodieIOException("Failed to deserialize JSON content into
StorageLockData", e);
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java
new file mode 100644
index 00000000000..bc5a52db71b
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
+
+/**
+ * Configuration for the storage-based lock provider: lock validity and heartbeat interval, both
+ * in seconds, plus access to the table base path.
+ */
+public class StorageBasedLockConfig extends HoodieConfig {
+  private static final String SINCE_VERSION_1_0_2 = "1.0.2";
+  private static final String STORAGE_BASED_LOCK_PROPERTY_PREFIX = LockConfiguration.LOCK_PREFIX
+      + "storage.";
+
+  public static final ConfigProperty<Long> VALIDITY_TIMEOUT_SECONDS = ConfigProperty
+      .key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "validity.timeout.secs")
+      .defaultValue(TimeUnit.MINUTES.toSeconds(5))
+      .markAdvanced()
+      .sinceVersion(SINCE_VERSION_1_0_2)
+      .withDocumentation(
+          "For storage-based lock provider, the amount of time in seconds each new lock is valid for. "
+              + "The lock provider will attempt to renew its lock until it successfully extends the lock lease period "
+              + "or the validity timeout is reached.");
+
+  public static final ConfigProperty<Long> HEARTBEAT_POLL_SECONDS = ConfigProperty
+      .key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "heartbeat.poll.secs")
+      .defaultValue(30L)
+      .markAdvanced()
+      .sinceVersion(SINCE_VERSION_1_0_2)
+      .withDocumentation(
+          "For storage-based lock provider, the amount of time in seconds to wait before renewing the lock. "
+              + "Defaults to 30 seconds.");
+
+  /**
+   * @return the configured lock validity, in seconds
+   */
+  public long getValiditySeconds() {
+    return getLong(VALIDITY_TIMEOUT_SECONDS);
+  }
+
+  /**
+   * @return the configured interval between lock renewals, in seconds
+   */
+  public long getHeartbeatPollSeconds() {
+    return getLong(HEARTBEAT_POLL_SECONDS);
+  }
+
+  /**
+   * @return the value stored under {@code BASE_PATH}
+   */
+  public String getHudiTableBasePath() {
+    return getString(BASE_PATH);
+  }
+
+  /**
+   * Builder that copies caller properties, validates them and applies defaults.
+   */
+  public static class Builder {
+    private final StorageBasedLockConfig lockConfig = new StorageBasedLockConfig();
+
+    /**
+     * Applies default values for unset properties and returns the config.
+     *
+     * @return the configured {@link StorageBasedLockConfig}
+     */
+    public StorageBasedLockConfig build() {
+      lockConfig.setDefaults(StorageBasedLockConfig.class.getName());
+      return lockConfig;
+    }
+
+    /**
+     * Copies all given properties into the config and validates them.
+     *
+     * @param props properties to copy
+     * @return this builder
+     * @throws IllegalArgumentException if the base path is missing or the timing values are inconsistent
+     */
+    public StorageBasedLockConfig.Builder fromProperties(TypedProperties props) {
+      lockConfig.getProps().putAll(props);
+      checkRequiredProps();
+      return this;
+    }
+
+    /**
+     * Validates required and timing properties, using defaults for unset timing values.
+     *
+     * @throws IllegalArgumentException if the base path is missing, the validity is less than
+     *                                  three times the heartbeat interval, the validity is below
+     *                                  5 seconds, or the heartbeat interval is below 1 second
+     */
+    private void checkRequiredProps() {
+      String notExistsMsg = " does not exist!";
+      if (!lockConfig.contains(BASE_PATH)) {
+        throw new IllegalArgumentException(BASE_PATH.key() + notExistsMsg);
+      }
+      if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS)
+          * 3) {
+        // Exactly triple passes the check above, so the message says "greater than or equal to".
+        throw new IllegalArgumentException(
+            VALIDITY_TIMEOUT_SECONDS.key() + " should be greater than or equal to triple "
+                + HEARTBEAT_POLL_SECONDS.key());
+      }
+      if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < 5) {
+        throw new IllegalArgumentException(
+            VALIDITY_TIMEOUT_SECONDS.key() + " should be greater than or equal to 5 seconds.");
+      }
+      if (lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS) < 1) {
+        throw new IllegalArgumentException(
+            HEARTBEAT_POLL_SECONDS.key() + " should be greater than or equal to 1 second.");
+      }
+    }
+  }
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
new file mode 100644
index 00000000000..9e5f4a6d689
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import org.apache.hudi.client.transaction.lock.models.LockUpdateResult;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.StorageBasedLockConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test class for StorageBasedLockProvider
+ */
+class TestStorageBasedLockProvider {
+ private StorageBasedLockProvider lockProvider;
+ private StorageLock mockLockService;
+ private HeartbeatManager mockHeartbeatManager;
+ private Logger mockLogger;
+ private final String ownerId = UUID.randomUUID().toString();
+ private static final int DEFAULT_LOCK_VALIDITY_MS = 5000;
+
+ /**
+ * Builds the provider under test before each test: a Mockito spy of
+ * StorageBasedLockProvider whose heartbeat-manager and lock-service factories return
+ * mocks, with a mocked logger, a 5-second validity timeout, a 1-second heartbeat poll
+ * period, and a gs:// base path.
+ */
+ @BeforeEach
+ void setupLockProvider() {
+ mockLockService = mock(StorageLock.class);
+ mockHeartbeatManager = mock(HeartbeatManager.class);
+ mockLogger = mock(Logger.class);
+ // Default: stopping the heartbeat reports success; individual tests override this.
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+ TypedProperties props = new TypedProperties();
+ props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "5");
+ props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
+ props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default");
+
+ // The two lambdas ignore their three arguments and always hand back the shared mocks.
+ lockProvider = spy(new StorageBasedLockProvider(
+ ownerId,
+ props,
+ (a,b,c) -> mockHeartbeatManager,
+ (a,b,c) -> mockLockService,
+ mockLogger));
+ }
+
+ @AfterEach
+ void cleanupLockProvider() {
+ lockProvider.close();
+ }
+
+ @Test
+ void testUnsupportedLockStorageLocation() {
+ TypedProperties props = new TypedProperties();
+ props.put(BASE_PATH.key(), "hdfs://bucket/lake/db/tbl-default");
+ LockConfiguration lockConf = new LockConfiguration(props);
+ StorageConfiguration<?> storageConf =
HoodieTestUtils.getDefaultStorageConf();
+ HoodieLockException ex = assertThrows(HoodieLockException.class,
+ () -> new StorageBasedLockProvider(lockConf, storageConf));
+ assertTrue(ex.getCause().getMessage().contains("No implementation of
StorageLock supports this scheme"));
+ }
+
+ // NOTE(review): despite the name, this test asserts that construction FAILS with
+ // "Failed to load and initialize StorageLock" for an s3:// base path. Presumably the
+ // scheme is recognised but its StorageLock implementation class is not on this
+ // module's test classpath, so loading fails - confirm, and consider a clearer name.
+ @Test
+ void testValidLockStorageLocation() {
+ TypedProperties props = new TypedProperties();
+ props.put(BASE_PATH.key(), "s3://bucket/lake/db/tbl-default");
+
+ LockConfiguration lockConf = new LockConfiguration(props);
+ StorageConfiguration<?> storageConf =
HoodieTestUtils.getDefaultStorageConf();
+
+ HoodieLockException ex = assertThrows(HoodieLockException.class,
+ () -> new StorageBasedLockProvider(lockConf, storageConf));
+ assertTrue(ex.getMessage().contains("Failed to load and initialize
StorageLock"));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = { "gs://bucket/lake/db/tbl-default",
"s3://bucket/lake/db/tbl-default",
+ "s3a://bucket/lake/db/tbl-default" })
+ void testNonExistentWriteServiceWithDefaults(String tableBasePathString) {
+ TypedProperties props = new TypedProperties();
+ props.put(BASE_PATH.key(), tableBasePathString);
+
+ LockConfiguration lockConf = new LockConfiguration(props);
+ StorageConfiguration<?> storageConf =
HoodieTestUtils.getDefaultStorageConf();
+
+ HoodieLockException ex = assertThrows(HoodieLockException.class,
+ () -> new StorageBasedLockProvider(lockConf, storageConf));
+ assertTrue(ex.getMessage().contains("Failed to load and initialize
StorageLock"));
+ }
+
+ @Test
+ void testInvalidLocksLocationForWriteService() {
+ TypedProperties props = new TypedProperties();
+ props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default");
+ props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "5");
+ props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
+
+ LockConfiguration lockConf = new LockConfiguration(props);
+ StorageConfiguration<?> storageConf =
HoodieTestUtils.getDefaultStorageConf();
+
+ HoodieLockException ex = assertThrows(HoodieLockException.class,
+ () -> new StorageBasedLockProvider(lockConf, storageConf));
+ Throwable cause = ex.getCause();
+ assertNotNull(cause);
+ assertInstanceOf(HoodieNotSupportedException.class, cause);
+ }
+
+ @Test
+ void testTryLockForTimeUnitThrowsOnInterrupt() throws Exception {
+ doReturn(false).when(lockProvider).tryLock();
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread t = new Thread(() -> {
+ try {
+ lockProvider.tryLock(1, TimeUnit.SECONDS);
+ } catch (HoodieLockException e) {
+ latch.countDown();
+ }
+ });
+ t.start();
+ Thread.sleep(50);
+ t.interrupt();
+ assertTrue(latch.await(2, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void testTryLockForTimeUnitAcquiresLockEventually() throws Exception {
+ AtomicInteger count = new AtomicInteger(0);
+ doAnswer(inv -> count.incrementAndGet() > 2).when(lockProvider).tryLock();
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread t = new Thread(() -> {
+ assertTrue(lockProvider.tryLock(4, TimeUnit.SECONDS));
+ latch.countDown();
+ });
+ t.start();
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void testTryLockForTimeUnitFailsToAcquireLockEventually() throws Exception {
+ AtomicInteger count = new AtomicInteger(0);
+ doAnswer(inv -> count.incrementAndGet() > 2).when(lockProvider).tryLock();
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread t = new Thread(() -> {
+ assertFalse(lockProvider.tryLock(1, TimeUnit.SECONDS));
+ latch.countDown();
+ });
+ t.start();
+ assertTrue(latch.await(2, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void testTryLockSuccess() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
null));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+ boolean acquired = lockProvider.tryLock();
+ assertTrue(acquired);
+ assertEquals(realLockFile, lockProvider.getLock());
+ verify(mockLockService, atLeastOnce()).tryCreateOrUpdateLockFile(any(),
any());
+ }
+
+ @Test
+ void testTryLockSuccessButFailureToStartHeartbeat() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
null));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(false);
+ when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(),
eq(realLockFile), anyLong()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+
+ boolean acquired = lockProvider.tryLock();
+ assertFalse(acquired);
+ }
+
+ @Test
+ void testTryLockFailsFromOwnerMismatch() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
null));
+ StorageLockFile returnedLockFile = new StorageLockFile(
+ new StorageLockData(false, System.currentTimeMillis() +
DEFAULT_LOCK_VALIDITY_MS, "different-owner"), "v1");
+ when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, returnedLockFile));
+
+ HoodieLockException ex = assertThrows(HoodieLockException.class, () ->
lockProvider.tryLock());
+ assertTrue(ex.getMessage().contains("Owners do not match"));
+ }
+
+ @Test
+ void testTryLockFailsDueToExistingLock() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS,
+ "other-owner");
+ StorageLockFile existingLock = new StorageLockFile(data, "v2");
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS,
existingLock));
+
+ boolean acquired = lockProvider.tryLock();
+ assertFalse(acquired);
+ }
+
+ @Test
+ void testTryLockFailsToUpdateFile() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
null));
+ when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+ .thenReturn(Pair.of(LockUpdateResult.ACQUIRED_BY_OTHERS, null));
+ assertFalse(lockProvider.tryLock());
+ }
+
+ @Test
+ void testTryLockFailsDueToUnknownState() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.UNKNOWN_ERROR,
null));
+ assertFalse(lockProvider.tryLock());
+ }
+
+ @Test
+ void testTryLockSucceedsWhenExistingLockExpiredByTime() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
+ "other-owner");
+ StorageLockFile existingLock = new StorageLockFile(data, "v2");
+ StorageLockData newData = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS,
+ ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(newData, "v1");
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS,
existingLock));
+ when(mockLockService.tryCreateOrUpdateLockFile(any(), eq(existingLock)))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ boolean acquired = lockProvider.tryLock();
+ assertTrue(acquired);
+ }
+
+ @Test
+ void testTryLockReentrancySucceeds() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
null));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+ boolean acquired = lockProvider.tryLock();
+ assertTrue(acquired);
+ // Re-entrancy succeeds
+ assertTrue(lockProvider.tryLock());
+ }
+
+ @Test
+ void testTryLockReentrancyAfterLockExpiredByTime() {
+ // In an extremely unlikely scenario, we could have a local reference to a lock
+ // which is present but expired,
+ // and because we were unable to stop the heartbeat properly, we did not
+ // successfully set it to null.
+ // Due to the nature of the heartbeat manager, this is expected to introduce
+ // some delay, but not be permanently blocking.
+ // There are a few variations of this edge case, so we must test them all.
+
+ // Here the lock's expired flag is false, but its validUntil timestamp is in the past.
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile expiredLock = new StorageLockFile(data, "v1");
+ doReturn(expiredLock).when(lockProvider).getLock();
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
null));
+ // NOTE(review): validData also has a validUntil in the past (now - DEFAULT_LOCK_VALIDITY_MS),
+ // the same as the expired lock above, despite the "valid" name. Presumably
+ // "now + DEFAULT_LOCK_VALIDITY_MS" was intended - confirm.
+ StorageLockData validData = new StorageLockData(false,
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
+ ownerId);
+ StorageLockFile validLock = new StorageLockFile(validData, "v2");
+ when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, validLock));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+ assertTrue(lockProvider.tryLock());
+ }
+
+ @Test
+ void testTryLockReentrancyAfterLockSetExpired() {
+ // In an extremely unlikely scenario, we could have a local reference to a lock
+ // which is present but expired,
+ // and because we were unable to stop the heartbeat properly, we did not
+ // successfully set it to null.
+ // Due to the nature of the heartbeat manager, this is expected to introduce
+ // some delay, but not be permanently blocking.
+ // There are a few variations of this edge case, so we must test them all.
+
+ // Here the lock's expired flag is true, but its validUntil timestamp is still in the future.
+ StorageLockData data = new StorageLockData(true,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile expiredLock = new StorageLockFile(data, "v1");
+ doReturn(expiredLock).when(lockProvider).getLock();
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
null));
+ // NOTE(review): validData has a validUntil in the past (now - DEFAULT_LOCK_VALIDITY_MS)
+ // despite the "valid" name. Presumably "now + DEFAULT_LOCK_VALIDITY_MS" was
+ // intended - confirm.
+ StorageLockData validData = new StorageLockData(false,
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
+ ownerId);
+ StorageLockFile validLock = new StorageLockFile(validData, "v2");
+ when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, validLock));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+ assertTrue(lockProvider.tryLock());
+ }
+
+ @Test
+ void testTryLockHeartbeatStillActive() {
+ // In an extremely unlikely scenario, we could have a local reference to a
lock
+ // which is present but expired,
+ // and because we were unable to stop the heartbeat properly, we did not
+ // successfully set it to null.
+ // Due to the nature of the heartbeat manager, this is expected to
introduce
+ // some delay, but not be permanently blocking.
+ // There are a few variations of this edge case, so we must test them all.
+
+ // Here the heartbeat is still active, so we have to error out.
+ StorageLockData data = new StorageLockData(true,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile expiredLock = new StorageLockFile(data, "v1");
+ doReturn(expiredLock).when(lockProvider).getLock();
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
null));
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+ assertThrows(HoodieLockException.class, () -> lockProvider.tryLock());
+ }
+
+ @Test
+ void testUnlockSucceedsAndReentrancy() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
null));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+ when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(),
eq(realLockFile), anyLong()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS,
+ new StorageLockFile(new StorageLockData(true,
data.getValidUntil(), ownerId), "v2")));
+ assertTrue(lockProvider.tryLock());
+ when(mockHeartbeatManager.hasActiveHeartbeat())
+ .thenReturn(true) // when we try to stop the heartbeat we will check
if heartbeat is active return true.
+ .thenReturn(false); // when try to set lock to expire we will assert
no active heartbeat as a precondition.
+ lockProvider.unlock();
+ assertNull(lockProvider.getLock());
+ lockProvider.unlock();
+ }
+
+ @Test
+ void testUnlockFailsToStopHeartbeat() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
null));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ assertTrue(lockProvider.tryLock());
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(false);
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+ assertThrows(HoodieLockException.class, () -> lockProvider.unlock());
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+ }
+
+ @Test
+ void testCloseFailsToStopHeartbeat() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
null));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ assertTrue(lockProvider.tryLock());
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(false);
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+ // Should wrap the exception and log error.
+ lockProvider.close();
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+ }
+
+ // With getLock() stubbed to null, renewLock() must return false and log the
+ // "Cannot renew" warning.
+ @Test
+ void testRenewLockReturnsFalseWhenNoLockHeld() {
+ doReturn(null).when(lockProvider).getLock();
+ assertFalse(lockProvider.renewLock());
+ // NOTE(review): this stubbing is set up AFTER renewLock() has already run, so it cannot
+ // influence the call under test; it only affects later calls (e.g. the @AfterEach
+ // close()). If "heartbeat reported active" was meant to be a precondition, it
+ // presumably belongs before renewLock() - confirm.
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+ verify(mockLogger).warn("Owner {}: Cannot renew, no lock held by this
process", this.ownerId);
+ }
+
+ // With getLock() stubbed to null, renewLock() must return false and log the
+ // "Cannot renew" warning.
+ @Test
+ void testRenewLockWithoutHoldingLock() {
+ doReturn(null).when(lockProvider).getLock();
+ assertFalse(lockProvider.renewLock());
+ // NOTE(review): this stubbing is set up AFTER renewLock() has already run, so it cannot
+ // influence the call under test. As written, this test exercises renewLock() exactly
+ // like testRenewLockReturnsFalseWhenNoLockHeld; presumably the stubbing was meant to
+ // precede the call - confirm.
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+ verify(mockLogger).warn("Owner {}: Cannot renew, no lock held by this
process", this.ownerId);
+ }
+
+ @Test
+ void testRenewLockWithFullyExpiredLock() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile nearExpiredLockFile = new StorageLockFile(data, "v1");
+ doReturn(nearExpiredLockFile).when(lockProvider).getLock();
+ when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(),
eq(nearExpiredLockFile), anyLong()))
+ .thenReturn(Pair.of(LockUpdateResult.ACQUIRED_BY_OTHERS, null));
+ assertFalse(lockProvider.renewLock());
+ verify(mockLogger).error("Owner {}: Unable to renew lock as it is acquired
by others.", this.ownerId);
+ }
+
+ @Test
+ void testRenewLockUnableToUpsertLockFileButNotFatal() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ doReturn(lockFile).when(lockProvider).getLock();
+ // Signal the upsert attempt failed, but may be transient. See interface
for
+ // more details.
+ when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(),
eq(lockFile), anyLong()))
+ .thenReturn(Pair.of(LockUpdateResult.UNKNOWN_ERROR, null));
+ assertTrue(lockProvider.renewLock());
+ }
+
+ // NOTE(review): despite "Fatal" in the name, the stubbing here is the same as in
+ // testRenewLockUnableToUpsertLockFileButNotFatal (UNKNOWN_ERROR) and renewLock() is
+ // asserted to return true. The only extra check is the warn-level log message.
+ // Consider renaming - confirm intent.
+ @Test
+ void testRenewLockUnableToUpsertLockFileFatal() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ doReturn(lockFile).when(lockProvider).getLock();
+ // The update attempt reports UNKNOWN_ERROR with no lock file, which may be transient.
+ when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(),
eq(lockFile), anyLong()))
+ .thenReturn(Pair.of(LockUpdateResult.UNKNOWN_ERROR, null));
+ // renewLock returns true so the renewal will be retried.
+ assertTrue(lockProvider.renewLock());
+
+ verify(mockLogger).warn("Owner {}: Unable to renew lock due to unknown
error, could be transient.", this.ownerId);
+ }
+
+ @Test
+ void testRenewLockSucceedsButRenewalWithinExpirationWindow() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ doReturn(lockFile).when(lockProvider).getLock();
+
+ StorageLockData nearExpirationData = new StorageLockData(false,
System.currentTimeMillis(), ownerId);
+ StorageLockFile lockFileNearExpiration = new
StorageLockFile(nearExpirationData, "v2");
+ when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(),
eq(lockFile), anyLong()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, lockFileNearExpiration));
+
+ // We used to fail in this case before, but since we are only modifying a
single
+ // lock file, this is ok now.
+ // Therefore, this can be a happy path variation.
+ assertTrue(lockProvider.renewLock());
+ }
+
+ @Test
+ void testRenewLockSucceeds() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ doReturn(lockFile).when(lockProvider).getLock();
+
+ StorageLockData successData = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS,
+ ownerId);
+ StorageLockFile successLockFile = new StorageLockFile(successData, "v2");
+ when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(),
eq(lockFile), anyLong()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, successLockFile));
+ assertTrue(lockProvider.renewLock());
+
+ verify(mockLogger).info(
+ eq("Owner {}: Lock renewal successful. The renewal completes {} ms
before expiration for lock {}."),
+ eq(this.ownerId), anyLong(),
eq("gs://bucket/lake/db/tbl-default/.hoodie/.locks"));
+ }
+
+ @Test
+ void testRenewLockFails() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ doReturn(lockFile).when(lockProvider).getLock();
+
+ when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(),
eq(lockFile), anyLong()))
+ .thenThrow(new RuntimeException("Failure"));
+ assertFalse(lockProvider.renewLock());
+
+ verify(mockLogger).error(eq("Owner {}: Exception occurred while renewing
lock"), eq(ownerId),
+ any(RuntimeException.class));
+ }
+
+ @Test
+ void testCloseCallsDependencies() throws Exception {
+ lockProvider.close();
+ verify(mockLockService, atLeastOnce()).close();
+ verify(mockHeartbeatManager, atLeastOnce()).close();
+ assertNull(lockProvider.getLock());
+ }
+
+ @Test
+ void testCloseWithErrorForLockService() throws Exception {
+ doThrow(new RuntimeException("Some
failure")).when(mockLockService).close();
+ lockProvider.close();
+ verify(mockLogger).error(eq("Owner {}: Lock service failed to close."),
eq(ownerId), any(RuntimeException.class));
+ assertNull(lockProvider.getLock());
+ }
+
+ @Test
+ void testCloseWithErrorForHeartbeatManager() throws Exception {
+ doThrow(new RuntimeException("Some
failure")).when(mockHeartbeatManager).close();
+ lockProvider.close();
+ verify(mockLogger).error(eq("Owner {}: Heartbeat manager failed to
close."), eq(ownerId),
+ any(RuntimeException.class));
+ assertNull(lockProvider.getLock());
+ }
+
+ @Test
+ public void testShutdownHookViaReflection() throws Exception {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
null));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+ boolean acquired = lockProvider.tryLock();
+ assertTrue(acquired);
+ assertEquals(realLockFile, lockProvider.getLock());
+ verify(mockLockService, atLeastOnce()).tryCreateOrUpdateLockFile(any(),
any());
+
+ when(mockLockService.tryCreateOrUpdateLockFile(any(StorageLockData.class),
eq(realLockFile)))
+ .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+
+ // Mock shutdown
+ Method shutdownMethod =
lockProvider.getClass().getDeclaredMethod("shutdown", boolean.class);
+ shutdownMethod.setAccessible(true);
+ shutdownMethod.invoke(lockProvider, true);
+
+ // Verify that the expected shutdown behaviors occurred.
+ assertNull(lockProvider.getLock());
+ // We do not execute additional actions
+ verify(mockLockService, never()).close();
+ verify(mockHeartbeatManager, never()).close();
+ }
+
+ @Test
+ public void testShutdownHookWhenNoLockPresent() throws Exception {
+ // No lock is acquired in this test. Invoke the non-public shutdown(boolean) with
+ // 'true' via reflection.
+ Method shutdownMethod =
lockProvider.getClass().getDeclaredMethod("shutdown", boolean.class);
+ shutdownMethod.setAccessible(true);
+ shutdownMethod.invoke(lockProvider, true);
+
+ // Verify that close() was never called on the lock service or the heartbeat manager.
+ verify(mockLockService, never()).close();
+ verify(mockHeartbeatManager, never()).close();
+ }
+
+ /**
+ * Minimal {@link StorageLock} stub: every lock operation returns {@code null} and
+ * {@code close()} does nothing. The three-String constructor exists so the class can be
+ * instantiated reflectively (per its inline comment); presumably it mirrors the
+ * constructor shape the provider looks up - TODO confirm against the loader.
+ */
+ public static class StubStorageLock implements StorageLock {
+ public StubStorageLock(String arg1, String arg2, String arg3) {
+ // No-op constructor for reflection
+ }
+
+ @Override
+ public Pair<LockUpdateResult, StorageLockFile>
tryCreateOrUpdateLockFile(StorageLockData newLockData,
+ StorageLockFile previousLockFile) {
+ return null;
+ }
+
+ @Override
+ public Pair<LockUpdateResult, StorageLockFile>
tryCreateOrUpdateLockFileWithRetry(
+ Supplier<StorageLockData> newLockDataSupplier,
+ StorageLockFile previousLockFile,
+ long retryExpiration) {
+ return null;
+ }
+
+ @Override
+ public Pair<LockGetResult, StorageLockFile> readCurrentLockFile() {
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ // stub, no-op
+ }
+ }
+
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java
new file mode 100644
index 00000000000..3be7e5550b2
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestStorageBasedLockConfig {
+
+ @Test
+ void testDefaultValues() {
+ // Asserts that the defaults are correct
+ TypedProperties props = new TypedProperties();
+ props.setProperty(BASE_PATH.key(), "s3://hudi-bucket/table/basepath");
+
+ StorageBasedLockConfig.Builder builder = new
StorageBasedLockConfig.Builder();
+ StorageBasedLockConfig config = builder
+ .fromProperties(props)
+ .build();
+
+ assertEquals(5 * 60, config.getValiditySeconds(), "Default lock validity
should be 5 minutes");
+ assertEquals(30, config.getHeartbeatPollSeconds(), "Default heartbeat poll
time should be 30 seconds");
+ }
+
+ @Test
+ void testCustomValues() {
+ // Testing that custom values which differ from defaults can be read
properly
+ TypedProperties props = new TypedProperties();
+ props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
"120");
+ props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(),
"10");
+ props.setProperty(BASE_PATH.key(), "/hudi/table/basepath");
+
+ StorageBasedLockConfig config = new StorageBasedLockConfig.Builder()
+ .fromProperties(props)
+ .build();
+
+ assertEquals(120, config.getValiditySeconds());
+ assertEquals(10, config.getHeartbeatPollSeconds());
+ assertEquals("/hudi/table/basepath", config.getHudiTableBasePath());
+ }
+
+ @Test
+ void testBasePathPropertiesValidation() {
+ // Tests that validations around the base path are present.
+ TypedProperties props = new TypedProperties();
+ props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
"120");
+ StorageBasedLockConfig.Builder propsBuilder = new
StorageBasedLockConfig.Builder();
+
+ // Missing base path
+ IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class,
+ () -> propsBuilder.fromProperties(props));
+ assertTrue(exception.getMessage().contains(BASE_PATH.key()));
+ }
+
+ @Test
+ void testTimeThresholds() {
+ // Ensure that validations which restrict the time-based inputs are working.
+ // The same props object and builder are reused for all three cases; each
+ // fromProperties() call copies the updated props over the builder's previous values.
+ TypedProperties props = new TypedProperties();
+ props.setProperty(BASE_PATH.key(), "/hudi/table/basepath");
+ // Invalid config case: validity timeout is less than triple the heartbeat poll period
+ // (5 < 3 * 3).
+ props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
"5");
+ props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(),
"3");
+ StorageBasedLockConfig.Builder propsBuilder = new
StorageBasedLockConfig.Builder();
+
+ IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class,
+ () -> propsBuilder.fromProperties(props));
+
assertTrue(exception.getMessage().contains(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key()));
+ // Invalid config case: validity timeout is less than 5 seconds
+ // (4 >= 3 * 1 passes the triple check, so only the 5-second minimum rejects it).
+ props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
"4");
+ props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(),
"1");
+ exception = assertThrows(IllegalArgumentException.class, () ->
propsBuilder.fromProperties(props));
+
assertTrue(exception.getMessage().contains(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key()));
+ // Invalid config case: heartbeat poll period is less than 1 second
+ // (5 >= 3 * 0 and 5 >= 5 pass the validity checks, so only the 1-second minimum rejects it).
+ props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
"5");
+ props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(),
"0");
+ exception = assertThrows(IllegalArgumentException.class, () ->
propsBuilder.fromProperties(props));
+
assertTrue(exception.getMessage().contains(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key()));
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 4c0e947d73b..c42024f42eb 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -131,6 +131,8 @@ public class HoodieTableMetaClient implements Serializable {
public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH =
BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + StoragePath.SEPARATOR + ".fileids";
+ public static final String LOCKS_FOLDER_NAME = METAFOLDER_NAME +
StoragePath.SEPARATOR + ".locks";
+
public static final String SCHEMA_FOLDER_NAME = ".schema";
public static final String MARKER_EXTN = ".marker";
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
index e718bc21ed6..25fe3e34c6f 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
@@ -39,6 +39,15 @@ public class TestStorageSchemes {
assertTrue(StorageSchemes.isSchemeSupported("afs"));
assertFalse(StorageSchemes.isSchemeSupported("s2"));
+ for (StorageSchemes scheme : StorageSchemes.values()) {
+ String schemeName = scheme.getScheme();
+ if (scheme.getScheme().startsWith("s3")) {
+
assertTrue(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent());
+ } else {
+
assertFalse(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent());
+ }
+ }
+
assertTrue(StorageSchemes.isAtomicCreationSupported("file"));
assertTrue(StorageSchemes.isAtomicCreationSupported("hdfs"));
assertFalse(StorageSchemes.isAtomicCreationSupported("afs"));
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
index ca795917f98..aabb50104eb 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
@@ -19,6 +19,9 @@
package org.apache.hudi.storage;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+
import java.util.Arrays;
import java.util.Set;
import java.util.HashSet;
@@ -28,65 +31,66 @@ import java.util.HashSet;
*/
public enum StorageSchemes {
// Local filesystem
- FILE("file", false, true),
+ FILE("file", false, true, null),
// Hadoop File System
- HDFS("hdfs", false, true),
+ HDFS("hdfs", false, true, null),
// Baidu Advanced File System
- AFS("afs", null, null),
+ AFS("afs", null, null, null),
// Mapr File System
- MAPRFS("maprfs", null, null),
+ MAPRFS("maprfs", null, null, null),
// Apache Ignite FS
- IGNITE("igfs", null, null),
+ IGNITE("igfs", null, null, null),
// AWS S3
- S3A("s3a", true, null),
- S3("s3", true, null),
+ S3A("s3a", true, null, "org.apache.hudi.aws.transaction.lock.S3StorageLock"),
+ S3("s3", true, null, "org.apache.hudi.aws.transaction.lock.S3StorageLock"),
// Google Cloud Storage
- GCS("gs", true, null),
+ GCS("gs", true, null, null),
// Azure WASB
- WASB("wasb", null, null),
- WASBS("wasbs", null, null),
+ WASB("wasb", null, null, null),
+ WASBS("wasbs", null, null, null),
// Azure ADLS
- ADL("adl", null, null),
+ ADL("adl", null, null, null),
// Azure ADLS Gen2
- ABFS("abfs", null, null),
- ABFSS("abfss", null, null),
+ ABFS("abfs", null, null, null),
+ ABFSS("abfss", null, null, null),
// Aliyun OSS
- OSS("oss", null, null),
- // View FS for federated setups. If federating across cloud stores, then append support is false
+ OSS("oss", null, null, null),
+ // View FS for federated setups. If federating across cloud stores, then append
+ // support is false
// View FS support atomic creation
- VIEWFS("viewfs", null, true),
- //ALLUXIO
- ALLUXIO("alluxio", null, null),
+ VIEWFS("viewfs", null, true, null),
+ // ALLUXIO
+ ALLUXIO("alluxio", null, null, null),
// Tencent Cloud Object Storage
- COSN("cosn", null, null),
+ COSN("cosn", null, null, null),
// Tencent Cloud HDFS
- CHDFS("ofs", null, null),
+ CHDFS("ofs", null, null, null),
// Tencent Cloud CacheFileSystem
- GOOSEFS("gfs", null, null),
+ GOOSEFS("gfs", null, null, null),
// Databricks file system
- DBFS("dbfs", null, null),
+ DBFS("dbfs", null, null, null),
// IBM Cloud Object Storage
- COS("cos", null, null),
+ COS("cos", null, null, null),
// Huawei Cloud Object Storage
- OBS("obs", null, null),
+ OBS("obs", null, null, null),
// Kingsoft Standard Storage ks3
- KS3("ks3", null, null),
+ KS3("ks3", null, null, null),
// Netease Object Storage nos
- NOS("nos", null, null),
+ NOS("nos", null, null, null),
// JuiceFileSystem
- JFS("jfs", null, null),
+ JFS("jfs", null, null, null),
// Baidu Object Storage
- BOS("bos", null, null),
+ BOS("bos", null, null, null),
// Oracle Cloud Infrastructure Object Storage
- OCI("oci", null, null),
+ OCI("oci", null, null, null),
// Volcengine Object Storage
- TOS("tos", null, null),
+ TOS("tos", null, null, null),
// Volcengine Cloud HDFS
- CFS("cfs", null, null),
+ CFS("cfs", null, null, null),
// Aliyun Apsara File Storage for HDFS
- DFS("dfs", false, true),
+ DFS("dfs", false, true, null),
// Hopsworks File System
- HOPSFS("hopsfs", false, true);
+ HOPSFS("hopsfs", false, true, null);
// list files may bring pressure to storage with centralized meta service like HDFS.
// when we want to get only part of files under a directory rather than all files, use getStatus may be more friendly than listStatus.
@@ -98,11 +102,17 @@ public enum StorageSchemes {
private final Boolean isWriteTransactional;
// null for uncertain if dfs support atomic create&delete, please update this for each FS
private final Boolean supportAtomicCreation;
+ private final String storageLockClass;
- StorageSchemes(String scheme, Boolean isWriteTransactional, Boolean
supportAtomicCreation) {
+ StorageSchemes(
+ String scheme,
+ Boolean isWriteTransactional,
+ Boolean supportAtomicCreation,
+ String storageLockClass) {
this.scheme = scheme;
this.isWriteTransactional = isWriteTransactional;
this.supportAtomicCreation = supportAtomicCreation;
+ this.storageLockClass = storageLockClass;
}
public String getScheme() {
@@ -117,6 +127,14 @@ public enum StorageSchemes {
return supportAtomicCreation != null && supportAtomicCreation;
}
+ public boolean implementsStorageLock() {
+ return !StringUtils.isNullOrEmpty(storageLockClass);
+ }
+
+ public String getStorageLockClass() {
+ return storageLockClass;
+ }
+
public static boolean isSchemeSupported(String scheme) {
return Arrays.stream(values()).anyMatch(s -> s.getScheme().equals(scheme));
}
@@ -143,4 +161,12 @@ public enum StorageSchemes {
return LIST_STATUS_FRIENDLY_SCHEMES.contains(scheme);
}
+
+ public static Option<StorageSchemes>
getStorageLockImplementationIfExists(String scheme) {
+ if (!isSchemeSupported(scheme)) {
+ throw new IllegalArgumentException("Unsupported scheme :" + scheme);
+ }
+ return Option.fromJavaOptional(Arrays.stream(StorageSchemes.values())
+ .filter(s -> s.implementsStorageLock() &&
s.scheme.equals(scheme)).findFirst());
+ }
}