This is an automated email from the ASF dual-hosted git repository.
wombatu-kun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f3ccf1b3c31b [MINOR] Wait for ZK connection in lock provider to
de-flake direct-marker detection test (#19014)
f3ccf1b3c31b is described below
commit f3ccf1b3c31b52e7ece2d596a7749369e2f7e0b0
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue Jun 16 15:20:11 2026 +0700
[MINOR] Wait for ZK connection in lock provider to de-flake direct-marker
detection test (#19014)
---
.../lock/BaseZookeeperBasedLockProvider.java | 27 ++++++++++++++++++--
.../TestZookeeperBasedLockProvider.java | 21 ++++++++++++++++
...erBasedDetectionStrategyWithZKLockProvider.java | 29 +++++++++++++++++++---
3 files changed, 71 insertions(+), 6 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
index d5b04c15c005..6cdee60e2d1b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
@@ -68,6 +68,7 @@ public abstract class BaseZookeeperBasedLockProvider
implements LockProvider<Int
this.lockConfiguration = lockConfiguration;
zkBasePath = getZkBasePath(lockConfiguration);
lockKey = getLockKey(lockConfiguration);
+ int connectionTimeoutMs =
ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(),
ZK_CONNECTION_TIMEOUT_MS);
this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
.connectString(ConfigUtils.getStringWithAltKeys(lockConfiguration.getConfig(),
ZK_CONNECT_URL))
.retryPolicy(new BoundedExponentialBackoffRetry(
@@ -75,10 +76,32 @@ public abstract class BaseZookeeperBasedLockProvider
implements LockProvider<Int
ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(),
LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS),
ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(),
LOCK_ACQUIRE_NUM_RETRIES)))
.sessionTimeoutMs(ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(),
ZK_SESSION_TIMEOUT_MS))
-
.connectionTimeoutMs(ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(),
ZK_CONNECTION_TIMEOUT_MS))
+ .connectionTimeoutMs(connectionTimeoutMs)
.build();
this.curatorFrameworkClient.start();
- createPathIfNotExists();
+ // Once started, the Curator client owns background threads. If anything below throws, the
+ // constructor never returns the instance, so the caller can never invoke close() - clean up here.
+ try {
+ if
(!this.curatorFrameworkClient.blockUntilConnected(connectionTimeoutMs,
TimeUnit.MILLISECONDS)) {
+ throw new HoodieLockException("Failed to connect to ZooKeeper within "
+ connectionTimeoutMs + " ms");
+ }
+ createPathIfNotExists();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ closeQuietly();
+ throw new HoodieLockException("Interrupted while waiting to connect to
ZooKeeper", e);
+ } catch (RuntimeException e) {
+ closeQuietly();
+ throw e;
+ }
+ }
+
+ private void closeQuietly() {
+ try {
+ this.curatorFrameworkClient.close();
+ } catch (Exception ex) {
+ log.warn("Failed to close ZooKeeper client after failed initialization",
ex);
+ }
}
protected abstract String getZkBasePath(LockConfiguration lockConfiguration);
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
index fab3dee8f8f6..fa1a8329b65f 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
+import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -194,4 +195,24 @@ public class TestZookeeperBasedLockProvider {
ZookeeperBasedLockProvider zookeeperBasedLockProvider = new
ZookeeperBasedLockProvider(zkConfWithZkBasePathAndLockKeyLock, null);
zookeeperBasedLockProvider.unlock();
}
+
+ @Test
+ public void testFailFastWhenZkUnreachable() {
+ Properties properties = new Properties();
+ // Nothing listens on 127.0.0.1:1, so the connect-wait must time out instead of hanging.
+ properties.setProperty(ZK_CONNECT_URL_PROP_KEY, "127.0.0.1:1");
+ properties.setProperty(ZK_BASE_PATH_PROP_KEY, basePath);
+ properties.setProperty(ZK_LOCK_KEY_PROP_KEY, key);
+ properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
"100");
+
properties.setProperty(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY,
"300");
+ properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "1");
+ properties.setProperty(ZK_SESSION_TIMEOUT_MS_PROP_KEY, "1000");
+ properties.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, "1000");
+ LockConfiguration unreachable = new LockConfiguration(properties);
+ // Construction must fail fast (seconds, bounded by the connection timeout) with a
+ // HoodieLockException instead of being amplified into a multi-minute retry hang.
+ Assertions.assertTimeoutPreemptively(Duration.ofSeconds(15), () ->
+ Assertions.assertThrows(HoodieLockException.class,
+ () -> new ZookeeperBasedLockProvider(unreachable, null)));
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockProvider.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockProvider.java
index 179504b9e161..73d9c40db316 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockProvider.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockProvider.java
@@ -51,9 +51,16 @@ import java.io.IOException;
import java.util.List;
import java.util.Properties;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP_KEY;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -82,6 +89,15 @@ public class
TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockPr
properties.setProperty(ZK_CONNECT_URL_PROP_KEY, server.getConnectString());
properties.setProperty(ZK_BASE_PATH_PROP_KEY,
server.getTempDirectory().getAbsolutePath());
properties.setProperty(ZK_LOCK_KEY_PROP_KEY, "key");
+ // Bound lock retries and ZK timeouts so a transient connection failure fails fast in seconds
+ // instead of being amplified by the production-default retry layers into a multi-minute hang.
+ properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
"1000");
+
properties.setProperty(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY,
"3000");
+ properties.setProperty(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
+ properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
+ properties.setProperty(ZK_SESSION_TIMEOUT_MS_PROP_KEY, "10000");
+ properties.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, "10000");
+ properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
config = getConfigBuilder()
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
@@ -104,10 +120,15 @@ public class
TestSimpleTransactionDirectMarkerBasedDetectionStrategyWithZKLockPr
@AfterEach
public void clean() throws IOException {
- cleanupResources();
- FileIOUtils.deleteDirectory(new File(basePath));
- if (server != null) {
- server.close();
+ try {
+ cleanupResources();
+ FileIOUtils.deleteDirectory(new File(basePath));
+ } finally {
+ // Always stop the embedded ZooKeeper server, even if resource cleanup or directory
+ // deletion above throws, so the server is not leaked across parameterized runs.
+ if (server != null) {
+ server.close();
+ }
}
}