This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zookeeper-api-ttlcontainer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/zookeeper-api-ttlcontainer by 
this push:
     new 881f06e7a Add TTL mode to async create API in ZkClient (#2082)
881f06e7a is described below

commit 881f06e7a10b1cdf22dae2dd1c03f01b62128a0b
Author: Ramin Bashizade <[email protected]>
AuthorDate: Wed May 11 13:35:00 2022 -0700

    Add TTL mode to async create API in ZkClient (#2082)
    
    This PR adds methods to ZkClient that support asynchronously creating
    persistent nodes with a TTL.
---
 helix-core/helix-core-1.0.4-SNAPSHOT.ivy           |  2 +-
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 25 +++++++++++++++++-----
 .../zkclient/callback/ZkAsyncCallbacks.java        |  8 ++++++-
 zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy     |  2 +-
 4 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/helix-core/helix-core-1.0.4-SNAPSHOT.ivy 
b/helix-core/helix-core-1.0.4-SNAPSHOT.ivy
index e4fffbe07..80ed665a1 100755
--- a/helix-core/helix-core-1.0.4-SNAPSHOT.ivy
+++ b/helix-core/helix-core-1.0.4-SNAPSHOT.ivy
@@ -52,7 +52,7 @@ under the License.
     <dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" 
rev="2.17.1" force="true" 
conf="compile->compile(*),master(*);runtime->runtime(*)">
         <artifact name="log4j-slf4j-impl" ext="jar"/>
     </dependency>
-    <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.13" 
conf="compile->compile(default);runtime->runtime(default);default->default"/>
+    <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.5.9" 
conf="compile->compile(default);runtime->runtime(default);default->default"/>
                <dependency org="com.fasterxml.jackson.core" 
name="jackson-databind" rev="2.12.6.1" 
conf="compile->compile(default);runtime->runtime(default);default->default"/>
     <dependency org="com.fasterxml.jackson.core" name="jackson-core" 
rev="2.12.6" 
conf="compile->compile(default);runtime->runtime(default);default->default"/>
     <dependency org="commons-io" name="commons-io" rev="2.11.0" 
conf="compile->compile(default);runtime->runtime(default);default->default"/>
diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index be729205c..c6b74239b 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -85,6 +85,7 @@ import org.slf4j.LoggerFactory;
 public class ZkClient implements Watcher {
   private static final Logger LOG = LoggerFactory.getLogger(ZkClient.class);
 
+  public static final long TTL_NOT_SET = -1L;
   private static final long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
 
   // If number of children exceeds this limit, getChildren() should not retry 
on connection loss.
@@ -1937,10 +1938,10 @@ public class ZkClient implements Watcher {
           new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
       return;
     }
-    doAsyncCreate(path, data, mode, startT, cb, parseExpectedSessionId(datat));
+    doAsyncCreate(path, data, mode, TTL_NOT_SET, startT, cb, 
parseExpectedSessionId(datat));
   }
 
-  private void doAsyncCreate(final String path, final byte[] data, final 
CreateMode mode,
+  private void doAsyncCreate(final String path, final byte[] data, final 
CreateMode mode, long ttl,
       final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb, 
final String expectedSessionId) {
     try {
       retryUntilConnected(() -> {
@@ -1949,19 +1950,33 @@ public class ZkClient implements Watcher {
                 GZipCompressionUtil.isCompressed(data)) {
               @Override
               protected void doRetry() {
-                doAsyncCreate(path, data, mode, System.currentTimeMillis(), 
cb, expectedSessionId);
+                doAsyncCreate(path, data, mode, ttl, 
System.currentTimeMillis(), cb, expectedSessionId);
               }
-            });
+            }, ttl);
         return null;
       });
     } catch (RuntimeException e) {
       // Process callback to release caller from waiting
       cb.processResult(KeeperException.Code.APIERROR.intValue(), path,
-          new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
+          new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null, 
null);
       throw e;
     }
   }
 
+  public void asyncCreate(final String path, Object datat, final CreateMode 
mode, long ttl,
+      final ZkAsyncCallbacks.CreateCallbackHandler cb) {
+    final long startT = System.currentTimeMillis();
+    final byte[] data;
+    try {
+      data = (datat == null ? null : serialize(datat, path));
+    } catch (ZkMarshallingError e) {
+      cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+          new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null, 
null);
+      return;
+    }
+    doAsyncCreate(path, data, mode, ttl, startT, cb, 
parseExpectedSessionId(datat));
+  }
+
   // Async Data Accessors
   public void asyncSetData(final String path, Object datat, final int version,
       final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 506d23481..72e2b95b7 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.Create2Callback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
@@ -111,12 +112,17 @@ public class ZkAsyncCallbacks {
     }
   }
 
-  public static class CreateCallbackHandler extends DefaultCallback implements 
StringCallback {
+  public static class CreateCallbackHandler extends DefaultCallback implements 
StringCallback, Create2Callback {
     @Override
     public void processResult(int rc, String path, Object ctx, String name) {
       callback(rc, path, ctx);
     }
 
+    @Override
+    public void processResult(int rc, String path, Object ctx, String name, 
Stat stat) {
+      callback(rc, path, ctx);
+    }
+
     @Override
     public void handle() {
       // TODO Auto-generated method stub
diff --git a/zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy 
b/zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy
index 0e57930f0..ad604c962 100644
--- a/zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy
+++ b/zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy
@@ -43,7 +43,7 @@ under the License.
     <dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" 
rev="2.17.1" force="true" 
conf="compile->compile(*),master(*);runtime->runtime(*)">
         <artifact name="log4j-slf4j-impl" ext="jar"/>
     </dependency>
-    <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.13" 
conf="compile->compile(default);runtime->runtime(default);default->default"/>
+    <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.5.9" 
conf="compile->compile(default);runtime->runtime(default);default->default"/>
                <dependency org="com.fasterxml.jackson.core" 
name="jackson-databind" rev="2.12.6.1" force="true" 
conf="compile->compile(*),master(*);runtime->runtime(*)"/>
                <dependency org="com.fasterxml.jackson.core" 
name="jackson-core" rev="2.12.6" force="true" 
conf="compile->compile(*),master(*);runtime->runtime(*)"/>
                <dependency org="commons-cli" name="commons-cli" rev="1.2" 
force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>

Reply via email to