This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 949017eda3d1 [HUDI-9158] Add storage based lock provider abstract 
implementation to 0.x branch (#13712)
949017eda3d1 is described below

commit 949017eda3d158833088a98736eb1115fa9a060c
Author: Alex R <[email protected]>
AuthorDate: Tue Aug 12 15:14:09 2025 -0700

    [HUDI-9158] Add storage based lock provider abstract implementation to 0.x 
branch (#13712)
---
 .../transaction/lock/StorageBasedLockProvider.java | 545 +++++++++++++++++
 .../lock/models/LockProviderHeartbeatManager.java  |   2 +-
 .../apache/hudi/config/StorageBasedLockConfig.java | 100 +++
 .../lock/TestStorageBasedLockProvider.java         | 669 +++++++++++++++++++++
 .../models/TestLockProviderHeartbeatManager.java   |  14 +-
 .../hudi/config/TestStorageBasedLockConfig.java    | 101 ++++
 .../hudi/common/table/HoodieTableMetaClient.java   |   2 +
 .../apache/hudi/common/fs/TestStorageSchemes.java  |   9 +
 .../org/apache/hudi/storage/StorageSchemes.java    |  81 ++-
 9 files changed, 1485 insertions(+), 38 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
new file mode 100644
index 000000000000..00f9bcd74742
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import 
org.apache.hudi.client.transaction.lock.models.LockProviderHeartbeatManager;
+import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.StorageBasedLockConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StorageSchemes;
+
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.net.URI;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
+import static org.apache.hudi.common.lock.LockState.ACQUIRED;
+import static org.apache.hudi.common.lock.LockState.ACQUIRING;
+import static org.apache.hudi.common.lock.LockState.FAILED_TO_ACQUIRE;
+import static org.apache.hudi.common.lock.LockState.FAILED_TO_RELEASE;
+import static org.apache.hudi.common.lock.LockState.RELEASED;
+import static org.apache.hudi.common.lock.LockState.RELEASING;
+import static 
org.apache.hudi.common.table.HoodieTableMetaClient.LOCKS_FOLDER_NAME;
+
+/**
+ * A distributed filesystem storage based lock provider. This {@link 
LockProvider} implementation
+ * leverages conditional writes to ensure transactional consistency for 
multi-writer scenarios.
+ * The underlying storage client interface {@link StorageLockClient} is 
pluggable so it can be implemented for any
+ * filesystem which supports conditional writes.
+ */
+@ThreadSafe
+public class StorageBasedLockProvider implements LockProvider<StorageLockFile> 
{
+
+  public static final String DEFAULT_TABLE_LOCK_FILE_NAME = "table_lock.json";
+  // Maximum expected clock drift between two nodes, in milliseconds.
+  // (Blocking calls retry using DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.)
+  // This is a similar idea to SkewAdjustingTimeGenerator.
+  // In reality, within a single cloud provider all nodes share the same NTP
+  // server,
+  // therefore we do not expect drift of more than a few ms.
+  // However, since our lock leases are pretty long, we can use a high buffer.
+  private static final long CLOCK_DRIFT_BUFFER_MS = 500;
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StorageBasedLockProvider.class);
+
+  // Use for testing
+  private final Logger logger;
+
+  // The lock service implementation which interacts with storage
+  private final StorageLockClient storageLockClient;
+
+  private final long lockValiditySecs;
+  private final String ownerId;
+  private final String lockFilePath;
+  private final HeartbeatManager heartbeatManager;
+  private final transient Thread shutdownThread;
+
+  @GuardedBy("this")
+  private StorageLockFile currentLockObj = null;
+  @GuardedBy("this")
+  private boolean isClosed = false;
+
+  /**
+   * Updates the lock object cached by this provider.
+   *
+   * @param lockObj the lock file to cache, or {@code null} to clear the cached lock
+   * @throws HoodieLockException if {@code lockObj} is non-null and its owner differs
+   *                             from this provider's owner id
+   */
+  private synchronized void setLock(StorageLockFile lockObj) {
+    if (lockObj != null && !Objects.equals(lockObj.getOwner(), this.ownerId)) {
+      throw new HoodieLockException("Owners do not match. Current lock owner: 
" + this.ownerId + " lock path: "
+          + this.lockFilePath + " owner: " + lockObj.getOwner());
+    }
+    this.currentLockObj = lockObj;
+  }
+
+  /**
+   * Default constructor for StorageBasedLockProvider, required by LockManager
+   * to instantiate it using reflection.
+   * 
+   * @param lockConfiguration The lock configuration, should be transformable 
into
+   *                          StorageBasedLockConfig
+   * @param conf              Storage config, ignored.
+   */
+  public StorageBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf) {
+    this(
+            UUID.randomUUID().toString(),
+            lockConfiguration.getConfig(),
+            LockProviderHeartbeatManager::new,
+            getStorageLockClientClassName(),
+            LOGGER);
+  }
+
+  private static Functions.Function3<String, String, TypedProperties, 
StorageLockClient> getStorageLockClientClassName() {
+    return (ownerId, lockFilePath, lockConfig) -> {
+      try {
+        return (StorageLockClient) ReflectionUtils.loadClass(
+                getLockServiceClassName(new URI(lockFilePath).getScheme()),
+                new Class<?>[]{String.class, String.class, Properties.class},
+                new Object[]{ownerId, lockFilePath, lockConfig});
+      } catch (Throwable e) {
+        throw new HoodieLockException("Failed to load and initialize 
StorageLock", e);
+      }
+    };
+  }
+
+  private static @NotNull String getLockServiceClassName(String scheme) {
+    Option<StorageSchemes> schemeOptional = 
StorageSchemes.getStorageLockImplementationIfExists(scheme);
+    if (schemeOptional.isPresent()) {
+      return schemeOptional.get().getStorageLockClass();
+    } else {
+      throw new HoodieNotSupportedException("No implementation of StorageLock 
supports this scheme: " + scheme);
+    }
+  }
+
+  @VisibleForTesting
+  StorageBasedLockProvider(
+      String ownerId,
+      TypedProperties properties,
+      Functions.Function3<String, Long, Supplier<Boolean>, HeartbeatManager> 
heartbeatManagerLoader,
+      Functions.Function3<String, String, TypedProperties, StorageLockClient> 
storageLockClientLoader,
+      Logger logger) {
+    StorageBasedLockConfig config = new 
StorageBasedLockConfig.Builder().fromProperties(properties).build();
+    long heartbeatPollSeconds = config.getHeartbeatPollSeconds();
+    this.lockValiditySecs = config.getValiditySeconds();
+    this.lockFilePath = String.format(
+            "%s%s%s%s%s",
+            config.getHudiTableBasePath(),
+            StoragePath.SEPARATOR,
+            LOCKS_FOLDER_NAME,
+            StoragePath.SEPARATOR,
+            DEFAULT_TABLE_LOCK_FILE_NAME);
+    this.heartbeatManager = heartbeatManagerLoader.apply(ownerId, 
TimeUnit.SECONDS.toMillis(heartbeatPollSeconds), this::renewLock);
+    this.storageLockClient = storageLockClientLoader.apply(ownerId, 
lockFilePath, properties);
+    this.ownerId = ownerId;
+    this.logger = logger;
+    shutdownThread = new Thread(() -> shutdown(true));
+    Runtime.getRuntime().addShutdownHook(shutdownThread);
+    logger.info("Instantiated new storage-based lock provider, owner: {}, 
lockfilePath: {}", ownerId, lockFilePath);
+  }
+
+  // -----------------------------------------
+  // BASE METHODS
+  // -----------------------------------------
+
+  @Override
+  public synchronized StorageLockFile getLock() {
+    return currentLockObj;
+  }
+
+  /**
+   * Attempts to acquire the lock within the given timeout by calling
+   * {@link #tryLock()} repeatedly, pausing between attempts, until it succeeds
+   * or the timeout elapses.
+   *
+   * @param time maximum time to keep retrying
+   * @param unit unit of {@code time}
+   * @return true if the lock was acquired before the deadline, false otherwise
+   * @throws HoodieLockException if the calling thread is interrupted while waiting
+   */
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) {
+    // Parse the retry interval once instead of on every loop iteration.
+    final long retryWaitNanos = TimeUnit.MILLISECONDS.toNanos(
+        Long.parseLong(DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS));
+    final long deadlineNanos = System.nanoTime() + unit.toNanos(time);
+
+    // System.nanoTime() values must be compared by subtraction; a direct
+    // "now < deadline" comparison is wrong when the values overflow.
+    while (deadlineNanos - System.nanoTime() > 0) {
+      try {
+        logDebugLockState(ACQUIRING);
+        if (tryLock()) {
+          return true;
+        }
+        // Never sleep past the caller's deadline.
+        long remainingNanos = deadlineNanos - System.nanoTime();
+        if (remainingNanos > 0) {
+          TimeUnit.NANOSECONDS.sleep(Math.min(retryWaitNanos, remainingNanos));
+        }
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag before surfacing the failure.
+        Thread.currentThread().interrupt();
+        throw new HoodieLockException(generateLockStateMessage(LockState.FAILED_TO_ACQUIRE), e);
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public synchronized void close() {
+    shutdown(false);
+  }
+
+  /**
+   * Shuts this provider down. When invoked from the JVM shutdown hook it only
+   * makes a best-effort attempt to expire a lock that is still held; otherwise it
+   * removes the shutdown hook, unlocks, and closes the storage lock client and the
+   * heartbeat manager.
+   *
+   * @param fromShutdownHook true when called by the shutdown hook thread, false
+   *                         when called through {@link #close()}
+   */
+  @VisibleForTesting
+  synchronized void shutdown(boolean fromShutdownHook) {
+    if (fromShutdownHook) {
+      // Try to expire the lock from the shutdown hook.
+      if (!isClosed && actuallyHoldsLock()) {
+        tryExpireCurrentLock(true);
+      }
+      // Do not execute any further actions, mark closed
+      this.isClosed = true;
+      return;
+    } else {
+      // Regular close: the hook is no longer needed, so deregister it.
+      try {
+        tryRemoveShutdownHook();
+      } catch (IllegalStateException e) {
+        logger.warn("Owner {}: Failed to remove shutdown hook, JVM is already 
shutting down.", ownerId, e);
+      }
+    }
+    // Each cleanup step below is attempted independently: a failure is logged
+    // and the remaining steps still run.
+    try {
+      this.unlock();
+    } catch (Exception e) {
+      logger.error("Owner {}: Failed to unlock current lock.", ownerId, e);
+    }
+    try {
+      this.storageLockClient.close();
+    } catch (Exception e) {
+      logger.error("Owner {}: Lock service failed to close.", ownerId, e);
+    }
+    try {
+      this.heartbeatManager.close();
+    } catch (Exception e) {
+      logger.error("Owner {}: Heartbeat manager failed to close.", ownerId, e);
+    }
+
+    this.isClosed = true;
+  }
+
+  @VisibleForTesting
+  void tryRemoveShutdownHook() {
+    Runtime.getRuntime().removeShutdownHook(shutdownThread);
+  }
+
+  /**
+   * Checks whether the given lock should still be treated as in effect: it is not
+   * reported as expired by {@code isExpired()}, and the local clock has not passed
+   * its valid-until timestamp by more than {@code CLOCK_DRIFT_BUFFER_MS}.
+   *
+   * @param lock the lock file to inspect
+   * @return true if the lock is still considered valid, false otherwise
+   */
+  private synchronized boolean isLockStillValid(StorageLockFile lock) {
+    return !lock.isExpired() && 
!isCurrentTimeCertainlyOlderThanDistributedTime(lock.getValidUntilMs());
+  }
+
+  /**
+   * Attempts a single pass to acquire the lock (non-blocking).
+   * 
+   * @return true if lock acquired, false otherwise
+   */
+  @Override
+  public synchronized boolean tryLock() {
+    assertHeartbeatManagerExists();
+    assertUnclosed();
+    logDebugLockState(ACQUIRING);
+    if (actuallyHoldsLock()) {
+      // Supports reentrant locks
+      return true;
+    }
+
+    if (this.heartbeatManager.hasActiveHeartbeat()) {
+      logger.error("Detected broken invariant: there is an active heartbeat 
without a lock being held.");
+      // Breach of object invariant - we should never have an active heartbeat 
without
+      // holding a lock.
+      throw new 
HoodieLockException(generateLockStateMessage(FAILED_TO_ACQUIRE));
+    }
+
+    Pair<LockGetResult, Option<StorageLockFile>> latestLock = 
this.storageLockClient.readCurrentLockFile();
+    if (latestLock.getLeft() == LockGetResult.UNKNOWN_ERROR) {
+      logInfoLockState(FAILED_TO_ACQUIRE, "Failed to get the latest lock 
status");
+      // We were not able to determine whether a lock was present.
+      return false;
+    }
+
+    if (latestLock.getLeft() == LockGetResult.SUCCESS && 
isLockStillValid(latestLock.getRight().get())) {
+      String msg = String.format("Lock already held by %s", 
latestLock.getRight().get().getOwner());
+      // Lock held by others.
+      logInfoLockState(FAILED_TO_ACQUIRE, msg);
+      return false;
+    }
+
+    // Try to acquire the lock
+    StorageLockData newLockData = new StorageLockData(false, 
getCurrentEpochMs() + TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId);
+    Pair<LockUpsertResult, Option<StorageLockFile>> lockUpdateStatus = 
this.storageLockClient.tryUpsertLockFile(
+        newLockData,
+        latestLock.getRight());
+    if (lockUpdateStatus.getLeft() != LockUpsertResult.SUCCESS) {
+      // failed to acquire the lock, indicates concurrent contention
+      logInfoLockState(FAILED_TO_ACQUIRE);
+      return false;
+    }
+    this.setLock(lockUpdateStatus.getRight().get());
+
+    // There is a remote chance that
+    // - after lock is acquired but before heartbeat starts the lock is 
expired.
+    // - lock is acquired and heartbeat is up yet it does not run timely 
before the
+    // lock is expired
+    // It is mitigated by setting the lock validity period to a reasonably long
+    // period to survive until heartbeat comes, plus
+    // set the heartbeat interval relatively small enough.
+    if 
(!this.heartbeatManager.startHeartbeatForThread(Thread.currentThread())) {
+      // Precondition "no active heartbeat" is checked previously, so when
+      // startHeartbeatForThread returns false,
+      // we are confident no heartbeat thread is running.
+      logErrorLockState(RELEASING, "We were unable to start the heartbeat!");
+      tryExpireCurrentLock(false);
+      return false;
+    }
+
+    logInfoLockState(ACQUIRED);
+    return true;
+  }
+
+  /**
+   * Determines whether this provider currently holds a valid lock.
+   *
+   * <p>
+   * This method checks both the existence of a lock object and its validity. A
+   * lock is considered
+   * valid only if it exists and has not expired according to its timestamp.
+   *
+   * @return {@code true} if this provider holds a valid lock, {@code false}
+   *         otherwise
+   */
+  private boolean actuallyHoldsLock() {
+    return believesLockMightBeHeld() && isLockStillValid(getLock());
+  }
+
+  /**
+   * Checks if this provider has a non-null lock object reference.
+   *
+   * <p>
+   * A non-null lock object indicates that this provider has previously
+   * <b>successfully</b> acquired a lock via
+   * {@link StorageBasedLockProvider#tryLock()} and has not yet <b>successfully</b> released
+   * it via {@link StorageBasedLockProvider#unlock()}.
+   * It is merely an indicator that the lock might be held by this provider. To
+   * truly certify we are the owner of the lock,
+   * StorageBasedLockProvider#actuallyHoldsLock should be used.
+   *
+   * @return {@code true} if this provider has a non-null lock object,
+   *         {@code false} otherwise
+   * @see StorageBasedLockProvider#actuallyHoldsLock()
+   */
+  private boolean believesLockMightBeHeld() {
+    return this.getLock() != null;
+  }
+
+  /**
+   * Unlock by marking our current lock file "expired": true.
+   */
+  @Override
+  public synchronized void unlock() {
+    assertHeartbeatManagerExists();
+    if (!believesLockMightBeHeld()) {
+      return;
+    }
+    boolean believesNoLongerHoldsLock = true;
+
+    // Try to stop the heartbeat first
+    if (heartbeatManager.hasActiveHeartbeat()) {
+      logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId);
+      believesNoLongerHoldsLock &= heartbeatManager.stopHeartbeat(true);
+    }
+
+    // Then expire the current lock.
+    believesNoLongerHoldsLock &= tryExpireCurrentLock(false);
+    if (!believesNoLongerHoldsLock) {
+      throw new 
HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
+    }
+  }
+
+  private void assertHeartbeatManagerExists() {
+    if (heartbeatManager == null) {
+      // broken function precondition.
+      throw new HoodieLockException("Unexpected null heartbeatManager");
+    }
+  }
+
+  private void assertUnclosed() {
+    if (this.isClosed) {
+      throw new HoodieLockException("Lock provider already closed");
+    }
+  }
+
+  /**
+   * Tries to expire the currently held lock.
+   * @param fromShutdownHook Whether we are attempting best effort quick 
unlock from shutdown hook.
+   * @return True if we were successfully able to upload an expired lock.
+   */
+  private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) {
+    // It does not make sense to have heartbeat alive extending the lock lease 
while
+    // here we are trying
+    // to expire the lock.
+    if (!fromShutdownHook && heartbeatManager.hasActiveHeartbeat()) {
+      // broken function precondition.
+      throw new HoodieLockException("Must stop heartbeat before expire lock 
file");
+    }
+    logDebugLockState(RELEASING);
+    // Upload metadata that will unlock this lock.
+    StorageLockData expiredLockData = new StorageLockData(true, 
this.getLock().getValidUntilMs(), ownerId);
+    Pair<LockUpsertResult, Option<StorageLockFile>> result;
+    result = this.storageLockClient.tryUpsertLockFile(expiredLockData, 
Option.of(this.getLock()));
+    switch (result.getLeft()) {
+      case UNKNOWN_ERROR:
+        // Here we do not know the state of the lock.
+        logErrorLockState(FAILED_TO_RELEASE, "Lock state is unknown.");
+        return false;
+      case SUCCESS:
+        logInfoLockState(RELEASED);
+        setLock(null);
+        return true;
+      case ACQUIRED_BY_OTHERS:
+        // As we are confident no lock is held by itself, clean up the cached 
lock object.
+        // However, this is an edge case, so warn.
+        logWarnLockState(RELEASED, "lock should not have been acquired by 
others.");
+        setLock(null);
+        return true;
+      default:
+        throw new HoodieLockException("Unexpected lock update result: " + 
result.getLeft());
+    }
+  }
+
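+  /**
+   * Renews (heartbeats) the current lock if we are the holder. It forcefully sets
+   * the expiration flag to false and moves the lock expiration time further into
+   * the future.
+   * @return True if the renewal succeeded, or failed with an unknown (possibly transient)
+   *         error so that a later heartbeat can retry; false if no lock is held, the lock
+   *         was acquired by others, or an exception occurred.
+   */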
+  @VisibleForTesting
+  protected synchronized boolean renewLock() {
+    try {
+      // If we don't hold the lock, no-op.
+      if (!believesLockMightBeHeld()) {
+        logger.warn("Owner {}: Cannot renew, no lock held by this process", 
ownerId);
+        // No need to extend lock lease.
+        return false;
+      }
+
+      long oldExpirationMs = getLock().getValidUntilMs();
+      // Attempt conditional update, extend lock. There are 3 cases:
+      // 1. Happy case: lock has not expired yet, we extend the lease to a 
longer
+      // period.
+      // 2. Corner case 1: lock is expired and is acquired by others, lock 
renewal
+      // failed with ACQUIRED_BY_OTHERS.
+      // 3. Corner case 2: lock is expired but no one has acquired it yet, lock
+      // renewal "revived" the expired lock.
+      // Please note we expect the corner cases almost never happens.
+      // Action taken for corner case 2 is just a best effort mitigation. At 
least it
+      // prevents further data corruption by
+      // letting someone else acquire the lock.
+      Pair<LockUpsertResult, Option<StorageLockFile>> currentLock = 
this.storageLockClient.tryUpsertLockFile(
+          new StorageLockData(false, getCurrentEpochMs() + 
TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId),
+          Option.of(getLock()));
+      switch (currentLock.getLeft()) {
+        case ACQUIRED_BY_OTHERS:
+          logger.error("Owner {}: Unable to renew lock as it is acquired by 
others.", ownerId);
+          // No need to extend lock lease anymore.
+          return false;
+        case UNKNOWN_ERROR:
+          // This could be transient, but unclear, we will let the heartbeat 
continue
+          // normally.
+          // If the next heartbeat run identifies our lock has expired we will 
error out.
+          logger.warn("Owner {}: Unable to renew lock due to unknown error, 
could be transient.", ownerId);
+          // Let heartbeat retry later.
+          return true;
+        case SUCCESS:
+          // Only positive outcome
+          this.setLock(currentLock.getRight().get());
+          logger.info("Owner {}: Lock renewal successful. The renewal 
completes {} ms before expiration for lock {}.",
+              ownerId, oldExpirationMs - getCurrentEpochMs(), lockFilePath);
+          // Let heartbeat continue to renew lock lease again later.
+          return true;
+        default:
+          throw new HoodieLockException("Unexpected lock update result: " + 
currentLock.getLeft());
+      }
+    } catch (Exception e) {
+      logger.error("Owner {}: Exception occurred while renewing lock", 
ownerId, e);
+      return false;
+    }
+  }
+
+  // ---------
+  // Utilities
+  // ---------
+
+  /**
+   * Method to calculate whether a timestamp from a distributed source has
+   * definitively occurred yet.
+   */
+  protected boolean isCurrentTimeCertainlyOlderThanDistributedTime(long 
epochMs) {
+    return getCurrentEpochMs() > epochMs + CLOCK_DRIFT_BUFFER_MS;
+  }
+
+  private String generateLockStateMessage(LockState state) {
+    String threadName = Thread.currentThread().getName();
+    return String.format(
+            "Owner %s: Lock file path %s, Thread %s, Storage based lock state 
%s",
+            ownerId,
+            lockFilePath,
+            threadName,
+            state.toString());
+  }
+
+  private static final String LOCK_STATE_LOGGER_MSG = "Owner {}: Lock file 
path {}, Thread {}, Storage based lock state {}";
+  private static final String LOCK_STATE_LOGGER_MSG_WITH_INFO = "Owner {}: 
Lock file path {}, Thread {}, Storage based lock state {}, {}";
+
+  private void logDebugLockState(LockState state) {
+    logger.debug(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath, 
Thread.currentThread(), state);
+  }
+
+  private void logInfoLockState(LockState state) {
+    logger.info(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath, 
Thread.currentThread(), state);
+  }
+
+  private void logInfoLockState(LockState state, String msg) {
+    logger.info(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, 
Thread.currentThread(), state, msg);
+  }
+
+  private void logWarnLockState(LockState state, String msg) {
+    logger.warn(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, 
Thread.currentThread(), state, msg);
+  }
+
+  private void logErrorLockState(LockState state, String msg) {
+    logger.error(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, 
Thread.currentThread(), state, msg);
+  }
+
+  @VisibleForTesting
+  long getCurrentEpochMs() {
+    return System.currentTimeMillis();
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
index be73bd7cb2c0..a91193d5dc05 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
@@ -167,7 +167,7 @@ public class LockProviderHeartbeatManager implements 
HeartbeatManager {
    */
   private static ScheduledExecutorService createThreadScheduler(String 
shortUuid) {
     return Executors.newSingleThreadScheduledExecutor(
-            r -> new Thread(r, "LockProvider-HeartbeatManager-Thread-" + 
shortUuid));
+        r -> new Thread(r, "LockProvider-HeartbeatManager-Thread-" + 
shortUuid));
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java
new file mode 100644
index 000000000000..8f4d4779ad1c
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
+
+public class StorageBasedLockConfig extends HoodieConfig {
+  private static final String SINCE_VERSION_1_0_2 = "1.0.2";
+  private static final String STORAGE_BASED_LOCK_PROPERTY_PREFIX = 
LockConfiguration.LOCK_PREFIX
+      + "storage.";
+
+  public static final ConfigProperty<Long> VALIDITY_TIMEOUT_SECONDS = 
ConfigProperty
+      .key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "validity.timeout.secs")
+      .defaultValue(TimeUnit.MINUTES.toSeconds(5))
+      .markAdvanced()
+      .sinceVersion(SINCE_VERSION_1_0_2)
+      .withDocumentation(
+          "For storage-based lock provider, the amount of time in seconds each 
new lock is valid for. "
+              + "The lock provider will attempt to renew its lock until it 
successfully extends the lock lease period "
+              + "or the validity timeout is reached.");
+
+  public static final ConfigProperty<Long> HEARTBEAT_POLL_SECONDS = 
ConfigProperty
+      .key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "heartbeat.poll.secs")
+      .defaultValue(30L)
+      .markAdvanced()
+      .sinceVersion(SINCE_VERSION_1_0_2)
+      .withDocumentation(
+          "For storage-based lock provider, the amount of time in seconds to 
wait before renewing the lock. "
+              + "Defaults to 30 seconds.");
+
+  public long getValiditySeconds() {
+    return getLong(VALIDITY_TIMEOUT_SECONDS);
+  }
+
+  public long getHeartbeatPollSeconds() {
+    return getLong(HEARTBEAT_POLL_SECONDS);
+  }
+
+  public String getHudiTableBasePath() {
+    return getString(BASE_PATH);
+  }
+
+  public static class Builder {
+    private final StorageBasedLockConfig lockConfig = new 
StorageBasedLockConfig();
+
+    public StorageBasedLockConfig build() {
+      lockConfig.setDefaults(StorageBasedLockConfig.class.getName());
+      return lockConfig;
+    }
+
+    public StorageBasedLockConfig.Builder fromProperties(TypedProperties 
props) {
+      lockConfig.getProps().putAll(props);
+      checkRequiredProps();
+      return this;
+    }
+
+    /**
+     * Validates the properties loaded into the config being built.
+     *
+     * @throws IllegalArgumentException if the base path is missing, if the validity
+     *         timeout is less than 10x the heartbeat poll interval, if the validity
+     *         timeout is below 10 seconds, or if the heartbeat poll interval is
+     *         below 1 second
+     */
+    private void checkRequiredProps() {
+      String notExistsMsg = " does not exist!";
+      if (!lockConfig.contains(BASE_PATH)) {
+        throw new IllegalArgumentException(BASE_PATH.key() + notExistsMsg);
+      }
+      // The lease must span at least ten heartbeat intervals.
+      if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < 
lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS)
+          * 10) {
+        throw new IllegalArgumentException(
+            VALIDITY_TIMEOUT_SECONDS.key() + " should be greater than or equal 
to 10x " + HEARTBEAT_POLL_SECONDS.key());
+      }
+      if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < 10) {
+        throw new IllegalArgumentException(
+            VALIDITY_TIMEOUT_SECONDS.key() + " should be greater than or equal 
to 10 seconds.");
+      }
+      if (lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS) < 1) {
+        throw new IllegalArgumentException(
+            HEARTBEAT_POLL_SECONDS.key() + " should be greater than or equal 
to 1 second.");
+      }
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
new file mode 100644
index 000000000000..4e95c33371f8
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
@@ -0,0 +1,669 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.StorageBasedLockConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.refEq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test class for StorageBasedLockProvider
+ */
+class TestStorageBasedLockProvider {
+  private StorageBasedLockProvider lockProvider;
+  private StorageLockClient mockLockService;
+  private HeartbeatManager mockHeartbeatManager;
+  private Logger mockLogger;
+  private final String ownerId = UUID.randomUUID().toString();
+  private static final int DEFAULT_LOCK_VALIDITY_MS = 10000;
+
+  /**
+   * Builds a spied {@link StorageBasedLockProvider} whose heartbeat manager, lock client and
+   * logger are all Mockito mocks, so each test can script their behaviour.
+   */
+  @BeforeEach
+  void setupLockProvider() {
+    // 10s validity with a 1s heartbeat poll, pointing at a GCS-style base path.
+    TypedProperties lockProps = new TypedProperties();
+    lockProps.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default");
+    lockProps.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
+    lockProps.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "10");
+
+    mockLogger = mock(Logger.class);
+    mockLockService = mock(StorageLockClient.class);
+    mockHeartbeatManager = mock(HeartbeatManager.class);
+    // Default: stopping the heartbeat succeeds; individual tests override this when needed.
+    when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+
+    // Both factory lambdas ignore their arguments and hand back the shared mocks.
+    StorageBasedLockProvider realProvider = new StorageBasedLockProvider(
+        ownerId,
+        lockProps,
+        (p1, p2, p3) -> mockHeartbeatManager,
+        (p1, p2, p3) -> mockLockService,
+        mockLogger);
+    lockProvider = spy(realProvider);
+  }
+
+  /**
+   * Closes the provider after every test, including tests that already called {@code close()} themselves.
+   */
+  @AfterEach
+  void cleanupLockProvider() {
+    lockProvider.close();
+  }
+
+  /**
+   * Despite the name, this asserts a failure: building the provider through the
+   * {@code (LockConfiguration, StorageConfiguration)} constructor with only an {@code s3://}
+   * base path configured must throw a {@link HoodieLockException} whose message mentions the
+   * failed StorageLock initialization.
+   */
+  @Test
+  void testValidLockStorageLocation() {
+    TypedProperties props = new TypedProperties();
+    props.put(BASE_PATH.key(), "s3://bucket/lake/db/tbl-default");
+
+    LockConfiguration lockConf = new LockConfiguration(props);
+    StorageConfiguration<?> storageConf = 
HoodieTestUtils.getDefaultStorageConf();
+
+    HoodieLockException ex = assertThrows(HoodieLockException.class,
+        () -> new StorageBasedLockProvider(lockConf, storageConf));
+    assertTrue(ex.getMessage().contains("Failed to load and initialize 
StorageLock"));
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = { "gs://bucket/lake/db/tbl-default", 
"s3://bucket/lake/db/tbl-default",
+      "s3a://bucket/lake/db/tbl-default" })
+  void testNonExistentWriteServiceWithDefaults(String tableBasePathString) {
+    TypedProperties props = new TypedProperties();
+    props.put(BASE_PATH.key(), tableBasePathString);
+
+    LockConfiguration lockConf = new LockConfiguration(props);
+    StorageConfiguration<?> storageConf = 
HoodieTestUtils.getDefaultStorageConf();
+
+    HoodieLockException ex = assertThrows(HoodieLockException.class,
+        () -> new StorageBasedLockProvider(lockConf, storageConf));
+    assertTrue(ex.getMessage().contains("Failed to load and initialize 
StorageLock"));
+  }
+
+  @Test
+  void testTryLockForTimeUnitThrowsOnInterrupt() throws Exception {
+    doReturn(false).when(lockProvider).tryLock();
+    CountDownLatch latch = new CountDownLatch(1);
+    Thread t = new Thread(() -> {
+      try {
+        lockProvider.tryLock(1, TimeUnit.SECONDS);
+      } catch (HoodieLockException e) {
+        latch.countDown();
+      }
+    });
+    t.start();
+    Thread.sleep(50);
+    t.interrupt();
+    assertTrue(latch.await(2, TimeUnit.SECONDS));
+  }
+
+  @Test
+  void testTryLockForTimeUnitAcquiresLockEventually() throws Exception {
+    AtomicInteger count = new AtomicInteger(0);
+    doAnswer(inv -> count.incrementAndGet() > 2).when(lockProvider).tryLock();
+    CountDownLatch latch = new CountDownLatch(1);
+    Thread t = new Thread(() -> {
+      assertTrue(lockProvider.tryLock(4, TimeUnit.SECONDS));
+      latch.countDown();
+    });
+    t.start();
+    assertTrue(latch.await(5, TimeUnit.SECONDS));
+  }
+
+  @Test
+  void testTryLockForTimeUnitFailsToAcquireLockEventually() throws Exception {
+    AtomicInteger count = new AtomicInteger(0);
+    doAnswer(inv -> count.incrementAndGet() > 2).when(lockProvider).tryLock();
+    CountDownLatch latch = new CountDownLatch(1);
+    Thread t = new Thread(() -> {
+      assertFalse(lockProvider.tryLock(1, TimeUnit.SECONDS));
+      latch.countDown();
+    });
+    t.start();
+    assertTrue(latch.await(2, TimeUnit.SECONDS));
+  }
+
+  @Test
+  void testTryLockSuccess() {
+    long t0 = 1_000L;
+    when(lockProvider.getCurrentEpochMs())
+        .thenReturn(t0);
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    StorageLockData data = new StorageLockData(false, t0 + 
DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryUpsertLockFile(refEq(data), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    boolean acquired = lockProvider.tryLock();
+    assertTrue(acquired);
+    assertEquals(realLockFile, lockProvider.getLock());
+    verify(mockLockService, atLeastOnce()).tryUpsertLockFile(any(), any());
+  }
+
+  @Test
+  void testTryLockSuccessButFailureToStartHeartbeat() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
+    
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(false);
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile))))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
+
+    boolean acquired = lockProvider.tryLock();
+    assertFalse(acquired);
+  }
+
+  @Test
+  void testTryLockFailsFromOwnerMismatch() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    StorageLockFile returnedLockFile = new StorageLockFile(
+        new StorageLockData(false, System.currentTimeMillis() + 
DEFAULT_LOCK_VALIDITY_MS, "different-owner"), "v1");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(returnedLockFile)));
+
+    HoodieLockException ex = assertThrows(HoodieLockException.class, () -> 
lockProvider.tryLock());
+    assertTrue(ex.getMessage().contains("Owners do not match"));
+  }
+
+  /**
+   * tryLock must return false when the lock file read back belongs to another owner and its
+   * validity timestamp is still in the future.
+   */
+  @Test
+  void testTryLockFailsDueToExistingLock() {
+    long validUntil = System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS;
+    StorageLockFile heldByOther = new StorageLockFile(new StorageLockData(false, validUntil, "other-owner"), "v2");
+    when(mockLockService.readCurrentLockFile())
+        .thenReturn(Pair.of(LockGetResult.SUCCESS, Option.of(heldByOther)));
+
+    assertFalse(lockProvider.tryLock());
+  }
+
+  /**
+   * tryLock must return false when no lock file exists but the upsert reports
+   * {@code ACQUIRED_BY_OTHERS}.
+   */
+  @Test
+  void testTryLockFailsToUpdateFile() {
+    when(mockLockService.readCurrentLockFile())
+        .thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty()));
+
+    boolean acquired = lockProvider.tryLock();
+    assertFalse(acquired);
+  }
+
+  /**
+   * tryLock must return false when reading the current lock file reports {@code UNKNOWN_ERROR}.
+   */
+  @Test
+  void testTryLockFailsDueToUnknownState() {
+    when(mockLockService.readCurrentLockFile())
+        .thenReturn(Pair.of(LockGetResult.UNKNOWN_ERROR, Option.empty()));
+
+    boolean acquired = lockProvider.tryLock();
+    assertFalse(acquired);
+  }
+
+  @Test
+  void testTryLockSucceedsWhenExistingLockExpiredByTime() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
+        "other-owner");
+    StorageLockFile existingLock = new StorageLockFile(data, "v2");
+    StorageLockData newData = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS,
+        ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(newData, "v1");
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS,
 Option.of(existingLock)));
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(existingLock))))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+    boolean acquired = lockProvider.tryLock();
+    assertTrue(acquired);
+  }
+
+  @Test
+  void testTryLockReentrancySucceeds() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    boolean acquired = lockProvider.tryLock();
+    assertTrue(acquired);
+    // Re-entrancy succeeds
+    assertTrue(lockProvider.tryLock());
+  }
+
+  @Test
+  void testTryLockReentrancyAfterLockExpiredByTime() {
+    // In an extremely unlikely scenario, we could have a local reference to a 
lock
+    // which is present but expired,
+    // and because we were unable to stop the heartbeat properly, we did not
+    // successfully set it to null.
+    // Due to the nature of the heartbeat manager, this is expected to 
introduce
+    // some delay, but not be permanently blocking.
+    // There are a few variations of this edge case, so we must test them all.
+
+    // Here the lock is still "unexpired" but the time shows expired.
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile expiredLock = new StorageLockFile(data, "v1");
+    doReturn(expiredLock).when(lockProvider).getLock();
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    StorageLockData validData = new StorageLockData(false, 
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
+        ownerId);
+    StorageLockFile validLock = new StorageLockFile(validData, "v2");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(validLock)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    assertTrue(lockProvider.tryLock());
+  }
+
+  @Test
+  void testTryLockReentrancyAfterLockSetExpired() {
+    // In an extremely unlikely scenario, we could have a local reference to a 
lock
+    // which is present but expired,
+    // and because we were unable to stop the heartbeat properly, we did not
+    // successfully set it to null.
+    // Due to the nature of the heartbeat manager, this is expected to 
introduce
+    // some delay, but not be permanently blocking.
+    // There are a few variations of this edge case, so we must test them all.
+
+    // Here the lock is "expired" but the time shows unexpired.
+    StorageLockData data = new StorageLockData(true, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile expiredLock = new StorageLockFile(data, "v1");
+    doReturn(expiredLock).when(lockProvider).getLock();
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    StorageLockData validData = new StorageLockData(false, 
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
+        ownerId);
+    StorageLockFile validLock = new StorageLockFile(validData, "v2");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(validLock)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    assertTrue(lockProvider.tryLock());
+  }
+
+  @Test
+  void testTryLockHeartbeatStillActive() {
+    // In an extremely unlikely scenario, we could have a local reference to a 
lock
+    // which is present but expired,
+    // and because we were unable to stop the heartbeat properly, we did not
+    // successfully set it to null.
+    // Due to the nature of the heartbeat manager, this is expected to 
introduce
+    // some delay, but not be permanently blocking.
+    // There are a few variations of this edge case, so we must test them all.
+
+    // Here the heartbeat is still active, so we have to error out.
+    StorageLockData data = new StorageLockData(true, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile expiredLock = new StorageLockFile(data, "v1");
+    doReturn(expiredLock).when(lockProvider).getLock();
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+    assertThrows(HoodieLockException.class, () -> lockProvider.tryLock());
+  }
+
+  @Test
+  void testUnlockSucceedsAndReentrancy() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+    when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile))))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
+            Option.of(new StorageLockFile(new StorageLockData(true, 
data.getValidUntil(), ownerId), "v2"))));
+    assertTrue(lockProvider.tryLock());
+    when(mockHeartbeatManager.hasActiveHeartbeat())
+        .thenReturn(true) // when we try to stop the heartbeat we will check 
if heartbeat is active return true.
+        .thenReturn(false); // when try to set lock to expire we will assert 
no active heartbeat as a precondition.
+    lockProvider.unlock();
+    assertNull(lockProvider.getLock());
+    lockProvider.unlock();
+  }
+
+  @Test
+  void testUnlockFailsToStopHeartbeat() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+    assertTrue(lockProvider.tryLock());
+    when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(false);
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+    assertThrows(HoodieLockException.class, () -> lockProvider.unlock());
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+  }
+
+  @Test
+  void testCloseFailsToStopHeartbeat() {
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+    assertTrue(lockProvider.tryLock());
+    when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(false);
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+    // Should wrap the exception and log error.
+    lockProvider.close();
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+  }
+
+  /**
+   * renewLock must return false and log a warning when no lock is held locally, even while the
+   * heartbeat manager reports an active heartbeat.
+   */
+  @Test
+  void testRenewLockReturnsFalseWhenNoLockHeld() {
+    // Arrange before acting: stubbing after renewLock() has run cannot influence the call under test.
+    doReturn(null).when(lockProvider).getLock();
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+
+    assertFalse(lockProvider.renewLock());
+
+    verify(mockLogger).warn("Owner {}: Cannot renew, no lock held by this process", this.ownerId);
+  }
+
+  /**
+   * renewLock must return false and log a warning when no lock is held locally and the
+   * heartbeat manager reports no active heartbeat.
+   */
+  @Test
+  void testRenewLockWithoutHoldingLock() {
+    // Arrange before acting: stubbing after renewLock() has run cannot influence the call under test.
+    doReturn(null).when(lockProvider).getLock();
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+
+    assertFalse(lockProvider.renewLock());
+
+    verify(mockLogger).warn("Owner {}: Cannot renew, no lock held by this process", this.ownerId);
+  }
+
+  /**
+   * renewLock must return false and log an error when the locally held lock is already past
+   * its validity timestamp and the upsert reports {@code ACQUIRED_BY_OTHERS}.
+   */
+  @Test
+  void testRenewLockWithFullyExpiredLock() {
+    StorageLockData expiredData = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile expiredLockFile = new StorageLockFile(expiredData, "v1");
+    doReturn(expiredLockFile).when(lockProvider).getLock();
+    // Use Option.empty() rather than a raw null so the stub matches the declared
+    // Pair<LockUpsertResult, Option<StorageLockFile>> contract used by the other tests.
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(expiredLockFile))))
+        .thenReturn(Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty()));
+
+    assertFalse(lockProvider.renewLock());
+
+    verify(mockLogger).error("Owner {}: Unable to renew lock as it is acquired by others.", this.ownerId);
+  }
+
+  @Test
+  void testRenewLockUnableToUpsertLockFileButNotFatal() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    doReturn(lockFile).when(lockProvider).getLock();
+    // Signal the upsert attempt failed, but may be transient. See interface 
for
+    // more details.
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+        .thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()));
+    assertTrue(lockProvider.renewLock());
+  }
+
+  /**
+   * Verifies that an {@code UNKNOWN_ERROR} upsert result during renewal makes renewLock return
+   * true and emits the "could be transient" warning.
+   *
+   * <p>NOTE(review): despite "Fatal" in the name, the assertions here exercise the non-fatal
+   * path; the only difference from testRenewLockUnableToUpsertLockFileButNotFatal is the
+   * additional logger verification.
+   */
+  @Test
+  void testRenewLockUnableToUpsertLockFileFatal() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    doReturn(lockFile).when(lockProvider).getLock();
+    // The upsert reports UNKNOWN_ERROR and returns no lock file.
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+        .thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()));
+    // renewLock returns true here, presumably so that the renewal is attempted again.
+    assertTrue(lockProvider.renewLock());
+
+    verify(mockLogger).warn("Owner {}: Unable to renew lock due to unknown 
error, could be transient.", this.ownerId);
+  }
+
+  @Test
+  void testRenewLockSucceedsButRenewalWithinExpirationWindow() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    doReturn(lockFile).when(lockProvider).getLock();
+
+    StorageLockData nearExpirationData = new StorageLockData(false, 
System.currentTimeMillis(), ownerId);
+    StorageLockFile lockFileNearExpiration = new 
StorageLockFile(nearExpirationData, "v2");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(lockFileNearExpiration)));
+
+    // We used to fail in this case before, but since we are only modifying a 
single
+    // lock file, this is ok now.
+    // Therefore, this can be a happy path variation.
+    assertTrue(lockProvider.renewLock());
+  }
+
+  @Test
+  void testRenewLockSucceeds() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    doReturn(lockFile).when(lockProvider).getLock();
+
+    StorageLockData successData = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS,
+        ownerId);
+    StorageLockFile successLockFile = new StorageLockFile(successData, "v2");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(successLockFile)));
+    assertTrue(lockProvider.renewLock());
+
+    verify(mockLogger).info(
+        eq("Owner {}: Lock renewal successful. The renewal completes {} ms 
before expiration for lock {}."),
+        eq(this.ownerId), anyLong(), 
eq("gs://bucket/lake/db/tbl-default/.hoodie/.locks/table_lock.json"));
+  }
+
+  @Test
+  void testRenewLockFails() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    doReturn(lockFile).when(lockProvider).getLock();
+
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+        .thenThrow(new RuntimeException("Failure"));
+    assertFalse(lockProvider.renewLock());
+
+    verify(mockLogger).error(eq("Owner {}: Exception occurred while renewing 
lock"), eq(ownerId),
+        any(RuntimeException.class));
+  }
+
+  /**
+   * close() must close both the lock service and the heartbeat manager and leave no lock held.
+   */
+  @Test
+  void testCloseCallsDependencies() throws Exception {
+    lockProvider.close();
+    // atLeastOnce() is used rather than an exact count.
+    verify(mockLockService, atLeastOnce()).close();
+    verify(mockHeartbeatManager, atLeastOnce()).close();
+    assertNull(lockProvider.getLock());
+  }
+
+  @Test
+  void testCloseWithErrorForLockService() throws Exception {
+    doThrow(new RuntimeException("Some 
failure")).when(mockLockService).close();
+    lockProvider.close();
+    verify(mockLogger).error(eq("Owner {}: Lock service failed to close."), 
eq(ownerId), any(RuntimeException.class));
+    assertNull(lockProvider.getLock());
+  }
+
+  @Test
+  void testCloseWithErrorForHeartbeatManager() throws Exception {
+    doThrow(new RuntimeException("Some 
failure")).when(mockHeartbeatManager).close();
+    lockProvider.close();
+    verify(mockLogger).error(eq("Owner {}: Heartbeat manager failed to 
close."), eq(ownerId),
+        any(RuntimeException.class));
+    assertNull(lockProvider.getLock());
+  }
+
+  /**
+   * Acquires the lock, then calls {@code shutdown(true)} directly and checks that the lock is
+   * released while the lock service and heartbeat manager are left unclosed.
+   */
+  @Test
+  public void testShutdownHookViaReflection() throws Exception {
+    // Arrange: no lock file exists yet and the first upsert succeeds for this owner.
+    when(mockLockService.readCurrentLockFile())
+        .thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
+    long validUntil = System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS;
+    StorageLockFile realLockFile = new StorageLockFile(new StorageLockData(false, validUntil, ownerId), "v1");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    // Acquire the lock and confirm the provider now holds the returned lock file.
+    assertTrue(lockProvider.tryLock());
+    assertEquals(realLockFile, lockProvider.getLock());
+    verify(mockLockService, atLeastOnce()).tryUpsertLockFile(any(), any());
+
+    // Any later upsert against the held lock file also succeeds.
+    when(mockLockService.tryUpsertLockFile(any(StorageLockData.class), eq(Option.of(realLockFile))))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile)));
+
+    // Act: invoke the shutdown-hook entry point directly.
+    lockProvider.shutdown(true);
+
+    // Assert: the lock is gone, but neither dependency was closed.
+    assertNull(lockProvider.getLock());
+    verify(mockLockService, never()).close();
+    verify(mockHeartbeatManager, never()).close();
+  }
+
+  /**
+   * Calling {@code shutdown(true)} when this test never acquired a lock must not close the
+   * lock service or the heartbeat manager.
+   */
+  @Test
+  public void testShutdownHookWhenNoLockPresent() throws Exception {
+    // No tryLock() call precedes this, so there is no lock for shutdown to release.
+    lockProvider.shutdown(true);
+
+    // Neither dependency may be closed on the shutdown-hook path.
+    verify(mockLockService, never()).close();
+    verify(mockHeartbeatManager, never()).close();
+  }
+
+  @Test
+  public void testShutdownHookFailsToBeRemoved() throws Exception {
+    doThrow(new IllegalStateException("Shutdown already in 
progress")).when(lockProvider).tryRemoveShutdownHook();
+    lockProvider.close();
+    verify(mockLockService, atLeastOnce()).close();
+    verify(mockHeartbeatManager, atLeastOnce()).close();
+    assertNull(lockProvider.getLock());
+  }
+
+  @Test
+  void testShutdownHookFiresDuringTryLockWithTimeout() throws Exception {
+    // This test simulates the scenario where the shutdown hook fires while 
tryLock(long time, TimeUnit unit) 
+    // is in progress, and expects that HoodieLockException is thrown when 
tryLock is called after shutdown.
+
+    // Setup mocks to simulate lock being held by another owner (to keep 
tryLock looping)
+    StorageLockData otherOwnerData = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, "other-owner");
+    StorageLockFile otherOwnerLock = new StorageLockFile(otherOwnerData, "v1");
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS,
 Option.of(otherOwnerLock)));
+
+    CountDownLatch tryLockStarted = new CountDownLatch(1);
+    CountDownLatch proceedWithShutdown = new CountDownLatch(1);
+    CountDownLatch shutdownCompleted = new CountDownLatch(1);
+    CountDownLatch tryLockCompleted = new CountDownLatch(1);
+    CountDownLatch exceptionThrown = new CountDownLatch(1);
+
+    // Spy on the real tryLock to know when it's been called and coordinate 
with shutdown
+    AtomicInteger tryLockCallCount = new AtomicInteger(0);
+    doAnswer(inv -> {
+      int count = tryLockCallCount.incrementAndGet();
+      if (count == 1) {
+        // First call - signal that tryLock has started
+        tryLockStarted.countDown();
+        // Wait for shutdown to be triggered
+        assertTrue(proceedWithShutdown.await(2, TimeUnit.SECONDS));
+      } else {
+        // Subsequent calls - wait briefly for shutdown to complete
+        assertTrue(shutdownCompleted.await(100, TimeUnit.MILLISECONDS));
+      }
+      // Call the real method
+      return inv.callRealMethod();
+    }).when(lockProvider).tryLock();
+
+    // Start a thread that will call tryLock with timeout
+    Thread tryLockThread = new Thread(() -> {
+      try {
+        lockProvider.tryLock(2, TimeUnit.SECONDS);
+        // Should not reach here - exception should be thrown after shutdown
+        fail("Should have thrown HoodieLockException after shutdown");
+      } catch (HoodieLockException e) {
+        // Expected - tryLock should throw exception after shutdown
+        exceptionThrown.countDown();
+      } finally {
+        tryLockCompleted.countDown();
+      }
+    });
+
+    tryLockThread.start();
+
+    // Wait for tryLock to start
+    assertTrue(tryLockStarted.await(2, TimeUnit.SECONDS), "tryLock should have 
started");
+
+    // Now invoke the shutdown hook while tryLock is in progress
+    // Invoke shutdown in a separate thread to simulate shutdown hook
+    Thread shutdownThread = new Thread(() -> {
+      proceedWithShutdown.countDown();  // Signal tryLock to proceed
+      lockProvider.shutdown(true);
+      shutdownCompleted.countDown();  // Signal that shutdown is complete
+    });
+    shutdownThread.start();
+
+    // Wait for both operations to complete
+    assertTrue(tryLockCompleted.await(5, TimeUnit.SECONDS), "tryLock should 
complete");
+    assertTrue(exceptionThrown.await(1, TimeUnit.SECONDS), 
"HoodieLockException should have been thrown");
+    shutdownThread.join(2000);
+
+    // Verify the state after shutdown
+    // The lock should be null after shutdown
+    assertNull(lockProvider.getLock(), "Lock should be null after shutdown 
hook fires");
+
+    // Verify that tryLock was called at least once
+    verify(lockProvider, atLeastOnce()).tryLock();
+  }
+
+  /**
+   * Minimal {@link StorageLockClient} stub.
+   *
+   * <p>The constructor only asserts that the lock file URI ends with {@code table_lock.json};
+   * both lock operations return {@code null} and {@code close()} does nothing.
+   */
+  public static class StubStorageLockClient implements StorageLockClient {
+    public StubStorageLockClient(String ownerId, String lockFileUri, 
Properties props) {
+      // ownerId and props are accepted but unused.
+      assertTrue(lockFileUri.endsWith("table_lock.json"));
+    }
+
+    @Override
+    public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile(
+        StorageLockData newLockData,
+        Option<StorageLockFile> previousLockFile) {
+      // stub: no result is produced
+      return null;
+    }
+
+    @Override
+    public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() {
+      // stub: no result is produced
+      return null;
+    }
+
+    @Override
+    public void close() throws Exception {
+      // stub, no-op
+    }
+  }
+
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
index cdeca536b3ae..234aebe70b76 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
@@ -156,13 +156,13 @@ public class TestLockProviderHeartbeatManager {
     when(semaphore.tryAcquire()).thenReturn(false);
     when(semaphore.tryAcquire(eq(DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS), eq(TimeUnit.MILLISECONDS))).thenReturn(true);
     manager = new LockProviderHeartbeatManager(
-            LOGGER_ID,
-            mockScheduler,
-            100L,
-            DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS,
-            () -> true,
-            semaphore,
-            mockLogger);
+        LOGGER_ID,
+        mockScheduler,
+        100L,
+        DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS,
+        () -> true,
+        semaphore,
+        mockLogger);
     assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
     t.get().start();
     assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java
new file mode 100644
index 000000000000..4855b6505877
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestStorageBasedLockConfig {
+
+  @Test
+  void testDefaultValues() {
+    // Asserts that the defaults are correct
+    TypedProperties props = new TypedProperties();
+    props.setProperty(BASE_PATH.key(), "s3://hudi-bucket/table/basepath");
+
+    StorageBasedLockConfig.Builder builder = new StorageBasedLockConfig.Builder();
+    StorageBasedLockConfig config = builder
+        .fromProperties(props)
+        .build();
+
+    assertEquals(5 * 60, config.getValiditySeconds(), "Default lock validity should be 5 minutes");
+    assertEquals(30, config.getHeartbeatPollSeconds(), "Default heartbeat poll time should be 30 seconds");
+  }
+
+  @Test
+  void testCustomValues() {
+    // Testing that custom values which differ from defaults can be read properly
+    TypedProperties props = new TypedProperties();
+    props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "120");
+    props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "10");
+    props.setProperty(BASE_PATH.key(), "/hudi/table/basepath");
+
+    StorageBasedLockConfig config = new StorageBasedLockConfig.Builder()
+        .fromProperties(props)
+        .build();
+
+    assertEquals(120, config.getValiditySeconds());
+    assertEquals(10, config.getHeartbeatPollSeconds());
+    assertEquals("/hudi/table/basepath", config.getHudiTableBasePath());
+  }
+
+  @Test
+  void testBasePathPropertiesValidation() {
+    // Tests that validations around the base path are present.
+    TypedProperties props = new TypedProperties();
+    props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "120");
+    StorageBasedLockConfig.Builder propsBuilder = new StorageBasedLockConfig.Builder();
+
+    // Missing base path
+    IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+        () -> propsBuilder.fromProperties(props));
+    assertTrue(exception.getMessage().contains(BASE_PATH.key()));
+  }
+
+  @Test
+  void testTimeThresholds() {
+    // Ensure that validations which restrict the time-based inputs are working.
+    TypedProperties props = new TypedProperties();
+    props.setProperty(BASE_PATH.key(), "/hudi/table/basepath");
+    // Invalid config case: validity timeout is less than 10x of heartbeat poll period
+    props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "5");
+    props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "3");
+    StorageBasedLockConfig.Builder propsBuilder = new StorageBasedLockConfig.Builder();
+
+    IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+        () -> propsBuilder.fromProperties(props));
+    assertTrue(exception.getMessage().contains(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key()));
+    // Invalid config case: validity timeout is less than 10 seconds
+    props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "9");
+    props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
+    exception = assertThrows(IllegalArgumentException.class, () -> propsBuilder.fromProperties(props));
+    assertTrue(exception.getMessage().contains(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key()));
+    // Invalid config case: heartbeat poll period is less than 1 second
+    props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "10");
+    props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "0");
+    exception = assertThrows(IllegalArgumentException.class, () -> propsBuilder.fromProperties(props));
+    assertTrue(exception.getMessage().contains(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key()));
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 2c3e2b515bbb..3102e5e1e53f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -97,6 +97,8 @@ public class HoodieTableMetaClient implements Serializable {
   public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH =
       BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + StoragePath.SEPARATOR + ".fileids";
 
+  public static final String LOCKS_FOLDER_NAME = METAFOLDER_NAME + StoragePath.SEPARATOR + ".locks";
+
   public static final String SCHEMA_FOLDER_NAME = ".schema";
 
   public static final String MARKER_EXTN = ".marker";
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
index 1b1d32e4ac37..aa4932c0cc6c 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
@@ -84,6 +84,15 @@ public class TestStorageSchemes {
     assertThrows(IllegalArgumentException.class, () -> {
       StorageSchemes.isAppendSupported("s2");
     }, "Should throw exception for unsupported schemes");
+
+    for (StorageSchemes scheme : StorageSchemes.values()) {
+      String schemeName = scheme.getScheme();
+      if (scheme.getScheme().startsWith("s3") || scheme.getScheme().startsWith("gs")) {
+        assertTrue(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent());
+      } else {
+        assertFalse(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent());
+      }
+    }
   }
 
   @Test
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
index 129956166b3a..68c0a068188e 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
@@ -21,68 +21,71 @@ package org.apache.hudi.storage;
 
 import java.util.Arrays;
 
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+
 /**
  * All the supported storage schemes in Hoodie.
  */
 public enum StorageSchemes {
   // Local filesystem
-  FILE("file", false, false, true, true),
+  FILE("file", false, false, true, true, null),
   // Hadoop File System
-  HDFS("hdfs", true, false, true, false),
+  HDFS("hdfs", true, false, true, false, null),
   // Baidu Advanced File System
-  AFS("afs", true, null, null, null),
+  AFS("afs", true, null, null, null, null),
   // Mapr File System
-  MAPRFS("maprfs", true, null, null, null),
+  MAPRFS("maprfs", true, null, null, null, null),
   // Apache Ignite FS
-  IGNITE("igfs", true, null, null, null),
+  IGNITE("igfs", true, null, null, null, null),
   // AWS S3
-  S3A("s3a", false, true, null, true),
-  S3("s3", false, true, null, true),
+  S3A("s3a", false, true, null, true, "org.apache.hudi.aws.transaction.lock.S3StorageLockClient"),
+  S3("s3", false, true, null, true, "org.apache.hudi.aws.transaction.lock.S3StorageLockClient"),
   // Google Cloud Storage
-  GCS("gs", false, true, null, true),
+  GCS("gs", false, true, null, true, "org.apache.hudi.gcp.transaction.lock.GCSStorageLockClient"),
   // Azure WASB
-  WASB("wasb", false, null, null, null), WASBS("wasbs", false, null, null, null),
+  WASB("wasb", false, null, null, null, null), WASBS("wasbs", false, null, null, null, null),
   // Azure ADLS
-  ADL("adl", false, null, null, null),
+  ADL("adl", false, null, null, null, null),
   // Azure ADLS Gen2
-  ABFS("abfs", false, null, null, null), ABFSS("abfss", false, null, null, null),
+  ABFS("abfs", false, null, null, null, null), ABFSS("abfss", false, null, null, null, null),
   // Aliyun OSS
-  OSS("oss", false, null, null, null),
+  OSS("oss", false, null, null, null, null),
   // View FS for federated setups. If federating across cloud stores, then append support is false
   // View FS support atomic creation
-  VIEWFS("viewfs", true, null, true, null),
+  VIEWFS("viewfs", true, null, true, null, null),
   //ALLUXIO
-  ALLUXIO("alluxio", false, null, null, null),
+  ALLUXIO("alluxio", false, null, null, null, null),
   // Tencent Cloud Object Storage
-  COSN("cosn", false, null, null, null),
+  COSN("cosn", false, null, null, null, null),
   // Tencent Cloud HDFS
-  CHDFS("ofs", true, null, null, null),
+  CHDFS("ofs", true, null, null, null, null),
   // Tencent Cloud CacheFileSystem
-  GOOSEFS("gfs", false, null, null, null),
+  GOOSEFS("gfs", false, null, null, null, null),
   // Databricks file system
-  DBFS("dbfs", false, null, null, null),
+  DBFS("dbfs", false, null, null, null, null),
   // IBM Cloud Object Storage
-  COS("cos", false, null, null, null),
+  COS("cos", false, null, null, null, null),
   // Huawei Cloud Object Storage
-  OBS("obs", false, null, null, null),
+  OBS("obs", false, null, null, null, null),
   // Kingsoft Standard Storage ks3
-  KS3("ks3", false, null, null, null),
+  KS3("ks3", false, null, null, null, null),
   // Netease Object Storage nos
-  NOS("nos", false, null, null, null),
+  NOS("nos", false, null, null, null, null),
   // JuiceFileSystem
-  JFS("jfs", true, null, null, null),
+  JFS("jfs", true, null, null, null, null),
   // Baidu Object Storage
-  BOS("bos", false, null, null, null),
+  BOS("bos", false, null, null, null, null),
   // Oracle Cloud Infrastructure Object Storage
-  OCI("oci", false, null, null, null),
+  OCI("oci", false, null, null, null, null),
   // Volcengine Object Storage
-  TOS("tos", false, null, null, null),
+  TOS("tos", false, null, null, null, null),
   // Volcengine Cloud HDFS
-  CFS("cfs", true, null, null, null),
+  CFS("cfs", true, null, null, null, null),
   // Aliyun Apsara File Storage for HDFS
-  DFS("dfs", true, false, true, null),
+  DFS("dfs", true, false, true, null, null),
   // Hopsworks File System
-  HOPSFS("hopsfs", false, false, true, null);
+  HOPSFS("hopsfs", false, false, true, null, null);
 
   private String scheme;
   private boolean supportsAppend;
@@ -94,13 +97,15 @@ public enum StorageSchemes {
   // when we want to get only part of files under a directory rather than all files, use getStatus may be more friendly than listStatus.
   // here is a trade-off between rpc times and throughput of storage meta service
   private Boolean listStatusFriendly;
+  private String storageLockClass;
 
-  StorageSchemes(String scheme, boolean supportsAppend, Boolean isWriteTransactional, Boolean supportAtomicCreation, Boolean listStatusFriendly) {
+  StorageSchemes(String scheme, boolean supportsAppend, Boolean isWriteTransactional, Boolean supportAtomicCreation, Boolean listStatusFriendly, String storageLockClass) {
     this.scheme = scheme;
     this.supportsAppend = supportsAppend;
     this.isWriteTransactional = isWriteTransactional;
     this.supportAtomicCreation = supportAtomicCreation;
     this.listStatusFriendly = listStatusFriendly;
+    this.storageLockClass = storageLockClass;
   }
 
   public String getScheme() {
@@ -123,6 +128,14 @@ public enum StorageSchemes {
     return listStatusFriendly != null && listStatusFriendly;
   }
 
+  public boolean implementsStorageLock() {
+    return !StringUtils.isNullOrEmpty(storageLockClass);
+  }
+
+  public String getStorageLockClass() {
+    return storageLockClass;
+  }
+
   public static boolean isSchemeSupported(String scheme) {
     return Arrays.stream(values()).anyMatch(s -> s.getScheme().equals(scheme));
   }
@@ -155,4 +168,12 @@ public enum StorageSchemes {
     }
     return Arrays.stream(StorageSchemes.values()).anyMatch(s -> s.getListStatusFriendly() && s.scheme.equals(scheme));
   }
+
+  public static Option<StorageSchemes> getStorageLockImplementationIfExists(String scheme) {
+    if (!isSchemeSupported(scheme)) {
+      throw new IllegalArgumentException("Unsupported scheme :" + scheme);
+    }
+    return Option.fromJavaOptional(Arrays.stream(StorageSchemes.values())
+        .filter(s -> s.implementsStorageLock() && s.scheme.equals(scheme)).findFirst());
+  }
 }

Reply via email to