This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 949017eda3d1 [HUDI-9158] Add storage based lock provider abstract
implementation to 0.x branch (#13712)
949017eda3d1 is described below
commit 949017eda3d158833088a98736eb1115fa9a060c
Author: Alex R <[email protected]>
AuthorDate: Tue Aug 12 15:14:09 2025 -0700
[HUDI-9158] Add storage based lock provider abstract implementation to 0.x
branch (#13712)
---
.../transaction/lock/StorageBasedLockProvider.java | 545 +++++++++++++++++
.../lock/models/LockProviderHeartbeatManager.java | 2 +-
.../apache/hudi/config/StorageBasedLockConfig.java | 100 +++
.../lock/TestStorageBasedLockProvider.java | 669 +++++++++++++++++++++
.../models/TestLockProviderHeartbeatManager.java | 14 +-
.../hudi/config/TestStorageBasedLockConfig.java | 101 ++++
.../hudi/common/table/HoodieTableMetaClient.java | 2 +
.../apache/hudi/common/fs/TestStorageSchemes.java | 9 +
.../org/apache/hudi/storage/StorageSchemes.java | 81 ++-
9 files changed, 1485 insertions(+), 38 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
new file mode 100644
index 000000000000..00f9bcd74742
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import
org.apache.hudi.client.transaction.lock.models.LockProviderHeartbeatManager;
+import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.StorageBasedLockConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StorageSchemes;
+
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.net.URI;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
+import static org.apache.hudi.common.lock.LockState.ACQUIRED;
+import static org.apache.hudi.common.lock.LockState.ACQUIRING;
+import static org.apache.hudi.common.lock.LockState.FAILED_TO_ACQUIRE;
+import static org.apache.hudi.common.lock.LockState.FAILED_TO_RELEASE;
+import static org.apache.hudi.common.lock.LockState.RELEASED;
+import static org.apache.hudi.common.lock.LockState.RELEASING;
+import static
org.apache.hudi.common.table.HoodieTableMetaClient.LOCKS_FOLDER_NAME;
+
+/**
+ * A distributed filesystem storage based lock provider. This {@link
LockProvider} implementation
+ * leverages conditional writes to ensure transactional consistency for
multi-writer scenarios.
+ * The underlying storage client interface {@link StorageLockClient} is
pluggable so it can be implemented for any
+ * filesystem which supports conditional writes.
+ */
+@ThreadSafe
+public class StorageBasedLockProvider implements LockProvider<StorageLockFile>
{
+
+ public static final String DEFAULT_TABLE_LOCK_FILE_NAME = "table_lock.json";
+ // How long to wait before retrying lock acquisition in blocking calls.
+ // Maximum expected clock drift between two nodes.
+ // This is similar idea as SkewAdjustingTimeGenerator.
+ // In reality, within a single cloud provider all nodes share the same ntp
+ // server
+ // therefore we do not expect drift more than a few ms.
+ // However, since our lock leases are pretty long, we can use a high buffer.
+ private static final long CLOCK_DRIFT_BUFFER_MS = 500;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StorageBasedLockProvider.class);
+
+ // Use for testing
+ private final Logger logger;
+
+ // The lock service implementation which interacts with storage
+ private final StorageLockClient storageLockClient;
+
+ private final long lockValiditySecs;
+ private final String ownerId;
+ private final String lockFilePath;
+ private final HeartbeatManager heartbeatManager;
+ private final transient Thread shutdownThread;
+
+ @GuardedBy("this")
+ private StorageLockFile currentLockObj = null;
+ @GuardedBy("this")
+ private boolean isClosed = false;
+
+ private synchronized void setLock(StorageLockFile lockObj) {
+ if (lockObj != null && !Objects.equals(lockObj.getOwner(), this.ownerId)) {
+ throw new HoodieLockException("Owners do not match. Current lock owner:
" + this.ownerId + " lock path: "
+ + this.lockFilePath + " owner: " + lockObj.getOwner());
+ }
+ this.currentLockObj = lockObj;
+ }
+
+ /**
+ * Default constructor for StorageBasedLockProvider, required by LockManager
+ * to instantiate it using reflection.
+ *
+ * @param lockConfiguration The lock configuration, should be transformable
into
+ * StorageBasedLockConfig
+ * @param conf Storage config, ignored.
+ */
+ public StorageBasedLockProvider(final LockConfiguration lockConfiguration,
final StorageConfiguration<?> conf) {
+ this(
+ UUID.randomUUID().toString(),
+ lockConfiguration.getConfig(),
+ LockProviderHeartbeatManager::new,
+ getStorageLockClientClassName(),
+ LOGGER);
+ }
+
+ private static Functions.Function3<String, String, TypedProperties,
StorageLockClient> getStorageLockClientClassName() {
+ return (ownerId, lockFilePath, lockConfig) -> {
+ try {
+ return (StorageLockClient) ReflectionUtils.loadClass(
+ getLockServiceClassName(new URI(lockFilePath).getScheme()),
+ new Class<?>[]{String.class, String.class, Properties.class},
+ new Object[]{ownerId, lockFilePath, lockConfig});
+ } catch (Throwable e) {
+ throw new HoodieLockException("Failed to load and initialize
StorageLock", e);
+ }
+ };
+ }
+
+ private static @NotNull String getLockServiceClassName(String scheme) {
+ Option<StorageSchemes> schemeOptional =
StorageSchemes.getStorageLockImplementationIfExists(scheme);
+ if (schemeOptional.isPresent()) {
+ return schemeOptional.get().getStorageLockClass();
+ } else {
+ throw new HoodieNotSupportedException("No implementation of StorageLock
supports this scheme: " + scheme);
+ }
+ }
+
+ @VisibleForTesting
+ StorageBasedLockProvider(
+ String ownerId,
+ TypedProperties properties,
+ Functions.Function3<String, Long, Supplier<Boolean>, HeartbeatManager>
heartbeatManagerLoader,
+ Functions.Function3<String, String, TypedProperties, StorageLockClient>
storageLockClientLoader,
+ Logger logger) {
+ StorageBasedLockConfig config = new
StorageBasedLockConfig.Builder().fromProperties(properties).build();
+ long heartbeatPollSeconds = config.getHeartbeatPollSeconds();
+ this.lockValiditySecs = config.getValiditySeconds();
+ this.lockFilePath = String.format(
+ "%s%s%s%s%s",
+ config.getHudiTableBasePath(),
+ StoragePath.SEPARATOR,
+ LOCKS_FOLDER_NAME,
+ StoragePath.SEPARATOR,
+ DEFAULT_TABLE_LOCK_FILE_NAME);
+ this.heartbeatManager = heartbeatManagerLoader.apply(ownerId,
TimeUnit.SECONDS.toMillis(heartbeatPollSeconds), this::renewLock);
+ this.storageLockClient = storageLockClientLoader.apply(ownerId,
lockFilePath, properties);
+ this.ownerId = ownerId;
+ this.logger = logger;
+ shutdownThread = new Thread(() -> shutdown(true));
+ Runtime.getRuntime().addShutdownHook(shutdownThread);
+ logger.info("Instantiated new storage-based lock provider, owner: {},
lockfilePath: {}", ownerId, lockFilePath);
+ }
+
+ // -----------------------------------------
+ // BASE METHODS
+ // -----------------------------------------
+
+ @Override
+ public synchronized StorageLockFile getLock() {
+ return currentLockObj;
+ }
+
+ /**
+ * Attempts to acquire the lock within the given timeout.
+ */
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ long deadlineNanos = System.nanoTime() + unit.toNanos(time);
+
+ while (System.nanoTime() < deadlineNanos) {
+ try {
+ logDebugLockState(ACQUIRING);
+ if (tryLock()) {
+ return true;
+ }
+
Thread.sleep(Long.parseLong(DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new
HoodieLockException(generateLockStateMessage(LockState.FAILED_TO_ACQUIRE), e);
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public synchronized void close() {
+ shutdown(false);
+ }
+
+ @VisibleForTesting
+ synchronized void shutdown(boolean fromShutdownHook) {
+ if (fromShutdownHook) {
+ // Try to expire the lock from the shutdown hook.
+ if (!isClosed && actuallyHoldsLock()) {
+ tryExpireCurrentLock(true);
+ }
+ // Do not execute any further actions, mark closed
+ this.isClosed = true;
+ return;
+ } else {
+ try {
+ tryRemoveShutdownHook();
+ } catch (IllegalStateException e) {
+ logger.warn("Owner {}: Failed to remove shutdown hook, JVM is already
shutting down.", ownerId, e);
+ }
+ }
+ try {
+ this.unlock();
+ } catch (Exception e) {
+ logger.error("Owner {}: Failed to unlock current lock.", ownerId, e);
+ }
+ try {
+ this.storageLockClient.close();
+ } catch (Exception e) {
+ logger.error("Owner {}: Lock service failed to close.", ownerId, e);
+ }
+ try {
+ this.heartbeatManager.close();
+ } catch (Exception e) {
+ logger.error("Owner {}: Heartbeat manager failed to close.", ownerId, e);
+ }
+
+ this.isClosed = true;
+ }
+
+ @VisibleForTesting
+ void tryRemoveShutdownHook() {
+ Runtime.getRuntime().removeShutdownHook(shutdownThread);
+ }
+
+ private synchronized boolean isLockStillValid(StorageLockFile lock) {
+ return !lock.isExpired() &&
!isCurrentTimeCertainlyOlderThanDistributedTime(lock.getValidUntilMs());
+ }
+
+ /**
+ * Attempts a single pass to acquire the lock (non-blocking).
+ *
+ * @return true if lock acquired, false otherwise
+ */
+ @Override
+ public synchronized boolean tryLock() {
+ assertHeartbeatManagerExists();
+ assertUnclosed();
+ logDebugLockState(ACQUIRING);
+ if (actuallyHoldsLock()) {
+ // Supports reentrant locks
+ return true;
+ }
+
+ if (this.heartbeatManager.hasActiveHeartbeat()) {
+ logger.error("Detected broken invariant: there is an active heartbeat
without a lock being held.");
+ // Breach of object invariant - we should never have an active heartbeat
without
+ // holding a lock.
+ throw new
HoodieLockException(generateLockStateMessage(FAILED_TO_ACQUIRE));
+ }
+
+ Pair<LockGetResult, Option<StorageLockFile>> latestLock =
this.storageLockClient.readCurrentLockFile();
+ if (latestLock.getLeft() == LockGetResult.UNKNOWN_ERROR) {
+ logInfoLockState(FAILED_TO_ACQUIRE, "Failed to get the latest lock
status");
+ // We were not able to determine whether a lock was present.
+ return false;
+ }
+
+ if (latestLock.getLeft() == LockGetResult.SUCCESS &&
isLockStillValid(latestLock.getRight().get())) {
+ String msg = String.format("Lock already held by %s",
latestLock.getRight().get().getOwner());
+ // Lock held by others.
+ logInfoLockState(FAILED_TO_ACQUIRE, msg);
+ return false;
+ }
+
+ // Try to acquire the lock
+ StorageLockData newLockData = new StorageLockData(false,
getCurrentEpochMs() + TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId);
+ Pair<LockUpsertResult, Option<StorageLockFile>> lockUpdateStatus =
this.storageLockClient.tryUpsertLockFile(
+ newLockData,
+ latestLock.getRight());
+ if (lockUpdateStatus.getLeft() != LockUpsertResult.SUCCESS) {
+ // failed to acquire the lock, indicates concurrent contention
+ logInfoLockState(FAILED_TO_ACQUIRE);
+ return false;
+ }
+ this.setLock(lockUpdateStatus.getRight().get());
+
+ // There is a remote chance that
+ // - after lock is acquired but before heartbeat starts the lock is
expired.
+ // - lock is acquired and heartbeat is up yet it does not run timely
before the
+ // lock is expired
+ // It is mitigated by setting the lock validity period to a reasonably long
+ // period to survive until heartbeat comes, plus
+ // set the heartbeat interval relatively small enough.
+ if
(!this.heartbeatManager.startHeartbeatForThread(Thread.currentThread())) {
+ // Precondition "no active heartbeat" is checked previously, so when
+ // startHeartbeatForThread returns false,
+ // we are confident no heartbeat thread is running.
+ logErrorLockState(RELEASING, "We were unable to start the heartbeat!");
+ tryExpireCurrentLock(false);
+ return false;
+ }
+
+ logInfoLockState(ACQUIRED);
+ return true;
+ }
+
+ /**
+ * Determines whether this provider currently holds a valid lock.
+ *
+ * <p>
+ * This method checks both the existence of a lock object and its validity. A
+ * lock is considered
+ * valid only if it exists and has not expired according to its timestamp.
+ *
+ * @return {@code true} if this provider holds a valid lock, {@code false}
+ * otherwise
+ */
+ private boolean actuallyHoldsLock() {
+ return believesLockMightBeHeld() && isLockStillValid(getLock());
+ }
+
+ /**
+ * Checks if this provider has a non-null lock object reference.
+ *
+ * <p>
+ * A non-null lock object indicates that this provider has previously
+ * **successfully** acquired a lock via
+ * StorageBasedLockProvider##lock and has not yet **successfully** released
+ * it via StorageBasedLockProvider#unlock().
+ * It is merely an indicator that the lock might be held by this provider. To
+ * truly certify we are the owner of the lock,
+ * StorageBasedLockProvider#actuallyHoldsLock should be used.
+ *
+ * @return {@code true} if this provider has a non-null lock object,
+ * {@code false} otherwise
+ * @see StorageBasedLockProvider#actuallyHoldsLock()
+ */
+ private boolean believesLockMightBeHeld() {
+ return this.getLock() != null;
+ }
+
+ /**
+ * Unlock by marking our current lock file "expired": true.
+ */
+ @Override
+ public synchronized void unlock() {
+ assertHeartbeatManagerExists();
+ if (!believesLockMightBeHeld()) {
+ return;
+ }
+ boolean believesNoLongerHoldsLock = true;
+
+ // Try to stop the heartbeat first
+ if (heartbeatManager.hasActiveHeartbeat()) {
+ logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId);
+ believesNoLongerHoldsLock &= heartbeatManager.stopHeartbeat(true);
+ }
+
+ // Then expire the current lock.
+ believesNoLongerHoldsLock &= tryExpireCurrentLock(false);
+ if (!believesNoLongerHoldsLock) {
+ throw new
HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
+ }
+ }
+
+ private void assertHeartbeatManagerExists() {
+ if (heartbeatManager == null) {
+ // broken function precondition.
+ throw new HoodieLockException("Unexpected null heartbeatManager");
+ }
+ }
+
+ private void assertUnclosed() {
+ if (this.isClosed) {
+ throw new HoodieLockException("Lock provider already closed");
+ }
+ }
+
+ /**
+ * Tries to expire the currently held lock.
+ * @param fromShutdownHook Whether we are attempting best effort quick
unlock from shutdown hook.
+ * @return True if we were successfully able to upload an expired lock.
+ */
+ private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) {
+ // It does not make sense to have heartbeat alive extending the lock lease
while
+ // here we are trying
+ // to expire the lock.
+ if (!fromShutdownHook && heartbeatManager.hasActiveHeartbeat()) {
+ // broken function precondition.
+ throw new HoodieLockException("Must stop heartbeat before expire lock
file");
+ }
+ logDebugLockState(RELEASING);
+ // Upload metadata that will unlock this lock.
+ StorageLockData expiredLockData = new StorageLockData(true,
this.getLock().getValidUntilMs(), ownerId);
+ Pair<LockUpsertResult, Option<StorageLockFile>> result;
+ result = this.storageLockClient.tryUpsertLockFile(expiredLockData,
Option.of(this.getLock()));
+ switch (result.getLeft()) {
+ case UNKNOWN_ERROR:
+ // Here we do not know the state of the lock.
+ logErrorLockState(FAILED_TO_RELEASE, "Lock state is unknown.");
+ return false;
+ case SUCCESS:
+ logInfoLockState(RELEASED);
+ setLock(null);
+ return true;
+ case ACQUIRED_BY_OTHERS:
+ // As we are confident no lock is held by itself, clean up the cached
lock object.
+ // However, this is an edge case, so warn.
+ logWarnLockState(RELEASED, "lock should not have been acquired by
others.");
+ setLock(null);
+ return true;
+ default:
+ throw new HoodieLockException("Unexpected lock update result: " +
result.getLeft());
+ }
+ }
+
+ /**
+ * Renews (heartbeats) the current lock if we are the holder, it forcefully
set
+ * the expiration flag
+ * to false and the lock expiration time to a later time in the future.
+ * @return True if we successfully renewed the lock, false if not.
+ */
+ @VisibleForTesting
+ protected synchronized boolean renewLock() {
+ try {
+ // If we don't hold the lock, no-op.
+ if (!believesLockMightBeHeld()) {
+ logger.warn("Owner {}: Cannot renew, no lock held by this process",
ownerId);
+ // No need to extend lock lease.
+ return false;
+ }
+
+ long oldExpirationMs = getLock().getValidUntilMs();
+ // Attempt conditional update, extend lock. There are 3 cases:
+ // 1. Happy case: lock has not expired yet, we extend the lease to a
longer
+ // period.
+ // 2. Corner case 1: lock is expired and is acquired by others, lock
renewal
+ // failed with ACQUIRED_BY_OTHERS.
+ // 3. Corner case 2: lock is expired but no one has acquired it yet, lock
+ // renewal "revived" the expired lock.
+ // Please note we expect the corner cases almost never happens.
+ // Action taken for corner case 2 is just a best effort mitigation. At
least it
+ // prevents further data corruption by
+ // letting someone else acquire the lock.
+ Pair<LockUpsertResult, Option<StorageLockFile>> currentLock =
this.storageLockClient.tryUpsertLockFile(
+ new StorageLockData(false, getCurrentEpochMs() +
TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId),
+ Option.of(getLock()));
+ switch (currentLock.getLeft()) {
+ case ACQUIRED_BY_OTHERS:
+ logger.error("Owner {}: Unable to renew lock as it is acquired by
others.", ownerId);
+ // No need to extend lock lease anymore.
+ return false;
+ case UNKNOWN_ERROR:
+ // This could be transient, but unclear, we will let the heartbeat
continue
+ // normally.
+ // If the next heartbeat run identifies our lock has expired we will
error out.
+ logger.warn("Owner {}: Unable to renew lock due to unknown error,
could be transient.", ownerId);
+ // Let heartbeat retry later.
+ return true;
+ case SUCCESS:
+ // Only positive outcome
+ this.setLock(currentLock.getRight().get());
+ logger.info("Owner {}: Lock renewal successful. The renewal
completes {} ms before expiration for lock {}.",
+ ownerId, oldExpirationMs - getCurrentEpochMs(), lockFilePath);
+ // Let heartbeat continue to renew lock lease again later.
+ return true;
+ default:
+ throw new HoodieLockException("Unexpected lock update result: " +
currentLock.getLeft());
+ }
+ } catch (Exception e) {
+ logger.error("Owner {}: Exception occurred while renewing lock",
ownerId, e);
+ return false;
+ }
+ }
+
+ // ---------
+ // Utilities
+ // ---------
+
+ /**
+ * Method to calculate whether a timestamp from a distributed source has
+ * definitively occurred yet.
+ */
+ protected boolean isCurrentTimeCertainlyOlderThanDistributedTime(long
epochMs) {
+ return getCurrentEpochMs() > epochMs + CLOCK_DRIFT_BUFFER_MS;
+ }
+
+ private String generateLockStateMessage(LockState state) {
+ String threadName = Thread.currentThread().getName();
+ return String.format(
+ "Owner %s: Lock file path %s, Thread %s, Storage based lock state
%s",
+ ownerId,
+ lockFilePath,
+ threadName,
+ state.toString());
+ }
+
+ private static final String LOCK_STATE_LOGGER_MSG = "Owner {}: Lock file
path {}, Thread {}, Storage based lock state {}";
+ private static final String LOCK_STATE_LOGGER_MSG_WITH_INFO = "Owner {}:
Lock file path {}, Thread {}, Storage based lock state {}, {}";
+
+ private void logDebugLockState(LockState state) {
+ logger.debug(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath,
Thread.currentThread(), state);
+ }
+
+ private void logInfoLockState(LockState state) {
+ logger.info(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath,
Thread.currentThread(), state);
+ }
+
+ private void logInfoLockState(LockState state, String msg) {
+ logger.info(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath,
Thread.currentThread(), state, msg);
+ }
+
+ private void logWarnLockState(LockState state, String msg) {
+ logger.warn(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath,
Thread.currentThread(), state, msg);
+ }
+
+ private void logErrorLockState(LockState state, String msg) {
+ logger.error(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath,
Thread.currentThread(), state, msg);
+ }
+
+ @VisibleForTesting
+ long getCurrentEpochMs() {
+ return System.currentTimeMillis();
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
index be73bd7cb2c0..a91193d5dc05 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
@@ -167,7 +167,7 @@ public class LockProviderHeartbeatManager implements
HeartbeatManager {
*/
private static ScheduledExecutorService createThreadScheduler(String
shortUuid) {
return Executors.newSingleThreadScheduledExecutor(
- r -> new Thread(r, "LockProvider-HeartbeatManager-Thread-" +
shortUuid));
+ r -> new Thread(r, "LockProvider-HeartbeatManager-Thread-" +
shortUuid));
}
/**
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java
new file mode 100644
index 000000000000..8f4d4779ad1c
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
+
+public class StorageBasedLockConfig extends HoodieConfig {
+ private static final String SINCE_VERSION_1_0_2 = "1.0.2";
+ private static final String STORAGE_BASED_LOCK_PROPERTY_PREFIX =
LockConfiguration.LOCK_PREFIX
+ + "storage.";
+
+ public static final ConfigProperty<Long> VALIDITY_TIMEOUT_SECONDS =
ConfigProperty
+ .key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "validity.timeout.secs")
+ .defaultValue(TimeUnit.MINUTES.toSeconds(5))
+ .markAdvanced()
+ .sinceVersion(SINCE_VERSION_1_0_2)
+ .withDocumentation(
+ "For storage-based lock provider, the amount of time in seconds each
new lock is valid for. "
+ + "The lock provider will attempt to renew its lock until it
successfully extends the lock lease period "
+ + "or the validity timeout is reached.");
+
+ public static final ConfigProperty<Long> HEARTBEAT_POLL_SECONDS =
ConfigProperty
+ .key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "heartbeat.poll.secs")
+ .defaultValue(30L)
+ .markAdvanced()
+ .sinceVersion(SINCE_VERSION_1_0_2)
+ .withDocumentation(
+ "For storage-based lock provider, the amount of time in seconds to
wait before renewing the lock. "
+ + "Defaults to 30 seconds.");
+
+ public long getValiditySeconds() {
+ return getLong(VALIDITY_TIMEOUT_SECONDS);
+ }
+
+ public long getHeartbeatPollSeconds() {
+ return getLong(HEARTBEAT_POLL_SECONDS);
+ }
+
+ public String getHudiTableBasePath() {
+ return getString(BASE_PATH);
+ }
+
+ public static class Builder {
+ private final StorageBasedLockConfig lockConfig = new
StorageBasedLockConfig();
+
+ public StorageBasedLockConfig build() {
+ lockConfig.setDefaults(StorageBasedLockConfig.class.getName());
+ return lockConfig;
+ }
+
+ public StorageBasedLockConfig.Builder fromProperties(TypedProperties
props) {
+ lockConfig.getProps().putAll(props);
+ checkRequiredProps();
+ return this;
+ }
+
+ private void checkRequiredProps() {
+ String notExistsMsg = " does not exist!";
+ if (!lockConfig.contains(BASE_PATH)) {
+ throw new IllegalArgumentException(BASE_PATH.key() + notExistsMsg);
+ }
+ if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) <
lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS)
+ * 10) {
+ throw new IllegalArgumentException(
+ VALIDITY_TIMEOUT_SECONDS.key() + " should be greater than or equal
to 10x " + HEARTBEAT_POLL_SECONDS.key());
+ }
+ if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < 10) {
+ throw new IllegalArgumentException(
+ VALIDITY_TIMEOUT_SECONDS.key() + " should be greater than or equal
to 10 seconds.");
+ }
+ if (lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS) < 1) {
+ throw new IllegalArgumentException(
+ HEARTBEAT_POLL_SECONDS.key() + " should be greater than or equal
to 1 second.");
+ }
+ }
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
new file mode 100644
index 000000000000..4e95c33371f8
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
@@ -0,0 +1,669 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.StorageBasedLockConfig;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.refEq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test class for StorageBasedLockProvider
+ */
+class TestStorageBasedLockProvider {
+ private StorageBasedLockProvider lockProvider;
+ private StorageLockClient mockLockService;
+ private HeartbeatManager mockHeartbeatManager;
+ private Logger mockLogger;
+ private final String ownerId = UUID.randomUUID().toString();
+ private static final int DEFAULT_LOCK_VALIDITY_MS = 10000;
+
+ @BeforeEach
+ void setupLockProvider() {
+ mockLockService = mock(StorageLockClient.class);
+ mockHeartbeatManager = mock(HeartbeatManager.class);
+ mockLogger = mock(Logger.class);
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+ TypedProperties props = new TypedProperties();
+ props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "10");
+ props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
+ props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default");
+
+ lockProvider = spy(new StorageBasedLockProvider(
+ ownerId,
+ props,
+ (a,b,c) -> mockHeartbeatManager,
+ (a,b,c) -> mockLockService,
+ mockLogger));
+ }
+
+ @AfterEach
+ void cleanupLockProvider() {
+ lockProvider.close();
+ }
+
+ @Test
+ void testValidLockStorageLocation() {
+ TypedProperties props = new TypedProperties();
+ props.put(BASE_PATH.key(), "s3://bucket/lake/db/tbl-default");
+
+ LockConfiguration lockConf = new LockConfiguration(props);
+ StorageConfiguration<?> storageConf =
HoodieTestUtils.getDefaultStorageConf();
+
+ HoodieLockException ex = assertThrows(HoodieLockException.class,
+ () -> new StorageBasedLockProvider(lockConf, storageConf));
+ assertTrue(ex.getMessage().contains("Failed to load and initialize
StorageLock"));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = { "gs://bucket/lake/db/tbl-default",
"s3://bucket/lake/db/tbl-default",
+ "s3a://bucket/lake/db/tbl-default" })
+ void testNonExistentWriteServiceWithDefaults(String tableBasePathString) {
+ TypedProperties props = new TypedProperties();
+ props.put(BASE_PATH.key(), tableBasePathString);
+
+ LockConfiguration lockConf = new LockConfiguration(props);
+ StorageConfiguration<?> storageConf =
HoodieTestUtils.getDefaultStorageConf();
+
+ HoodieLockException ex = assertThrows(HoodieLockException.class,
+ () -> new StorageBasedLockProvider(lockConf, storageConf));
+ assertTrue(ex.getMessage().contains("Failed to load and initialize
StorageLock"));
+ }
+
+ @Test
+ void testTryLockForTimeUnitThrowsOnInterrupt() throws Exception {
+ doReturn(false).when(lockProvider).tryLock();
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread t = new Thread(() -> {
+ try {
+ lockProvider.tryLock(1, TimeUnit.SECONDS);
+ } catch (HoodieLockException e) {
+ latch.countDown();
+ }
+ });
+ t.start();
+ Thread.sleep(50);
+ t.interrupt();
+ assertTrue(latch.await(2, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void testTryLockForTimeUnitAcquiresLockEventually() throws Exception {
+ AtomicInteger count = new AtomicInteger(0);
+ doAnswer(inv -> count.incrementAndGet() > 2).when(lockProvider).tryLock();
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread t = new Thread(() -> {
+ assertTrue(lockProvider.tryLock(4, TimeUnit.SECONDS));
+ latch.countDown();
+ });
+ t.start();
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void testTryLockForTimeUnitFailsToAcquireLockEventually() throws Exception {
+ AtomicInteger count = new AtomicInteger(0);
+ doAnswer(inv -> count.incrementAndGet() > 2).when(lockProvider).tryLock();
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread t = new Thread(() -> {
+ assertFalse(lockProvider.tryLock(1, TimeUnit.SECONDS));
+ latch.countDown();
+ });
+ t.start();
+ assertTrue(latch.await(2, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void testTryLockSuccess() {
+ long t0 = 1_000L;
+ when(lockProvider.getCurrentEpochMs())
+ .thenReturn(t0);
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockData data = new StorageLockData(false, t0 +
DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryUpsertLockFile(refEq(data), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+ boolean acquired = lockProvider.tryLock();
+ assertTrue(acquired);
+ assertEquals(realLockFile, lockProvider.getLock());
+ verify(mockLockService, atLeastOnce()).tryUpsertLockFile(any(), any());
+ }
+
+ @Test
+ void testTryLockSuccessButFailureToStartHeartbeat() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(false);
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+
+ boolean acquired = lockProvider.tryLock();
+ assertFalse(acquired);
+ }
+
+ @Test
+ void testTryLockFailsFromOwnerMismatch() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockFile returnedLockFile = new StorageLockFile(
+ new StorageLockData(false, System.currentTimeMillis() +
DEFAULT_LOCK_VALIDITY_MS, "different-owner"), "v1");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(returnedLockFile)));
+
+ HoodieLockException ex = assertThrows(HoodieLockException.class, () ->
lockProvider.tryLock());
+ assertTrue(ex.getMessage().contains("Owners do not match"));
+ }
+
+ @Test
+ void testTryLockFailsDueToExistingLock() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS,
+ "other-owner");
+ StorageLockFile existingLock = new StorageLockFile(data, "v2");
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS,
Option.of(existingLock)));
+
+ boolean acquired = lockProvider.tryLock();
+ assertFalse(acquired);
+ }
+
+ @Test
+ void testTryLockFailsToUpdateFile() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS,
Option.empty()));
+ assertFalse(lockProvider.tryLock());
+ }
+
+ @Test
+ void testTryLockFailsDueToUnknownState() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.UNKNOWN_ERROR,
Option.empty()));
+ assertFalse(lockProvider.tryLock());
+ }
+
+ @Test
+ void testTryLockSucceedsWhenExistingLockExpiredByTime() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
+ "other-owner");
+ StorageLockFile existingLock = new StorageLockFile(data, "v2");
+ StorageLockData newData = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS,
+ ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(newData, "v1");
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS,
Option.of(existingLock)));
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(existingLock))))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ boolean acquired = lockProvider.tryLock();
+ assertTrue(acquired);
+ }
+
+ @Test
+ void testTryLockReentrancySucceeds() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+ boolean acquired = lockProvider.tryLock();
+ assertTrue(acquired);
+ // Re-entrancy succeeds
+ assertTrue(lockProvider.tryLock());
+ }
+
+ @Test
+ void testTryLockReentrancyAfterLockExpiredByTime() {
+ // In an extremely unlikely scenario, we could have a local reference to a
lock
+ // which is present but expired,
+ // and because we were unable to stop the heartbeat properly, we did not
+ // successfully set it to null.
+ // Due to the nature of the heartbeat manager, this is expected to
introduce
+ // some delay, but not be permanently blocking.
+ // There are a few variations of this edge case, so we must test them all.
+
+ // Here the lock is still "unexpired" but the time shows expired.
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile expiredLock = new StorageLockFile(data, "v1");
+ doReturn(expiredLock).when(lockProvider).getLock();
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockData validData = new StorageLockData(false,
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
+ ownerId);
+ StorageLockFile validLock = new StorageLockFile(validData, "v2");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(validLock)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+ assertTrue(lockProvider.tryLock());
+ }
+
+ @Test
+ void testTryLockReentrancyAfterLockSetExpired() {
+ // In an extremely unlikely scenario, we could have a local reference to a
lock
+ // which is present but expired,
+ // and because we were unable to stop the heartbeat properly, we did not
+ // successfully set it to null.
+ // Due to the nature of the heartbeat manager, this is expected to
introduce
+ // some delay, but not be permanently blocking.
+ // There are a few variations of this edge case, so we must test them all.
+
+ // Here the lock is "expired" but the time shows unexpired.
+ StorageLockData data = new StorageLockData(true,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile expiredLock = new StorageLockFile(data, "v1");
+ doReturn(expiredLock).when(lockProvider).getLock();
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockData validData = new StorageLockData(false,
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS,
+ ownerId);
+ StorageLockFile validLock = new StorageLockFile(validData, "v2");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(validLock)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+ assertTrue(lockProvider.tryLock());
+ }
+
+ @Test
+ void testTryLockHeartbeatStillActive() {
+ // In an extremely unlikely scenario, we could have a local reference to a
lock
+ // which is present but expired,
+ // and because we were unable to stop the heartbeat properly, we did not
+ // successfully set it to null.
+ // Due to the nature of the heartbeat manager, this is expected to
introduce
+ // some delay, but not be permanently blocking.
+ // There are a few variations of this edge case, so we must test them all.
+
+ // Here the heartbeat is still active, so we have to error out.
+ StorageLockData data = new StorageLockData(true,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile expiredLock = new StorageLockFile(data, "v1");
+ doReturn(expiredLock).when(lockProvider).getLock();
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+ assertThrows(HoodieLockException.class, () -> lockProvider.tryLock());
+ }
+
+ @Test
+ void testUnlockSucceedsAndReentrancy() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
+ Option.of(new StorageLockFile(new StorageLockData(true,
data.getValidUntil(), ownerId), "v2"))));
+ assertTrue(lockProvider.tryLock());
+ when(mockHeartbeatManager.hasActiveHeartbeat())
+ .thenReturn(true) // when we try to stop the heartbeat we will check
if heartbeat is active return true.
+ .thenReturn(false); // when try to set lock to expire we will assert
no active heartbeat as a precondition.
+ lockProvider.unlock();
+ assertNull(lockProvider.getLock());
+ lockProvider.unlock();
+ }
+
+ @Test
+ void testUnlockFailsToStopHeartbeat() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ assertTrue(lockProvider.tryLock());
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(false);
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+ assertThrows(HoodieLockException.class, () -> lockProvider.unlock());
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+ }
+
+ @Test
+ void testCloseFailsToStopHeartbeat() {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ assertTrue(lockProvider.tryLock());
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(false);
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+ // Should wrap the exception and log error.
+ lockProvider.close();
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+ }
+
+ @Test
+ void testRenewLockReturnsFalseWhenNoLockHeld() {
+ doReturn(null).when(lockProvider).getLock();
+ assertFalse(lockProvider.renewLock());
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true);
+ verify(mockLogger).warn("Owner {}: Cannot renew, no lock held by this
process", this.ownerId);
+ }
+
+ @Test
+ void testRenewLockWithoutHoldingLock() {
+ doReturn(null).when(lockProvider).getLock();
+ assertFalse(lockProvider.renewLock());
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+ verify(mockLogger).warn("Owner {}: Cannot renew, no lock held by this
process", this.ownerId);
+ }
+
+ @Test
+ void testRenewLockWithFullyExpiredLock() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile nearExpiredLockFile = new StorageLockFile(data, "v1");
+ doReturn(nearExpiredLockFile).when(lockProvider).getLock();
+ when(mockLockService.tryUpsertLockFile(any(),
eq(Option.of(nearExpiredLockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, null));
+ assertFalse(lockProvider.renewLock());
+ verify(mockLogger).error("Owner {}: Unable to renew lock as it is acquired
by others.", this.ownerId);
+ }
+
+ @Test
+ void testRenewLockUnableToUpsertLockFileButNotFatal() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ doReturn(lockFile).when(lockProvider).getLock();
+ // Signal the upsert attempt failed, but may be transient. See interface
for
+ // more details.
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()));
+ assertTrue(lockProvider.renewLock());
+ }
+
+ @Test
+ void testRenewLockUnableToUpsertLockFileFatal() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ doReturn(lockFile).when(lockProvider).getLock();
+ // Signal the upsert attempt failed, but may be transient. See interface
for
+ // more details.
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()));
+ // renewLock return true so it will be retried.
+ assertTrue(lockProvider.renewLock());
+
+ verify(mockLogger).warn("Owner {}: Unable to renew lock due to unknown
error, could be transient.", this.ownerId);
+ }
+
+ @Test
+ void testRenewLockSucceedsButRenewalWithinExpirationWindow() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ doReturn(lockFile).when(lockProvider).getLock();
+
+ StorageLockData nearExpirationData = new StorageLockData(false,
System.currentTimeMillis(), ownerId);
+ StorageLockFile lockFileNearExpiration = new
StorageLockFile(nearExpirationData, "v2");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(lockFileNearExpiration)));
+
+ // We used to fail in this case before, but since we are only modifying a
single
+ // lock file, this is ok now.
+ // Therefore, this can be a happy path variation.
+ assertTrue(lockProvider.renewLock());
+ }
+
+ @Test
+ void testRenewLockSucceeds() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ doReturn(lockFile).when(lockProvider).getLock();
+
+ StorageLockData successData = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS,
+ ownerId);
+ StorageLockFile successLockFile = new StorageLockFile(successData, "v2");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(successLockFile)));
+ assertTrue(lockProvider.renewLock());
+
+ verify(mockLogger).info(
+ eq("Owner {}: Lock renewal successful. The renewal completes {} ms
before expiration for lock {}."),
+ eq(this.ownerId), anyLong(),
eq("gs://bucket/lake/db/tbl-default/.hoodie/.locks/table_lock.json"));
+ }
+
+ @Test
+ void testRenewLockFails() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ doReturn(lockFile).when(lockProvider).getLock();
+
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+ .thenThrow(new RuntimeException("Failure"));
+ assertFalse(lockProvider.renewLock());
+
+ verify(mockLogger).error(eq("Owner {}: Exception occurred while renewing
lock"), eq(ownerId),
+ any(RuntimeException.class));
+ }
+
+ @Test
+ void testCloseCallsDependencies() throws Exception {
+ lockProvider.close();
+ verify(mockLockService, atLeastOnce()).close();
+ verify(mockHeartbeatManager, atLeastOnce()).close();
+ assertNull(lockProvider.getLock());
+ }
+
+ @Test
+ void testCloseWithErrorForLockService() throws Exception {
+ doThrow(new RuntimeException("Some
failure")).when(mockLockService).close();
+ lockProvider.close();
+ verify(mockLogger).error(eq("Owner {}: Lock service failed to close."),
eq(ownerId), any(RuntimeException.class));
+ assertNull(lockProvider.getLock());
+ }
+
+ @Test
+ void testCloseWithErrorForHeartbeatManager() throws Exception {
+ doThrow(new RuntimeException("Some
failure")).when(mockHeartbeatManager).close();
+ lockProvider.close();
+ verify(mockLogger).error(eq("Owner {}: Heartbeat manager failed to
close."), eq(ownerId),
+ any(RuntimeException.class));
+ assertNull(lockProvider.getLock());
+ }
+
+ @Test
+ public void testShutdownHookViaReflection() throws Exception {
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+ boolean acquired = lockProvider.tryLock();
+ assertTrue(acquired);
+ assertEquals(realLockFile, lockProvider.getLock());
+ verify(mockLockService, atLeastOnce()).tryUpsertLockFile(any(), any());
+
+ when(mockLockService.tryUpsertLockFile(any(StorageLockData.class),
eq(Option.of(realLockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+
+ // Mock shutdown
+ lockProvider.shutdown(true);
+
+ // Verify that the expected shutdown behaviors occurred.
+ assertNull(lockProvider.getLock());
+ // We do not execute additional actions
+ verify(mockLockService, never()).close();
+ verify(mockHeartbeatManager, never()).close();
+ }
+
+ @Test
+ public void testShutdownHookWhenNoLockPresent() throws Exception {
+ // Now, when calling shutdown(true), the method should immediately return.
+ lockProvider.shutdown(true);
+
+ // Verify that unlock or close methods are NOT invoked, or adjust
expectations accordingly.
+ verify(mockLockService, never()).close();
+ verify(mockHeartbeatManager, never()).close();
+ }
+
+ @Test
+ public void testShutdownHookFailsToBeRemoved() throws Exception {
+ doThrow(new IllegalStateException("Shutdown already in
progress")).when(lockProvider).tryRemoveShutdownHook();
+ lockProvider.close();
+ verify(mockLockService, atLeastOnce()).close();
+ verify(mockHeartbeatManager, atLeastOnce()).close();
+ assertNull(lockProvider.getLock());
+ }
+
+ @Test
+ void testShutdownHookFiresDuringTryLockWithTimeout() throws Exception {
+ // This test simulates the scenario where the shutdown hook fires while
tryLock(long time, TimeUnit unit)
+ // is in progress, and expects that HoodieLockException is thrown when
tryLock is called after shutdown.
+
+ // Setup mocks to simulate lock being held by another owner (to keep
tryLock looping)
+ StorageLockData otherOwnerData = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, "other-owner");
+ StorageLockFile otherOwnerLock = new StorageLockFile(otherOwnerData, "v1");
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS,
Option.of(otherOwnerLock)));
+
+ CountDownLatch tryLockStarted = new CountDownLatch(1);
+ CountDownLatch proceedWithShutdown = new CountDownLatch(1);
+ CountDownLatch shutdownCompleted = new CountDownLatch(1);
+ CountDownLatch tryLockCompleted = new CountDownLatch(1);
+ CountDownLatch exceptionThrown = new CountDownLatch(1);
+
+ // Spy on the real tryLock to know when it's been called and coordinate
with shutdown
+ AtomicInteger tryLockCallCount = new AtomicInteger(0);
+ doAnswer(inv -> {
+ int count = tryLockCallCount.incrementAndGet();
+ if (count == 1) {
+ // First call - signal that tryLock has started
+ tryLockStarted.countDown();
+ // Wait for shutdown to be triggered
+ assertTrue(proceedWithShutdown.await(2, TimeUnit.SECONDS));
+ } else {
+ // Subsequent calls - wait briefly for shutdown to complete
+ assertTrue(shutdownCompleted.await(100, TimeUnit.MILLISECONDS));
+ }
+ // Call the real method
+ return inv.callRealMethod();
+ }).when(lockProvider).tryLock();
+
+ // Start a thread that will call tryLock with timeout
+ Thread tryLockThread = new Thread(() -> {
+ try {
+ lockProvider.tryLock(2, TimeUnit.SECONDS);
+ // Should not reach here - exception should be thrown after shutdown
+ fail("Should have thrown HoodieLockException after shutdown");
+ } catch (HoodieLockException e) {
+ // Expected - tryLock should throw exception after shutdown
+ exceptionThrown.countDown();
+ } finally {
+ tryLockCompleted.countDown();
+ }
+ });
+
+ tryLockThread.start();
+
+ // Wait for tryLock to start
+ assertTrue(tryLockStarted.await(2, TimeUnit.SECONDS), "tryLock should have
started");
+
+ // Now invoke the shutdown hook while tryLock is in progress
+ // Invoke shutdown in a separate thread to simulate shutdown hook
+ Thread shutdownThread = new Thread(() -> {
+ proceedWithShutdown.countDown(); // Signal tryLock to proceed
+ lockProvider.shutdown(true);
+ shutdownCompleted.countDown(); // Signal that shutdown is complete
+ });
+ shutdownThread.start();
+
+ // Wait for both operations to complete
+ assertTrue(tryLockCompleted.await(5, TimeUnit.SECONDS), "tryLock should
complete");
+ assertTrue(exceptionThrown.await(1, TimeUnit.SECONDS),
"HoodieLockException should have been thrown");
+ shutdownThread.join(2000);
+
+ // Verify the state after shutdown
+ // The lock should be null after shutdown
+ assertNull(lockProvider.getLock(), "Lock should be null after shutdown
hook fires");
+
+ // Verify that tryLock was called at least once
+ verify(lockProvider, atLeastOnce()).tryLock();
+ }
+
+ public static class StubStorageLockClient implements StorageLockClient {
+ public StubStorageLockClient(String ownerId, String lockFileUri,
Properties props) {
+ assertTrue(lockFileUri.endsWith("table_lock.json"));
+ }
+
+ @Override
+ public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile(
+ StorageLockData newLockData,
+ Option<StorageLockFile> previousLockFile) {
+ return null;
+ }
+
+ @Override
+ public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() {
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ // stub, no-op
+ }
+ }
+
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
index cdeca536b3ae..234aebe70b76 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
@@ -156,13 +156,13 @@ public class TestLockProviderHeartbeatManager {
when(semaphore.tryAcquire()).thenReturn(false);
when(semaphore.tryAcquire(eq(DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS),
eq(TimeUnit.MILLISECONDS))).thenReturn(true);
manager = new LockProviderHeartbeatManager(
- LOGGER_ID,
- mockScheduler,
- 100L,
- DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS,
- () -> true,
- semaphore,
- mockLogger);
+ LOGGER_ID,
+ mockScheduler,
+ 100L,
+ DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS,
+ () -> true,
+ semaphore,
+ mockLogger);
assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
t.get().start();
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java
new file mode 100644
index 000000000000..4855b6505877
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestStorageBasedLockConfig {
+
+ @Test
+ void testDefaultValues() {
+ // Asserts that the defaults are correct
+ TypedProperties props = new TypedProperties();
+ props.setProperty(BASE_PATH.key(), "s3://hudi-bucket/table/basepath");
+
+ StorageBasedLockConfig.Builder builder = new
StorageBasedLockConfig.Builder();
+ StorageBasedLockConfig config = builder
+ .fromProperties(props)
+ .build();
+
+ assertEquals(5 * 60, config.getValiditySeconds(), "Default lock validity
should be 5 minutes");
+ assertEquals(30, config.getHeartbeatPollSeconds(), "Default heartbeat poll
time should be 30 seconds");
+ }
+
+ @Test
+ void testCustomValues() {
+ // Testing that custom values which differ from defaults can be read
properly
+ TypedProperties props = new TypedProperties();
+ props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
"120");
+ props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(),
"10");
+ props.setProperty(BASE_PATH.key(), "/hudi/table/basepath");
+
+ StorageBasedLockConfig config = new StorageBasedLockConfig.Builder()
+ .fromProperties(props)
+ .build();
+
+ assertEquals(120, config.getValiditySeconds());
+ assertEquals(10, config.getHeartbeatPollSeconds());
+ assertEquals("/hudi/table/basepath", config.getHudiTableBasePath());
+ }
+
+ @Test
+ void testBasePathPropertiesValidation() {
+ // Tests that validations around the base path are present.
+ TypedProperties props = new TypedProperties();
+ props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
"120");
+ StorageBasedLockConfig.Builder propsBuilder = new
StorageBasedLockConfig.Builder();
+
+ // Missing base path
+ IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class,
+ () -> propsBuilder.fromProperties(props));
+ assertTrue(exception.getMessage().contains(BASE_PATH.key()));
+ }
+
+ @Test
+ void testTimeThresholds() {
+ // Ensure that validations which restrict the time-based inputs are
working.
+ TypedProperties props = new TypedProperties();
+ props.setProperty(BASE_PATH.key(), "/hudi/table/basepath");
+ // Invalid config case: validity timeout is less than 10x of heartbeat
poll period
+ props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
"5");
+ props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(),
"3");
+ StorageBasedLockConfig.Builder propsBuilder = new
StorageBasedLockConfig.Builder();
+
+ IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class,
+ () -> propsBuilder.fromProperties(props));
+
assertTrue(exception.getMessage().contains(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key()));
+ // Invalid config case: validity timeout is less than 10 seconds
+ props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
"9");
+ props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(),
"1");
+ exception = assertThrows(IllegalArgumentException.class, () ->
propsBuilder.fromProperties(props));
+
assertTrue(exception.getMessage().contains(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key()));
+ // Invalid config case: heartbeat poll period is less than 1 second
+ props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(),
"10");
+ props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(),
"0");
+ exception = assertThrows(IllegalArgumentException.class, () ->
propsBuilder.fromProperties(props));
+
assertTrue(exception.getMessage().contains(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key()));
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 2c3e2b515bbb..3102e5e1e53f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -97,6 +97,8 @@ public class HoodieTableMetaClient implements Serializable {
public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH =
BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + StoragePath.SEPARATOR + ".fileids";
+ public static final String LOCKS_FOLDER_NAME = METAFOLDER_NAME +
StoragePath.SEPARATOR + ".locks";
+
public static final String SCHEMA_FOLDER_NAME = ".schema";
public static final String MARKER_EXTN = ".marker";
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
index 1b1d32e4ac37..aa4932c0cc6c 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java
@@ -84,6 +84,15 @@ public class TestStorageSchemes {
assertThrows(IllegalArgumentException.class, () -> {
StorageSchemes.isAppendSupported("s2");
}, "Should throw exception for unsupported schemes");
+
+ for (StorageSchemes scheme : StorageSchemes.values()) {
+ String schemeName = scheme.getScheme();
+ if (scheme.getScheme().startsWith("s3") ||
scheme.getScheme().startsWith("gs")) {
+
assertTrue(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent());
+ } else {
+
assertFalse(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent());
+ }
+ }
}
@Test
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
index 129956166b3a..68c0a068188e 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java
@@ -21,68 +21,71 @@ package org.apache.hudi.storage;
import java.util.Arrays;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+
/**
* All the supported storage schemes in Hoodie.
*/
public enum StorageSchemes {
// Local filesystem
- FILE("file", false, false, true, true),
+ FILE("file", false, false, true, true, null),
// Hadoop File System
- HDFS("hdfs", true, false, true, false),
+ HDFS("hdfs", true, false, true, false, null),
// Baidu Advanced File System
- AFS("afs", true, null, null, null),
+ AFS("afs", true, null, null, null, null),
// Mapr File System
- MAPRFS("maprfs", true, null, null, null),
+ MAPRFS("maprfs", true, null, null, null, null),
// Apache Ignite FS
- IGNITE("igfs", true, null, null, null),
+ IGNITE("igfs", true, null, null, null, null),
// AWS S3
- S3A("s3a", false, true, null, true),
- S3("s3", false, true, null, true),
+ S3A("s3a", false, true, null, true,
"org.apache.hudi.aws.transaction.lock.S3StorageLockClient"),
+ S3("s3", false, true, null, true,
"org.apache.hudi.aws.transaction.lock.S3StorageLockClient"),
// Google Cloud Storage
- GCS("gs", false, true, null, true),
+ GCS("gs", false, true, null, true,
"org.apache.hudi.gcp.transaction.lock.GCSStorageLockClient"),
// Azure WASB
- WASB("wasb", false, null, null, null), WASBS("wasbs", false, null, null,
null),
+ WASB("wasb", false, null, null, null, null), WASBS("wasbs", false, null,
null, null, null),
// Azure ADLS
- ADL("adl", false, null, null, null),
+ ADL("adl", false, null, null, null, null),
// Azure ADLS Gen2
- ABFS("abfs", false, null, null, null), ABFSS("abfss", false, null, null,
null),
+ ABFS("abfs", false, null, null, null, null), ABFSS("abfss", false, null,
null, null, null),
// Aliyun OSS
- OSS("oss", false, null, null, null),
+ OSS("oss", false, null, null, null, null),
// View FS for federated setups. If federating across cloud stores, then
append support is false
// View FS support atomic creation
- VIEWFS("viewfs", true, null, true, null),
+ VIEWFS("viewfs", true, null, true, null, null),
//ALLUXIO
- ALLUXIO("alluxio", false, null, null, null),
+ ALLUXIO("alluxio", false, null, null, null, null),
// Tencent Cloud Object Storage
- COSN("cosn", false, null, null, null),
+ COSN("cosn", false, null, null, null, null),
// Tencent Cloud HDFS
- CHDFS("ofs", true, null, null, null),
+ CHDFS("ofs", true, null, null, null, null),
// Tencent Cloud CacheFileSystem
- GOOSEFS("gfs", false, null, null, null),
+ GOOSEFS("gfs", false, null, null, null, null),
// Databricks file system
- DBFS("dbfs", false, null, null, null),
+ DBFS("dbfs", false, null, null, null, null),
// IBM Cloud Object Storage
- COS("cos", false, null, null, null),
+ COS("cos", false, null, null, null, null),
// Huawei Cloud Object Storage
- OBS("obs", false, null, null, null),
+ OBS("obs", false, null, null, null, null),
// Kingsoft Standard Storage ks3
- KS3("ks3", false, null, null, null),
+ KS3("ks3", false, null, null, null, null),
// Netease Object Storage nos
- NOS("nos", false, null, null, null),
+ NOS("nos", false, null, null, null, null),
// JuiceFileSystem
- JFS("jfs", true, null, null, null),
+ JFS("jfs", true, null, null, null, null),
// Baidu Object Storage
- BOS("bos", false, null, null, null),
+ BOS("bos", false, null, null, null, null),
// Oracle Cloud Infrastructure Object Storage
- OCI("oci", false, null, null, null),
+ OCI("oci", false, null, null, null, null),
// Volcengine Object Storage
- TOS("tos", false, null, null, null),
+ TOS("tos", false, null, null, null, null),
// Volcengine Cloud HDFS
- CFS("cfs", true, null, null, null),
+ CFS("cfs", true, null, null, null, null),
// Aliyun Apsara File Storage for HDFS
- DFS("dfs", true, false, true, null),
+ DFS("dfs", true, false, true, null, null),
// Hopsworks File System
- HOPSFS("hopsfs", false, false, true, null);
+ HOPSFS("hopsfs", false, false, true, null, null);
private String scheme;
private boolean supportsAppend;
@@ -94,13 +97,15 @@ public enum StorageSchemes {
// when we want to get only part of files under a directory rather than all
files, use getStatus may be more friendly than listStatus.
// here is a trade-off between rpc times and throughput of storage meta
service
private Boolean listStatusFriendly;
+ private String storageLockClass;
- StorageSchemes(String scheme, boolean supportsAppend, Boolean
isWriteTransactional, Boolean supportAtomicCreation, Boolean
listStatusFriendly) {
+ StorageSchemes(String scheme, boolean supportsAppend, Boolean
isWriteTransactional, Boolean supportAtomicCreation, Boolean
listStatusFriendly, String storageLockClass) {
this.scheme = scheme;
this.supportsAppend = supportsAppend;
this.isWriteTransactional = isWriteTransactional;
this.supportAtomicCreation = supportAtomicCreation;
this.listStatusFriendly = listStatusFriendly;
+ this.storageLockClass = storageLockClass;
}
public String getScheme() {
@@ -123,6 +128,14 @@ public enum StorageSchemes {
return listStatusFriendly != null && listStatusFriendly;
}
+ public boolean implementsStorageLock() {
+ return !StringUtils.isNullOrEmpty(storageLockClass);
+ }
+
+ public String getStorageLockClass() {
+ return storageLockClass;
+ }
+
public static boolean isSchemeSupported(String scheme) {
return Arrays.stream(values()).anyMatch(s -> s.getScheme().equals(scheme));
}
@@ -155,4 +168,12 @@ public enum StorageSchemes {
}
return Arrays.stream(StorageSchemes.values()).anyMatch(s ->
s.getListStatusFriendly() && s.scheme.equals(scheme));
}
+
+ public static Option<StorageSchemes>
getStorageLockImplementationIfExists(String scheme) {
+ if (!isSchemeSupported(scheme)) {
+ throw new IllegalArgumentException("Unsupported scheme :" + scheme);
+ }
+ return Option.fromJavaOptional(Arrays.stream(StorageSchemes.values())
+ .filter(s -> s.implementsStorageLock() &&
s.scheme.equals(scheme)).findFirst());
+ }
}