This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a042255555 [HUDI-7198] Create nested node path if does not exist for zookeeper. (#10281)
6a042255555 is described below

commit 6a042255555d2101812618d216f581b69a5b8b69
Author: harshal <harshal.j.pa...@gmail.com>
AuthorDate: Wed Jan 3 12:20:30 2024 +0530

    [HUDI-7198] Create nested node path if does not exist for zookeeper. (#10281)
---
 .../lock/ZookeeperBasedLockProvider.java           | 34 ++++++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
index 31b92dcf914..0f31b6389cc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
@@ -74,8 +74,39 @@ public class ZookeeperBasedLockProvider implements LockProvider<InterProcessMute
         .connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
         .build();
     this.curatorFrameworkClient.start();
+    createPathIfNotExists();
   }
 
+  private String getLockPath() {
+    return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
+        + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
+  }
+
+  private void createPathIfNotExists() {
+    try {
+      String lockPath = getLockPath();
+      LOG.info(String.format("Creating zookeeper path %s if not exists", lockPath));
+      String[] parts = lockPath.split("/");
+      StringBuilder currentPath = new StringBuilder();
+      for (String part : parts) {
+        if (!part.isEmpty()) {
+          currentPath.append("/").append(part);
+          createNodeIfNotExists(currentPath.toString());
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to create ZooKeeper path: " + e.getMessage());
+      throw new RuntimeException("Failed to initialize ZooKeeper path", e);
+    }
+  }
+
+  private void createNodeIfNotExists(String path) throws Exception {
+    if (this.curatorFrameworkClient.checkExists().forPath(path) == null) {
+      this.curatorFrameworkClient.create().forPath(path);
+    }
+  }
+
+
   // Only used for testing
   public ZookeeperBasedLockProvider(
       final LockConfiguration lockConfiguration, final CuratorFramework curatorFrameworkClient) {
@@ -139,8 +170,7 @@ public class ZookeeperBasedLockProvider implements LockProvider<InterProcessMute
   private void acquireLock(long time, TimeUnit unit) throws Exception {
     ValidationUtils.checkArgument(this.lock == null, generateLogStatement(LockState.ALREADY_ACQUIRED, generateLogSuffixString()));
     InterProcessMutex newLock = new InterProcessMutex(
-        this.curatorFrameworkClient, lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
-        + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY));
+        this.curatorFrameworkClient, getLockPath());
     boolean acquired = newLock.acquire(time, unit);
     if (!acquired) {
       throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, generateLogSuffixString()));

Reply via email to