This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dc2df53f48 [HUDI-9158] Add storage-based lock provider abstract 
implementation (#13103)
3dc2df53f48 is described below

commit 3dc2df53f48ee8032894ee3ab0f54f5f046e0c02
Author: Alex R <[email protected]>
AuthorDate: Fri Apr 11 03:17:03 2025 -0700

    [HUDI-9158] Add storage-based lock provider abstract implementation (#13103)
    
    * Add initial changes for storage-based lock provider abstract 
implementation
    
    ---------
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../transaction/lock/StorageBasedLockProvider.java | 531 ++++++++++++++++++
 .../lock/models/LockProviderHeartbeatManager.java  |   2 +-
 .../transaction/lock/models/StorageLockFile.java   |  10 +-
 .../apache/hudi/config/StorageBasedLockConfig.java | 100 ++++
 .../lock/TestStorageBasedLockProvider.java         | 624 +++++++++++++++++++++
 .../hudi/config/TestStorageBasedLockConfig.java    | 101 ++++
 .../hudi/common/table/HoodieTableMetaClient.java   |   2 +
 .../apache/hudi/common/fs/TestStorageSchemes.java  |   9 +
 .../org/apache/hudi/storage/StorageSchemes.java    |  94 ++--
 9 files changed, 1433 insertions(+), 40 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
new file mode 100644
index 00000000000..ba9dfb9a3fa
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import 
org.apache.hudi.client.transaction.lock.models.LockProviderHeartbeatManager;
+import org.apache.hudi.client.transaction.lock.models.LockUpdateResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.StorageBasedLockConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StorageSchemes;
+
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.net.URI;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.lock.LockState.ACQUIRED;
+import static org.apache.hudi.common.lock.LockState.ACQUIRING;
+import static org.apache.hudi.common.lock.LockState.FAILED_TO_ACQUIRE;
+import static org.apache.hudi.common.lock.LockState.FAILED_TO_RELEASE;
+import static org.apache.hudi.common.lock.LockState.RELEASED;
+import static org.apache.hudi.common.lock.LockState.RELEASING;
+import static 
org.apache.hudi.common.table.HoodieTableMetaClient.LOCKS_FOLDER_NAME;
+
+/**
+ * A distributed filesystem storage based lock provider. This {@link 
LockProvider} implementation
+ * leverages conditional writes to ensure transactional consistency for 
multi-writer scenarios.
+ * The underlying storage client interface {@link StorageLock} is pluggable so 
it can be implemented for any
+ * filesystem which supports conditional writes.
+ */
+@ThreadSafe
+public class StorageBasedLockProvider implements LockProvider<StorageLockFile> 
{
+
+  public static final String DEFAULT_TABLE_LOCK_FILE_NAME = "table_lock";
+  // How long to wait before retrying lock acquisition in blocking calls.
+  private static final long DEFAULT_LOCK_ACQUISITION_BUFFER_MS = 1000;
+  // Maximum expected clock drift between two nodes.
+  // This is similar idea as SkewAdjustingTimeGenerator.
+  // In reality, within a single cloud provider all nodes share the same ntp
+  // server
+  // therefore we do not expect drift more than a few ms.
+  // However, since our lock leases are pretty long, we can use a high buffer.
+  private static final long CLOCK_DRIFT_BUFFER_MS = 500;
+
+  // When we retry lock upserts, do so 5 times
+  private static final long LOCK_UPSERT_RETRY_COUNT = 5;
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StorageBasedLockProvider.class);
+
+  // Use for testing
+  private final Logger logger;
+
+  // The lock service implementation which interacts with storage
+  private final StorageLock lockService;
+
+  private final long validitySeconds;
+  private final String ownerId;
+  private final String lockFilePath;
+  private final HeartbeatManager heartbeatManager;
+  private final transient Thread shutdownThread;
+
+  @GuardedBy("this")
+  private StorageLockFile currentLockObj = null;
+  @GuardedBy("this")
+  private boolean isClosed = false;
+
+  private synchronized void setLock(StorageLockFile lockObj) {
+    if (lockObj != null && !Objects.equals(lockObj.getOwner(), this.ownerId)) {
+      throw new HoodieLockException("Owners do not match. Current lock owner: 
" + this.ownerId + " lock path: "
+          + this.lockFilePath + " owner: " + lockObj.getOwner());
+    }
+    this.currentLockObj = lockObj;
+  }
+
+  /**
+   * Default constructor for StorageBasedLockProvider, required by LockManager
+   * to instantiate it using reflection.
+   * 
+   * @param lockConfiguration The lock configuration, should be transformable 
into
+   *                          StorageBasedLockConfig
+   * @param conf              Storage config, ignored.
+   */
+  public StorageBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf) {
+    this(
+            UUID.randomUUID().toString(),
+            lockConfiguration.getConfig(),
+            LockProviderHeartbeatManager::new,
+            getLockService(),
+            LOGGER);
+  }
+
+  private static Functions.Function3<String, String, TypedProperties, 
StorageLock> getLockService() {
+    return (ownerId, lockFilePath, lockConfig) -> {
+      try {
+        return (StorageLock) ReflectionUtils.loadClass(
+                getLockServiceClassName(new URI(lockFilePath).getScheme()),
+                new Class<?>[]{String.class, String.class, String.class, 
Properties.class},
+                new Object[]{ownerId, lockFilePath, lockConfig});
+      } catch (Throwable e) {
+        throw new HoodieLockException("Failed to load and initialize 
StorageLock", e);
+      }
+    };
+  }
+
+  private static @NotNull String getLockServiceClassName(String scheme) {
+    Option<StorageSchemes> schemeOptional = 
StorageSchemes.getStorageLockImplementationIfExists(scheme);
+    if (schemeOptional.isPresent()) {
+      return schemeOptional.get().getStorageLockClass();
+    } else {
+      throw new HoodieNotSupportedException("No implementation of StorageLock 
supports this scheme: " + scheme);
+    }
+  }
+
+  @VisibleForTesting
+  StorageBasedLockProvider(
+      String ownerId,
+      TypedProperties properties,
+      Functions.Function3<String, Long, Supplier<Boolean>, HeartbeatManager> 
heartbeatManagerLoader,
+      Functions.Function3<String, String, TypedProperties, StorageLock> 
lockServiceLoader,
+      Logger logger) {
+    StorageBasedLockConfig config = new 
StorageBasedLockConfig.Builder().fromProperties(properties).build();
+    long heartbeatPollSeconds = config.getHeartbeatPollSeconds();
+    this.validitySeconds = config.getValiditySeconds();
+    this.lockFilePath = String.format("%s%s%s", config.getHudiTableBasePath(), 
StoragePath.SEPARATOR, LOCKS_FOLDER_NAME);
+    this.heartbeatManager = heartbeatManagerLoader.apply(ownerId, 
heartbeatPollSeconds * 1000, this::renewLock);
+    this.lockService = lockServiceLoader.apply(ownerId, lockFilePath, 
properties);
+    this.ownerId = ownerId;
+    this.logger = logger;
+    shutdownThread = new Thread(() -> shutdown(true));
+    Runtime.getRuntime().addShutdownHook(shutdownThread);
+    logger.info("Instantiated new storage-based lock provider, owner: {}, 
lockfilePath: {}", ownerId, lockFilePath);
+  }
+
+  // -----------------------------------------
+  // BASE METHODS
+  // -----------------------------------------
+
+  @Override
+  public synchronized StorageLockFile getLock() {
+    return currentLockObj;
+  }
+
+  /**
+   * Attempts to acquire the lock within the given timeout.
+   */
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) {
+    long deadlineNanos = System.nanoTime() + unit.toNanos(time);
+
+    while (System.nanoTime() < deadlineNanos) {
+      try {
+        logDebugLockState(ACQUIRING);
+        if (tryLock()) {
+          return true;
+        }
+        Thread.sleep(DEFAULT_LOCK_ACQUISITION_BUFFER_MS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new 
HoodieLockException(generateLockStateMessage(LockState.FAILED_TO_ACQUIRE), e);
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public synchronized void close() {
+    shutdown(false);
+  }
+
+  private synchronized void shutdown(boolean fromShutdownHook) {
+    if (fromShutdownHook) {
+      // Try to expire the lock from the shutdown hook.
+      if (!isClosed && actuallyHoldsLock()) {
+        tryExpireCurrentLock(true);
+      }
+      // Do not execute any further actions
+      return;
+    } else {
+      Runtime.getRuntime().removeShutdownHook(shutdownThread);
+    }
+    try {
+      this.unlock();
+    } catch (Exception e) {
+      logger.error("Owner {}: Failed to unlock current lock.", ownerId, e);
+    }
+    try {
+      this.lockService.close();
+    } catch (Exception e) {
+      logger.error("Owner {}: Lock service failed to close.", ownerId, e);
+    }
+    try {
+      this.heartbeatManager.close();
+    } catch (Exception e) {
+      logger.error("Owner {}: Heartbeat manager failed to close.", ownerId, e);
+    }
+
+    this.isClosed = true;
+  }
+
+  private synchronized boolean isLockStillValid(StorageLockFile lock) {
+    return !lock.isExpired() && 
!isCurrentTimeCertainlyOlderThanDistributedTime(lock.getValidUntil());
+  }
+
+  /**
+   * Attempts a single pass to acquire the lock (non-blocking).
+   * 
+   * @return true if lock acquired, false otherwise
+   */
+  @Override
+  public synchronized boolean tryLock() {
+    assertHeartbeatManagerExists();
+    assertUnclosed();
+    logDebugLockState(ACQUIRING);
+    if (actuallyHoldsLock()) {
+      // Supports reentrant locks
+      return true;
+    }
+
+    if (this.heartbeatManager.hasActiveHeartbeat()) {
+      logger.error("Detected broken invariant: there is an active heartbeat 
without a lock being held.");
+      // Breach of object invariant - we should never have an active heartbeat 
without
+      // holding a lock.
+      throw new 
HoodieLockException(generateLockStateMessage(FAILED_TO_ACQUIRE));
+    }
+
+    Pair<LockGetResult, StorageLockFile> latestLock = 
this.lockService.readCurrentLockFile();
+    if (latestLock.getLeft() == LockGetResult.UNKNOWN_ERROR) {
+      logInfoLockState(FAILED_TO_ACQUIRE, "Failed to get the latest lock 
status");
+      // We were not able to determine whether a lock was present.
+      return false;
+    }
+
+    if (latestLock.getLeft() == LockGetResult.SUCCESS && 
isLockStillValid(latestLock.getRight())) {
+      String msg = String.format("Lock already held by %s", 
latestLock.getRight().getOwner());
+      // Lock held by others.
+      logInfoLockState(FAILED_TO_ACQUIRE, msg);
+      return false;
+    }
+
+    // Try to acquire the lock
+    StorageLockData newLockData = new StorageLockData(false, 
System.currentTimeMillis() + validitySeconds, ownerId);
+    Pair<LockUpdateResult, StorageLockFile> lockUpdateStatus = 
this.lockService.tryCreateOrUpdateLockFile(
+        newLockData, latestLock.getLeft() == LockGetResult.NOT_EXISTS ? null : 
latestLock.getRight());
+    if (lockUpdateStatus.getLeft() != LockUpdateResult.SUCCESS) {
+      // failed to acquire the lock, indicates concurrent contention
+      logInfoLockState(FAILED_TO_ACQUIRE);
+      return false;
+    }
+    this.setLock(lockUpdateStatus.getRight());
+
+    // There is a remote chance that
+    // - after lock is acquired but before heartbeat starts the lock is 
expired.
+    // - lock is acquired and heartbeat is up yet it does not run timely 
before the
+    // lock is expired
+    // It is mitigated by setting the lock validity period to a reasonably long
+    // period to survive until heartbeat comes, plus
+    // set the heartbeat interval relatively small enough.
+    if 
(!this.heartbeatManager.startHeartbeatForThread(Thread.currentThread())) {
+      // Precondition "no active heartbeat" is checked previously, so when
+      // startHeartbeatForThread returns false,
+      // we are confident no heartbeat thread is running.
+      logErrorLockState(RELEASING, "We were unable to start the heartbeat!");
+      tryExpireCurrentLock(false);
+      return false;
+    }
+
+    logInfoLockState(ACQUIRED);
+    return true;
+  }
+
+  /**
+   * Determines whether this provider currently holds a valid lock.
+   *
+   * <p>
+   * This method checks both the existence of a lock object and its validity. A
+   * lock is considered
+   * valid only if it exists and has not expired according to its timestamp.
+   *
+   * @return {@code true} if this provider holds a valid lock, {@code false}
+   *         otherwise
+   */
+  private boolean actuallyHoldsLock() {
+    return believesLockMightBeHeld() && isLockStillValid(getLock());
+  }
+
+  /**
+   * Checks if this provider has a non-null lock object reference.
+   *
+   * <p>
+   * A non-null lock object indicates that this provider has previously
+   * **successfully** acquired a lock via
+   * StorageBasedLockProvider##lock and has not yet **successfully** released
+   * it via StorageBasedLockProvider#unlock().
+   * It is merely an indicator that the lock might be held by this provider. To
+   * truly certify we are the owner of the lock,
+   * StorageBasedLockProvider#actuallyHoldsLock should be used.
+   *
+   * @return {@code true} if this provider has a non-null lock object,
+   *         {@code false} otherwise
+   * @see StorageBasedLockProvider#actuallyHoldsLock()
+   */
+  private boolean believesLockMightBeHeld() {
+    return this.getLock() != null;
+  }
+
+  /**
+   * Unlock by marking our current lock file "expired": true.
+   */
+  @Override
+  public synchronized void unlock() {
+    assertHeartbeatManagerExists();
+    if (!believesLockMightBeHeld()) {
+      return;
+    }
+    boolean believesNoLongerHoldsLock = true;
+
+    // Try to stop the heartbeat first
+    if (heartbeatManager.hasActiveHeartbeat()) {
+      logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId);
+      believesNoLongerHoldsLock &= heartbeatManager.stopHeartbeat(true);
+    }
+
+    // Then expire the current lock.
+    believesNoLongerHoldsLock &= tryExpireCurrentLock(false);
+    if (!believesNoLongerHoldsLock) {
+      throw new 
HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
+    }
+  }
+
+  private void assertHeartbeatManagerExists() {
+    if (heartbeatManager == null) {
+      // broken function precondition.
+      throw new HoodieLockException("Unexpected null heartbeatManager");
+    }
+  }
+
+  private void assertUnclosed() {
+    if (this.isClosed) {
+      throw new HoodieLockException("Lock provider already closed");
+    }
+  }
+
+  /**
+   * Tries to expire the currently held lock.
+   * @param fromShutdownHook Whether we are attempting best effort quick 
unlock from shutdown hook.
+   * @return True if we were successfully able to upload an expired lock.
+   */
+  private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) {
+    // It does not make sense to have heartbeat alive extending the lock lease 
while
+    // here we are trying
+    // to expire the lock.
+    if (!fromShutdownHook && heartbeatManager.hasActiveHeartbeat()) {
+      // broken function precondition.
+      throw new HoodieLockException("Must stop heartbeat before expire lock 
file");
+    }
+    logDebugLockState(RELEASING);
+    // Upload metadata that will unlock this lock.
+    StorageLockData expiredLockData = new StorageLockData(true, 
this.getLock().getValidUntil(), ownerId);
+    Pair<LockUpdateResult, StorageLockFile> result;
+    if (fromShutdownHook) {
+      // Only try once for shutdown hook, then return immediately
+      result = this.lockService.tryCreateOrUpdateLockFile(expiredLockData, 
this.getLock());
+    } else {
+      result = this.lockService.tryCreateOrUpdateLockFileWithRetry(
+              () -> expiredLockData,
+              this.getLock(),
+              // Keep retrying for the normal validity time.
+              LOCK_UPSERT_RETRY_COUNT);
+    }
+    switch (result.getLeft()) {
+      case UNKNOWN_ERROR:
+        // Here we do not know the state of the lock.
+        logErrorLockState(FAILED_TO_RELEASE, "Lock state is unknown.");
+        return false;
+      case SUCCESS:
+        logInfoLockState(RELEASED);
+        setLock(null);
+        return true;
+      case ACQUIRED_BY_OTHERS:
+        // As we are confident no lock is held by itself, clean up the cached 
lock object.
+        // However this is an edge case, so warn.
+        logWarnLockState(RELEASED, "lock should not have been acquired by 
others.");
+        setLock(null);
+        return true;
+      default:
+        throw new HoodieLockException("Unexpected lock update result: " + 
result.getLeft());
+    }
+  }
+
+  /**
+   * Renews (heartbeats) the current lock if we are the holder, it forcefully 
set
+   * the expiration flag
+   * to false and the lock expiration time to a later time in the future.
+   * @return True if we successfully renewed the lock, false if not.
+   */
+  @VisibleForTesting
+  protected synchronized boolean renewLock() {
+    try {
+      // If we don't hold the lock, no-op.
+      if (!believesLockMightBeHeld()) {
+        logger.warn("Owner {}: Cannot renew, no lock held by this process", 
ownerId);
+        // No need to extend lock lease.
+        return false;
+      }
+
+      long oldExpiration = getLock().getValidUntil();
+      // Attempt conditional update, extend lock. There are 3 cases:
+      // 1. Happy case: lock has not expired yet, we extend the lease to a 
longer
+      // period.
+      // 2. Corner case 1: lock is expired and is acquired by others, lock 
renewal
+      // failed with ACQUIRED_BY_OTHERS.
+      // 3. Corner case 2: lock is expired but no one has acquired it yet, lock
+      // renewal "revived" the expired lock.
+      // Please note we expect the corner cases almost never happens.
+      // Action taken for corner case 2 is just a best effort mitigation. At 
least it
+      // prevents further data corruption by
+      // letting someone else acquire the lock.
+      Pair<LockUpdateResult, StorageLockFile> currentLock = 
this.lockService.tryCreateOrUpdateLockFileWithRetry(
+          () -> new StorageLockData(false, System.currentTimeMillis() + 
validitySeconds, ownerId),
+          getLock(),
+          LOCK_UPSERT_RETRY_COUNT);
+      switch (currentLock.getLeft()) {
+        case ACQUIRED_BY_OTHERS:
+          logger.error("Owner {}: Unable to renew lock as it is acquired by 
others.", ownerId);
+          // No need to extend lock lease anymore.
+          return false;
+        case UNKNOWN_ERROR:
+          // This could be transient, but unclear, we will let the heartbeat 
continue
+          // normally.
+          // If the next heartbeat run identifies our lock has expired we will 
error out.
+          logger.warn("Owner {}: Unable to renew lock due to unknown error, 
could be transient.", ownerId);
+          // Let heartbeat retry later.
+          return true;
+        case SUCCESS:
+          // Only positive outcome
+          this.setLock(currentLock.getRight());
+          logger.info("Owner {}: Lock renewal successful. The renewal 
completes {} ms before expiration for lock {}.",
+              ownerId, oldExpiration - System.currentTimeMillis(), 
lockFilePath);
+          // Let heartbeat continue to renew lock lease again later.
+          return true;
+        default:
+          throw new HoodieLockException("Unexpected lock update result: " + 
currentLock.getLeft());
+      }
+    } catch (Exception e) {
+      logger.error("Owner {}: Exception occurred while renewing lock", 
ownerId, e);
+      return false;
+    }
+  }
+
+  // ---------
+  // Utilities
+  // ---------
+
+  /**
+   * Method to calculate whether a timestamp from a distributed source has
+   * definitively occurred yet.
+   */
+  protected boolean isCurrentTimeCertainlyOlderThanDistributedTime(long epoch) 
{
+    return System.currentTimeMillis() > epoch + CLOCK_DRIFT_BUFFER_MS;
+  }
+
+  private String generateLockStateMessage(LockState state) {
+    String threadName = Thread.currentThread().getName();
+    return String.format("Owner %s: Lock file path %s, Thread %s, Storage 
based lock state %s", ownerId,
+        lockFilePath, threadName, state.toString());
+  }
+
+  private static final String LOCK_STATE_LOGGER_MSG = "Owner {}: Lock file 
path {}, Thread {}, Storage based lock state {}";
+  private static final String LOCK_STATE_LOGGER_MSG_WITH_INFO = "Owner {}: 
Lock file path {}, Thread {}, Storage based lock state {}, {}";
+
+  private void logDebugLockState(LockState state) {
+    logger.debug(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath, 
Thread.currentThread(), state);
+  }
+
+  private void logInfoLockState(LockState state) {
+    logger.info(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath, 
Thread.currentThread(), state);
+  }
+
+  private void logInfoLockState(LockState state, String msg) {
+    logger.info(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, 
Thread.currentThread(), state, msg);
+  }
+
+  private void logWarnLockState(LockState state, String msg) {
+    logger.warn(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, 
Thread.currentThread(), state, msg);
+  }
+
+  private void logErrorLockState(LockState state, String msg) {
+    logger.error(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, 
Thread.currentThread(), state, msg);
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
index 4beba39dc95..be73bd7cb2c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
@@ -35,7 +35,7 @@ import java.util.function.Supplier;
 
 /**
  * LockProviderHeartbeatManager is a helper class which handles the scheduling 
and stopping of heartbeat
- * tasks. This is intended for use with the conditional write lock provider, 
which requires
+ * tasks. This is intended for use with the storage based lock provider, which 
requires
  * a separate thread to spawn and renew the lock repeatedly.
  * It should be responsible for the entire lifecycle of the heartbeat task.
  * Importantly, a new instance should be created for each lock provider.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
index 2a045f31aef..dfccf23d67e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
@@ -35,7 +35,7 @@ public class StorageLockFile {
   private final StorageLockData data;
   private final String versionId;
 
-  // Get a custom object mapper. See ConditionalWriteLockData for required 
properties.
+  // Get a custom object mapper. See StorageLockData for required properties.
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
       // This allows us to add new properties down the line.
       .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
@@ -43,7 +43,7 @@ public class StorageLockFile {
       .enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
 
   /**
-   * Initializes a ConditionalWriteLockFile using the data and unique 
versionId.
+   * Initializes a StorageLockFile using the data and unique versionId.
    *
    * @param data      The data in the lock file.
    * @param versionId The version of this lock file. Used to ensure 
consistency through conditional writes.
@@ -56,11 +56,11 @@ public class StorageLockFile {
   }
 
   /**
-   * Factory method to load an input stream into a ConditionalWriteLockFile.
+   * Factory method to load an input stream into a StorageLockFile.
    *
    * @param inputStream The input stream of the JSON content.
    * @param versionId   The unique version identifier for the lock file.
-   * @return A new instance of ConditionalWriteLockFile.
+   * @return A new instance of StorageLockFile.
    * @throws HoodieIOException If deserialization fails.
    */
   public static StorageLockFile createFromStream(InputStream inputStream, 
String versionId) {
@@ -68,7 +68,7 @@ public class StorageLockFile {
       StorageLockData data = OBJECT_MAPPER.readValue(inputStream, 
StorageLockData.class);
       return new StorageLockFile(data, versionId);
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to deserialize JSON content into 
ConditionalWriteLockData", e);
+      throw new HoodieIOException("Failed to deserialize JSON content into 
StorageLockData", e);
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java
new file mode 100644
index 00000000000..bc5a52db71b
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
+
+public class StorageBasedLockConfig extends HoodieConfig {
+  private static final String SINCE_VERSION_1_0_2 = "1.0.2";
+  private static final String STORAGE_BASED_LOCK_PROPERTY_PREFIX = 
LockConfiguration.LOCK_PREFIX
+      + "storage.";
+
+  public static final ConfigProperty<Long> VALIDITY_TIMEOUT_SECONDS = 
ConfigProperty
+      .key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "validity.timeout.secs")
+      .defaultValue(TimeUnit.MINUTES.toSeconds(5))
+      .markAdvanced()
+      .sinceVersion(SINCE_VERSION_1_0_2)
+      .withDocumentation(
+          "For storage-based lock provider, the amount of time in seconds each 
new lock is valid for. "
+              + "The lock provider will attempt to renew its lock until it 
successfully extends the lock lease period "
+              + "or the validity timeout is reached.");
+
+  public static final ConfigProperty<Long> HEARTBEAT_POLL_SECONDS = 
ConfigProperty
+      .key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "heartbeat.poll.secs")
+      .defaultValue(30L)
+      .markAdvanced()
+      .sinceVersion(SINCE_VERSION_1_0_2)
+      .withDocumentation(
+          "For storage-based lock provider, the amount of time in seconds to 
wait before renewing the lock. "
+              + "Defaults to 30 seconds.");
+
+  public long getValiditySeconds() {
+    return getLong(VALIDITY_TIMEOUT_SECONDS);
+  }
+
+  public long getHeartbeatPollSeconds() {
+    return getLong(HEARTBEAT_POLL_SECONDS);
+  }
+
+  public String getHudiTableBasePath() {
+    return getString(BASE_PATH);
+  }
+
+  public static class Builder {
+    private final StorageBasedLockConfig lockConfig = new 
StorageBasedLockConfig();
+
+    public StorageBasedLockConfig build() {
+      lockConfig.setDefaults(StorageBasedLockConfig.class.getName());
+      return lockConfig;
+    }
+
+    public StorageBasedLockConfig.Builder fromProperties(TypedProperties 
props) {
+      lockConfig.getProps().putAll(props);
+      checkRequiredProps();
+      return this;
+    }
+
+    private void checkRequiredProps() {
+      String notExistsMsg = " does not exist!";
+      if (!lockConfig.contains(BASE_PATH)) {
+        throw new IllegalArgumentException(BASE_PATH.key() + notExistsMsg);
+      }
+      if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < 
lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS)
+          * 3) {
+        throw new IllegalArgumentException(
+            VALIDITY_TIMEOUT_SECONDS.key() + " should be more than triple " + 
HEARTBEAT_POLL_SECONDS.key());
+      }
+      if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < 5) {
+        throw new IllegalArgumentException(
+            VALIDITY_TIMEOUT_SECONDS.key() + " should be greater than or equal 
to 5 seconds.");
+      }
+      if (lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS) < 1) {
+        throw new IllegalArgumentException(
+            HEARTBEAT_POLL_SECONDS.key() + " should be greater than or equal 
to 1 second.");
+      }
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
new file mode 100644
index 00000000000..9e5f4a6d689
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import org.apache.hudi.client.transaction.lock.models.LockUpdateResult;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.StorageBasedLockConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test class for StorageBasedLockProvider
+ */
+class TestStorageBasedLockProvider {
+  private StorageBasedLockProvider lockProvider;
+  private StorageLock mockLockService;
+  private HeartbeatManager mockHeartbeatManager;
+  private Logger mockLogger;
+  private final String ownerId = UUID.randomUUID().toString();
+  private static final int DEFAULT_LOCK_VALIDITY_MS = 5000;
+
+  @BeforeEach
+  void setupLockProvider() {
+    mockLockService = mock(StorageLock.class);
+    mockHeartbeatManager = mock(HeartbeatManager.class);
+    mockLogger = mock(Logger.class);
+    when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+    TypedProperties props = new TypedProperties();
+    props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "5");
+    props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
+    props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default");
+
+    lockProvider = spy(new StorageBasedLockProvider(
+        ownerId,
+        props,
+        (a,b,c) -> mockHeartbeatManager,
+        (a,b,c) -> mockLockService,
+        mockLogger));
+  }
+
+  @AfterEach
+  void cleanupLockProvider() {
+    lockProvider.close();
+  }
+
+  @Test
+  void testUnsupportedLockStorageLocation() {
+    TypedProperties props = new TypedProperties();
+    props.put(BASE_PATH.key(), "hdfs://bucket/lake/db/tbl-default");
+    LockConfiguration lockConf = new LockConfiguration(props);
+    StorageConfiguration<?> storageConf = 
HoodieTestUtils.getDefaultStorageConf();
+    HoodieLockException ex = assertThrows(HoodieLockException.class,
+        () -> new StorageBasedLockProvider(lockConf, storageConf));
+    assertTrue(ex.getCause().getMessage().contains("No implementation of 
StorageLock supports this scheme"));
+  }
+
+  @Test
+  void testValidLockStorageLocation() {
+    TypedProperties props = new TypedProperties();
+    props.put(BASE_PATH.key(), "s3://bucket/lake/db/tbl-default");
+
+    LockConfiguration lockConf = new LockConfiguration(props);
+    StorageConfiguration<?> storageConf = 
HoodieTestUtils.getDefaultStorageConf();
+
+    HoodieLockException ex = assertThrows(HoodieLockException.class,
+        () -> new StorageBasedLockProvider(lockConf, storageConf));
+    assertTrue(ex.getMessage().contains("Failed to load and initialize 
StorageLock"));
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = { "gs://bucket/lake/db/tbl-default", 
"s3://bucket/lake/db/tbl-default",
+      "s3a://bucket/lake/db/tbl-default" })
+  void testNonExistentWriteServiceWithDefaults(String tableBasePathString) {
+    TypedProperties props = new TypedProperties();
+    props.put(BASE_PATH.key(), tableBasePathString);
+
+    LockConfiguration lockConf = new LockConfiguration(props);
+    StorageConfiguration<?> storageConf = 
HoodieTestUtils.getDefaultStorageConf();
+
+    HoodieLockException ex = assertThrows(HoodieLockException.class,
+        () -> new StorageBasedLockProvider(lockConf, storageConf));
+    assertTrue(ex.getMessage().contains("Failed to load and initialize 
StorageLock"));
+  }
+
+  @Test
+  void testInvalidLocksLocationForWriteService() {
+    TypedProperties props = new TypedProperties();
+    props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default");
+    props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "5");
+    props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
+
+    LockConfiguration lockConf = new LockConfiguration(props);
+    StorageConfiguration<?> storageConf = 
HoodieTestUtils.getDefaultStorageConf();
+
+    HoodieLockException ex = assertThrows(HoodieLockException.class,
+        () -> new StorageBasedLockProvider(lockConf, storageConf));
+    Throwable cause = ex.getCause();
+    assertNotNull(cause);
+    assertInstanceOf(HoodieNotSupportedException.class, cause);
+  }
+
+  @Test
+  void testTryLockForTimeUnitThrowsOnInterrupt() throws Exception {
+    doReturn(false).when(lockProvider).tryLock();
+    CountDownLatch latch = new CountDownLatch(1);
+    Thread t = new Thread(() -> {
+      try {
+        lockProvider.tryLock(1, TimeUnit.SECONDS);
+      } catch (HoodieLockException e) {
+        latch.countDown();
+      }
+    });
+    t.start();
+    Thread.sleep(50);
+    t.interrupt();
+    assertTrue(latch.await(2, TimeUnit.SECONDS));
+  }
+
+  @Test
+  void testTryLockForTimeUnitAcquiresLockEventually() throws Exception {
+    AtomicInteger count = new AtomicInteger(0);
+    doAnswer(inv -> count.incrementAndGet() > 2).when(lockProvider).tryLock();
+    CountDownLatch latch = new CountDownLatch(1);
+    Thread t = new Thread(() -> {
+      assertTrue(lockProvider.tryLock(4, TimeUnit.SECONDS));
+      latch.countDown();
+    });
+    t.start();
+    assertTrue(latch.await(5, TimeUnit.SECONDS));
+  }
+
+  @Test
+  void testTryLockForTimeUnitFailsToAcquireLockEventually() throws Exception {
+    AtomicInteger count = new AtomicInteger(0);
+    doAnswer(inv -> count.incrementAndGet() > 2).when(lockProvider).tryLock();
+    CountDownLatch latch = new CountDownLatch(1);
+    Thread t = new Thread(() -> {
+      assertFalse(lockProvider.tryLock(1, TimeUnit.SECONDS));
+      latch.countDown();
+    });
+    t.start();
+    assertTrue(latch.await(2, TimeUnit.SECONDS));
+  }
+
+  @Test
+  void testTryLockSuccess() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 null));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    boolean acquired = lockProvider.tryLock();
+    assertTrue(acquired);
+    assertEquals(realLockFile, lockProvider.getLock());
+    verify(mockLockService, atLeastOnce()).tryCreateOrUpdateLockFile(any(), 
any());
+  }
+
+  @Test
+  void testTryLockSuccessButFailureToStartHeartbeat() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 null));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+    
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(false);
+    when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), 
eq(realLockFile), anyLong()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+
+    boolean acquired = lockProvider.tryLock();
+    assertFalse(acquired);
+  }
+
+  @Test
+  void testTryLockFailsFromOwnerMismatch() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 null));
+    StorageLockFile returnedLockFile = new StorageLockFile(
+        new StorageLockData(false, System.currentTimeMillis() + 
DEFAULT_LOCK_VALIDITY_MS, "different-owner"), "v1");
+    when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, returnedLockFile));
+
+    HoodieLockException ex = assertThrows(HoodieLockException.class, () -> 
lockProvider.tryLock());
+    assertTrue(ex.getMessage().contains("Owners do not match"));
+  }
+
+  @Test
+  void testTryLockFailsDueToExistingLock() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS,
+        "other-owner");
+    StorageLockFile existingLock = new StorageLockFile(data, "v2");
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS,
 existingLock));
+
+    boolean acquired = lockProvider.tryLock();
+    assertFalse(acquired);
+  }
+
+  @Test
+  void testTryLockFailsToUpdateFile() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 null));
+    when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+        .thenReturn(Pair.of(LockUpdateResult.ACQUIRED_BY_OTHERS, null));
+    assertFalse(lockProvider.tryLock());
+  }
+
+  @Test
+  void testTryLockFailsDueToUnknownState() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.UNKNOWN_ERROR,
 null));
+    assertFalse(lockProvider.tryLock());
+  }
+
+  @Test
+  void testTryLockSucceedsWhenExistingLockExpiredByTime() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
+        "other-owner");
+    StorageLockFile existingLock = new StorageLockFile(data, "v2");
+    StorageLockData newData = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS,
+        ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(newData, "v1");
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS,
 existingLock));
+    when(mockLockService.tryCreateOrUpdateLockFile(any(), eq(existingLock)))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+    boolean acquired = lockProvider.tryLock();
+    assertTrue(acquired);
+  }
+
+  @Test
+  void testTryLockReentrancySucceeds() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 null));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    boolean acquired = lockProvider.tryLock();
+    assertTrue(acquired);
+    // Re-entrancy succeeds
+    assertTrue(lockProvider.tryLock());
+  }
+
+  @Test
+  void testTryLockReentrancyAfterLockExpiredByTime() {
+    // In an extremely unlikely scenario, we could have a local reference to a 
lock
+    // which is present but expired,
+    // and because we were unable to stop the heartbeat properly, we did not
+    // successfully set it to null.
+    // Due to the nature of the heartbeat manager, this is expected to 
introduce
+    // some delay, but not be permanently blocking.
+    // There are a few variations of this edge case, so we must test them all.
+
+    // Here the lock is still "unexpired" but the time shows expired.
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile expiredLock = new StorageLockFile(data, "v1");
+    doReturn(expiredLock).when(lockProvider).getLock();
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 null));
+    StorageLockData validData = new StorageLockData(false, 
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
+        ownerId);
+    StorageLockFile validLock = new StorageLockFile(validData, "v2");
+    when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, validLock));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    assertTrue(lockProvider.tryLock());
+  }
+
+  @Test
+  void testTryLockReentrancyAfterLockSetExpired() {
+    // In an extremely unlikely scenario, we could have a local reference to a 
lock
+    // which is present but expired,
+    // and because we were unable to stop the heartbeat properly, we did not
+    // successfully set it to null.
+    // Due to the nature of the heartbeat manager, this is expected to 
introduce
+    // some delay, but not be permanently blocking.
+    // There are a few variations of this edge case, so we must test them all.
+
+    // Here the lock is "expired" but the time shows unexpired.
+    StorageLockData data = new StorageLockData(true, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile expiredLock = new StorageLockFile(data, "v1");
+    doReturn(expiredLock).when(lockProvider).getLock();
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 null));
+    StorageLockData validData = new StorageLockData(false, 
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
+        ownerId);
+    StorageLockFile validLock = new StorageLockFile(validData, "v2");
+    when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, validLock));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    assertTrue(lockProvider.tryLock());
+  }
+
+  @Test
+  void testTryLockHeartbeatStillActive() {
+    // In an extremely unlikely scenario, we could have a local reference to a 
lock
+    // which is present but expired,
+    // and because we were unable to stop the heartbeat properly, we did not
+    // successfully set it to null.
+    // Due to the nature of the heartbeat manager, this is expected to 
introduce
+    // some delay, but not be permanently blocking.
+    // There are a few variations of this edge case, so we must test them all.
+
+    // Here the heartbeat is still active, so we have to error out.
+    StorageLockData data = new StorageLockData(true, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile expiredLock = new StorageLockFile(data, "v1");
+    doReturn(expiredLock).when(lockProvider).getLock();
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 null));
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+    assertThrows(HoodieLockException.class, () -> lockProvider.tryLock());
+  }
+
+  @Test
+  void testUnlockSucceedsAndReentrancy() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 null));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+    when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+    when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), 
eq(realLockFile), anyLong()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS,
+            new StorageLockFile(new StorageLockData(true, 
data.getValidUntil(), ownerId), "v2")));
+    assertTrue(lockProvider.tryLock());
+    when(mockHeartbeatManager.hasActiveHeartbeat())
+        .thenReturn(true) // when we try to stop the heartbeat we will check 
if heartbeat is active return true.
+        .thenReturn(false); // when try to set lock to expire we will assert 
no active heartbeat as a precondition.
+    lockProvider.unlock();
+    assertNull(lockProvider.getLock());
+    lockProvider.unlock();
+  }
+
+  @Test
+  void testUnlockFailsToStopHeartbeat() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 null));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+    assertTrue(lockProvider.tryLock());
+    when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(false);
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+    assertThrows(HoodieLockException.class, () -> lockProvider.unlock());
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+  }
+
+  @Test
+  void testCloseFailsToStopHeartbeat() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 null));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+    assertTrue(lockProvider.tryLock());
+    when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(false);
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+    // Should wrap the exception and log error.
+    lockProvider.close();
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+  }
+
+  @Test
+  void testRenewLockReturnsFalseWhenNoLockHeld() {
+    doReturn(null).when(lockProvider).getLock();
+    assertFalse(lockProvider.renewLock());
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+    verify(mockLogger).warn("Owner {}: Cannot renew, no lock held by this 
process", this.ownerId);
+  }
+
+  @Test
+  void testRenewLockWithoutHoldingLock() {
+    doReturn(null).when(lockProvider).getLock();
+    assertFalse(lockProvider.renewLock());
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+    verify(mockLogger).warn("Owner {}: Cannot renew, no lock held by this 
process", this.ownerId);
+  }
+
+  @Test
+  void testRenewLockWithFullyExpiredLock() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile nearExpiredLockFile = new StorageLockFile(data, "v1");
+    doReturn(nearExpiredLockFile).when(lockProvider).getLock();
+    when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), 
eq(nearExpiredLockFile), anyLong()))
+        .thenReturn(Pair.of(LockUpdateResult.ACQUIRED_BY_OTHERS, null));
+    assertFalse(lockProvider.renewLock());
+    verify(mockLogger).error("Owner {}: Unable to renew lock as it is acquired 
by others.", this.ownerId);
+  }
+
+  @Test
+  void testRenewLockUnableToUpsertLockFileButNotFatal() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    doReturn(lockFile).when(lockProvider).getLock();
+    // Signal the upsert attempt failed, but may be transient. See interface 
for
+    // more details.
+    when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), 
eq(lockFile), anyLong()))
+        .thenReturn(Pair.of(LockUpdateResult.UNKNOWN_ERROR, null));
+    assertTrue(lockProvider.renewLock());
+  }
+
+  @Test
+  void testRenewLockUnableToUpsertLockFileFatal() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    doReturn(lockFile).when(lockProvider).getLock();
+    // Signal the upsert attempt failed, but may be transient. See interface 
for
+    // more details.
+    when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), 
eq(lockFile), anyLong()))
+        .thenReturn(Pair.of(LockUpdateResult.UNKNOWN_ERROR, null));
+    // renewLock return true so it will be retried.
+    assertTrue(lockProvider.renewLock());
+
+    verify(mockLogger).warn("Owner {}: Unable to renew lock due to unknown 
error, could be transient.", this.ownerId);
+  }
+
+  @Test
+  void testRenewLockSucceedsButRenewalWithinExpirationWindow() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    doReturn(lockFile).when(lockProvider).getLock();
+
+    StorageLockData nearExpirationData = new StorageLockData(false, 
System.currentTimeMillis(), ownerId);
+    StorageLockFile lockFileNearExpiration = new 
StorageLockFile(nearExpirationData, "v2");
+    when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), 
eq(lockFile), anyLong()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, lockFileNearExpiration));
+
+    // We used to fail in this case before, but since we are only modifying a 
single
+    // lock file, this is ok now.
+    // Therefore, this can be a happy path variation.
+    assertTrue(lockProvider.renewLock());
+  }
+
+  @Test
+  void testRenewLockSucceeds() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    doReturn(lockFile).when(lockProvider).getLock();
+
+    StorageLockData successData = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS,
+        ownerId);
+    StorageLockFile successLockFile = new StorageLockFile(successData, "v2");
+    when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), 
eq(lockFile), anyLong()))
+        .thenReturn(Pair.of(LockUpdateResult.SUCCESS, successLockFile));
+    assertTrue(lockProvider.renewLock());
+
+    verify(mockLogger).info(
+        eq("Owner {}: Lock renewal successful. The renewal completes {} ms 
before expiration for lock {}."),
+        eq(this.ownerId), anyLong(), 
eq("gs://bucket/lake/db/tbl-default/.hoodie/.locks"));
+  }
+
+  @Test
+  void testRenewLockFails() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    doReturn(lockFile).when(lockProvider).getLock();
+
+    when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), 
eq(lockFile), anyLong()))
+        .thenThrow(new RuntimeException("Failure"));
+    assertFalse(lockProvider.renewLock());
+
+    verify(mockLogger).error(eq("Owner {}: Exception occurred while renewing 
lock"), eq(ownerId),
+        any(RuntimeException.class));
+  }
+
+  @Test
+  void testCloseCallsDependencies() throws Exception {
+    lockProvider.close();
+    verify(mockLockService, atLeastOnce()).close();
+    verify(mockHeartbeatManager, atLeastOnce()).close();
+    assertNull(lockProvider.getLock());
+  }
+
+  @Test
+  void testCloseWithErrorForLockService() throws Exception {
+    doThrow(new RuntimeException("Some 
failure")).when(mockLockService).close();
+    lockProvider.close();
+    verify(mockLogger).error(eq("Owner {}: Lock service failed to close."), 
eq(ownerId), any(RuntimeException.class));
+    assertNull(lockProvider.getLock());
+  }
+
+  @Test
+  void testCloseWithErrorForHeartbeatManager() throws Exception {
+    doThrow(new RuntimeException("Some 
failure")).when(mockHeartbeatManager).close();
+    lockProvider.close();
+    verify(mockLogger).error(eq("Owner {}: Heartbeat manager failed to 
close."), eq(ownerId),
+        any(RuntimeException.class));
+    assertNull(lockProvider.getLock());
+  }
+
+  @Test
+  public void testShutdownHookViaReflection() throws Exception {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 null));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull()))
+            .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    boolean acquired = lockProvider.tryLock();
+    assertTrue(acquired);
+    assertEquals(realLockFile, lockProvider.getLock());
+    verify(mockLockService, atLeastOnce()).tryCreateOrUpdateLockFile(any(), 
any());
+
+    when(mockLockService.tryCreateOrUpdateLockFile(any(StorageLockData.class), 
eq(realLockFile)))
+            .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile));
+
+    // Mock shutdown
+    Method shutdownMethod = 
lockProvider.getClass().getDeclaredMethod("shutdown", boolean.class);
+    shutdownMethod.setAccessible(true);
+    shutdownMethod.invoke(lockProvider, true);
+
+    // Verify that the expected shutdown behaviors occurred.
+    assertNull(lockProvider.getLock());
+    // We do not execute additional actions
+    verify(mockLockService, never()).close();
+    verify(mockHeartbeatManager, never()).close();
+  }
+
+  @Test
+  public void testShutdownHookWhenNoLockPresent() throws Exception {
+    // Now, when calling shutdown(true), the method should immediately return.
+    Method shutdownMethod = 
lockProvider.getClass().getDeclaredMethod("shutdown", boolean.class);
+    shutdownMethod.setAccessible(true);
+    shutdownMethod.invoke(lockProvider, true);
+
+    // Verify that unlock or close methods are NOT invoked, or adjust 
expectations accordingly.
+    verify(mockLockService, never()).close();
+    verify(mockHeartbeatManager, never()).close();
+  }
+
+  public static class StubStorageLock implements StorageLock {
+    public StubStorageLock(String arg1, String arg2, String arg3) {
+      // No-op constructor for reflection
+    }
+
+    @Override
+    public Pair<LockUpdateResult, StorageLockFile> 
tryCreateOrUpdateLockFile(StorageLockData newLockData,
+        StorageLockFile previousLockFile) {
+      return null;
+    }
+
+    @Override
+    public Pair<LockUpdateResult, StorageLockFile> 
tryCreateOrUpdateLockFileWithRetry(
+        Supplier<StorageLockData> newLockDataSupplier,
+        StorageLockFile previousLockFile,
+        long retryExpiration) {
+      return null;
+    }
+
+    @Override
+    public Pair<LockGetResult, StorageLockFile> readCurrentLockFile() {
+      return null;
+    }
+
+    @Override
+    public void close() throws Exception {
+      // stub, no-op
+    }
+  }
+
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java
new file mode 100644
index 00000000000..3be7e5550b2
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestStorageBasedLockConfig {
+
+  @Test
+  void testDefaultValues() {
+    // Asserts that the defaults are correct
+    TypedProperties props = new TypedProperties();
+    props.setProperty(BASE_PATH.key(), "s3://hudi-bucket/table/basepath");
+
+    StorageBasedLockConfig.Builder builder = new 
StorageBasedLockConfig.Builder();
+    StorageBasedLockConfig config = builder
+        .fromProperties(props)
+        .build();
+
+    assertEquals(5 * 60, config.getValiditySeconds(), "Default lock validity 
should be 5 minutes");
+    assertEquals(30, config.getHeartbeatPollSeconds(), "Default heartbeat poll 
time should be 30 seconds");
+  }
+
+  @Test
+  void testCustomValues() {
+    // Testing that custom values which differ from defaults can be read 
properly
+    TypedProperties props = new TypedProperties();
+    props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), 
"120");
+    props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), 
"10");
+    props.setProperty(BASE_PATH.key(), "/hudi/table/basepath");
+
+    StorageBasedLockConfig config = new StorageBasedLockConfig.Builder()
+        .fromProperties(props)
+        .build();
+
+    assertEquals(120, config.getValiditySeconds());
+    assertEquals(10, config.getHeartbeatPollSeconds());
+    assertEquals("/hudi/table/basepath", config.getHudiTableBasePath());
+  }
+
+  @Test
+  void testBasePathPropertiesValidation() {
+    // Tests that validations around the base path are present.
+    TypedProperties props = new TypedProperties();
+    props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), 
"120");
+    StorageBasedLockConfig.Builder propsBuilder = new 
StorageBasedLockConfig.Builder();
+
+    // Missing base path
+    IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class,
+        () -> propsBuilder.fromProperties(props));
+    assertTrue(exception.getMessage().contains(BASE_PATH.key()));
+  }
+
+  @Test
+  void testTimeThresholds() {
+    // Ensure that validations which restrict the time-based inputs are 
working.
+    TypedProperties props = new TypedProperties();
+    props.setProperty(BASE_PATH.key(), "/hudi/table/basepath");
+    // Invalid config case: validity timeout is less than triple of heartbeat 
poll period
+    props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), 
"5");
+    props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), 
"3");
+    StorageBasedLockConfig.Builder propsBuilder = new 
StorageBasedLockConfig.Builder();
+
+    IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class,
+        () -> propsBuilder.fromProperties(props));
+    
assertTrue(exception.getMessage().contains(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key()));
+    // Invalid config case: validity timeout is less than 5 seconds
+    props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), 
"4");
+    props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), 
"1");
+    exception = assertThrows(IllegalArgumentException.class, () -> 
propsBuilder.fromProperties(props));
+    
assertTrue(exception.getMessage().contains(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key()));
+    // Invalid config case: heartbeat poll period is less than 1 second
+    props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), 
"5");
+    props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), 
"0");
+    exception = assertThrows(IllegalArgumentException.class, () -> 
propsBuilder.fromProperties(props));
+    
assertTrue(exception.getMessage().contains(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key()));
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 4c0e947d73b..c42024f42eb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -131,6 +131,8 @@ public class HoodieTableMetaClient implements Serializable {
   public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH =
       BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + StoragePath.SEPARATOR + ".fileids";
 
+  public static final String LOCKS_FOLDER_NAME = METAFOLDER_NAME + 
StoragePath.SEPARATOR + ".locks";
+
   public static final String SCHEMA_FOLDER_NAME = ".schema";
 
   public static final String MARKER_EXTN = ".marker";
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
index e718bc21ed6..25fe3e34c6f 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
@@ -39,6 +39,15 @@ public class TestStorageSchemes {
     assertTrue(StorageSchemes.isSchemeSupported("afs"));
     assertFalse(StorageSchemes.isSchemeSupported("s2"));
 
+    for (StorageSchemes scheme : StorageSchemes.values()) {
+      String schemeName = scheme.getScheme();
+      if (scheme.getScheme().startsWith("s3")) {
+        
assertTrue(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent());
+      } else {
+        
assertFalse(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent());
+      }
+    }
+
     assertTrue(StorageSchemes.isAtomicCreationSupported("file"));
     assertTrue(StorageSchemes.isAtomicCreationSupported("hdfs"));
     assertFalse(StorageSchemes.isAtomicCreationSupported("afs"));
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
index ca795917f98..aabb50104eb 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
@@ -19,6 +19,9 @@
 
 package org.apache.hudi.storage;
 
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+
 import java.util.Arrays;
 import java.util.Set;
 import java.util.HashSet;
@@ -28,65 +31,66 @@ import java.util.HashSet;
  */
 public enum StorageSchemes {
   // Local filesystem
-  FILE("file", false, true),
+  FILE("file", false, true, null),
   // Hadoop File System
-  HDFS("hdfs", false, true),
+  HDFS("hdfs", false, true, null),
   // Baidu Advanced File System
-  AFS("afs", null, null),
+  AFS("afs", null, null, null),
   // Mapr File System
-  MAPRFS("maprfs", null, null),
+  MAPRFS("maprfs", null, null, null),
   // Apache Ignite FS
-  IGNITE("igfs", null, null),
+  IGNITE("igfs", null, null, null),
   // AWS S3
-  S3A("s3a", true, null),
-  S3("s3", true, null),
+  S3A("s3a", true, null, "org.apache.hudi.aws.transaction.lock.S3StorageLock"),
+  S3("s3", true, null, "org.apache.hudi.aws.transaction.lock.S3StorageLock"),
   // Google Cloud Storage
-  GCS("gs", true, null),
+  GCS("gs", true, null, null),
   // Azure WASB
-  WASB("wasb", null, null),
-  WASBS("wasbs", null, null),
+  WASB("wasb", null, null, null),
+  WASBS("wasbs", null, null, null),
   // Azure ADLS
-  ADL("adl", null, null),
+  ADL("adl", null, null, null),
   // Azure ADLS Gen2
-  ABFS("abfs", null, null),
-  ABFSS("abfss", null, null),
+  ABFS("abfs", null, null, null),
+  ABFSS("abfss", null, null, null),
   // Aliyun OSS
-  OSS("oss", null, null),
-  // View FS for federated setups. If federating across cloud stores, then 
append support is false
+  OSS("oss", null, null, null),
+  // View FS for federated setups. If federating across cloud stores, then 
append
+  // support is false
   // View FS support atomic creation
-  VIEWFS("viewfs", null, true),
-  //ALLUXIO
-  ALLUXIO("alluxio", null, null),
+  VIEWFS("viewfs", null, true, null),
+  // ALLUXIO
+  ALLUXIO("alluxio", null, null, null),
   // Tencent Cloud Object Storage
-  COSN("cosn", null, null),
+  COSN("cosn", null, null, null),
   // Tencent Cloud HDFS
-  CHDFS("ofs", null, null),
+  CHDFS("ofs", null, null, null),
   // Tencent Cloud CacheFileSystem
-  GOOSEFS("gfs", null, null),
+  GOOSEFS("gfs", null, null, null),
   // Databricks file system
-  DBFS("dbfs", null, null),
+  DBFS("dbfs", null, null, null),
   // IBM Cloud Object Storage
-  COS("cos", null, null),
+  COS("cos", null, null, null),
   // Huawei Cloud Object Storage
-  OBS("obs", null, null),
+  OBS("obs", null, null, null),
   // Kingsoft Standard Storage ks3
-  KS3("ks3", null, null),
+  KS3("ks3", null, null, null),
   // Netease Object Storage nos
-  NOS("nos", null, null),
+  NOS("nos", null, null, null),
   // JuiceFileSystem
-  JFS("jfs", null, null),
+  JFS("jfs", null, null, null),
   // Baidu Object Storage
-  BOS("bos", null, null),
+  BOS("bos", null, null, null),
   // Oracle Cloud Infrastructure Object Storage
-  OCI("oci", null, null),
+  OCI("oci", null, null, null),
   // Volcengine Object Storage
-  TOS("tos", null, null),
+  TOS("tos", null, null, null),
   // Volcengine Cloud HDFS
-  CFS("cfs", null, null),
+  CFS("cfs", null, null, null),
   // Aliyun Apsara File Storage for HDFS
-  DFS("dfs", false, true),
+  DFS("dfs", false, true, null),
   // Hopsworks File System
-  HOPSFS("hopsfs", false, true);
+  HOPSFS("hopsfs", false, true, null);
 
   // list files may bring pressure to storage with centralized meta service 
like HDFS.
   // when we want to get only part of files under a directory rather than all 
files, use getStatus may be more friendly than listStatus.
@@ -98,11 +102,17 @@ public enum StorageSchemes {
   private final Boolean isWriteTransactional;
   // null for uncertain if dfs support atomic create&delete, please update 
this for each FS
   private final Boolean supportAtomicCreation;
+  private final String storageLockClass;
 
-  StorageSchemes(String scheme, Boolean isWriteTransactional, Boolean 
supportAtomicCreation) {
+  StorageSchemes(
+      String scheme,
+      Boolean isWriteTransactional,
+      Boolean supportAtomicCreation,
+      String storageLockClass) {
     this.scheme = scheme;
     this.isWriteTransactional = isWriteTransactional;
     this.supportAtomicCreation = supportAtomicCreation;
+    this.storageLockClass = storageLockClass;
   }
 
   public String getScheme() {
@@ -117,6 +127,14 @@ public enum StorageSchemes {
     return supportAtomicCreation != null && supportAtomicCreation;
   }
 
+  public boolean implementsStorageLock() {
+    return !StringUtils.isNullOrEmpty(storageLockClass);
+  }
+
+  public String getStorageLockClass() {
+    return storageLockClass;
+  }
+
   public static boolean isSchemeSupported(String scheme) {
     return Arrays.stream(values()).anyMatch(s -> s.getScheme().equals(scheme));
   }
@@ -143,4 +161,12 @@ public enum StorageSchemes {
 
     return LIST_STATUS_FRIENDLY_SCHEMES.contains(scheme);
   }
+
+  public static Option<StorageSchemes> 
getStorageLockImplementationIfExists(String scheme) {
+    if (!isSchemeSupported(scheme)) {
+      throw new IllegalArgumentException("Unsupported scheme :" + scheme);
+    }
+    return Option.fromJavaOptional(Arrays.stream(StorageSchemes.values())
+        .filter(s -> s.implementsStorageLock() && 
s.scheme.equals(scheme)).findFirst());
+  }
 }

Reply via email to