This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 6a042255555 [HUDI-7198] Create nested node path if does not exist for zookeeper. (#10281) 6a042255555 is described below commit 6a042255555d2101812618d216f581b69a5b8b69 Author: harshal <harshal.j.pa...@gmail.com> AuthorDate: Wed Jan 3 12:20:30 2024 +0530 [HUDI-7198] Create nested node path if does not exist for zookeeper. (#10281) --- .../lock/ZookeeperBasedLockProvider.java | 34 ++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java index 31b92dcf914..0f31b6389cc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java @@ -74,8 +74,39 @@ public class ZookeeperBasedLockProvider implements LockProvider<InterProcessMute .connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, DEFAULT_ZK_CONNECTION_TIMEOUT_MS)) .build(); this.curatorFrameworkClient.start(); + createPathIfNotExists(); } + private String getLockPath() { + return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/" + + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY); + } + + private void createPathIfNotExists() { + try { + String lockPath = getLockPath(); + LOG.info(String.format("Creating zookeeper path %s if not exists", lockPath)); + String[] parts = lockPath.split("/"); + StringBuilder currentPath = new StringBuilder(); + for (String part : parts) { + if (!part.isEmpty()) { + currentPath.append("/").append(part); + createNodeIfNotExists(currentPath.toString()); + } + } + } catch (Exception e) { + LOG.error("Failed to create ZooKeeper path: " + e.getMessage()); + throw new RuntimeException("Failed to initialize ZooKeeper path", e); + } + } + + private void createNodeIfNotExists(String path) throws Exception { + if (this.curatorFrameworkClient.checkExists().forPath(path) == null) { + this.curatorFrameworkClient.create().forPath(path); + } + } + + // Only used for testing public ZookeeperBasedLockProvider( final LockConfiguration lockConfiguration, final CuratorFramework curatorFrameworkClient) { @@ -139,8 +170,7 @@ public class ZookeeperBasedLockProvider implements LockProvider<InterProcessMute private void acquireLock(long time, TimeUnit unit) throws Exception { ValidationUtils.checkArgument(this.lock == null, generateLogStatement(LockState.ALREADY_ACQUIRED, generateLogSuffixString())); InterProcessMutex newLock = new InterProcessMutex( - this.curatorFrameworkClient, lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/" - + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY)); + this.curatorFrameworkClient, getLockPath()); boolean acquired = newLock.acquire(time, unit); if (!acquired) { throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, generateLogSuffixString()));