This is an automated email from the ASF dual-hosted git repository. nealsun pushed a commit to branch zookeeper-api-ttlcontainer in repository https://gitbox.apache.org/repos/asf/helix.git
commit f28c4940c609cd901549f559d719cf745251eeae Author: Ramin Bashizade <[email protected]> AuthorDate: Wed May 11 13:35:00 2022 -0700 Add TTL mode to async create API in ZkClient (#2082) This PR adds methods that support creating persistent nodes with TTL asynchronously to ZkClient. --- helix-core/helix-core-1.0.5-SNAPSHOT.ivy | 2 +- .../apache/helix/zookeeper/zkclient/ZkClient.java | 25 +++++++++++++++++----- .../zkclient/callback/ZkAsyncCallbacks.java | 8 ++++++- zookeeper-api/zookeeper-api-1.0.5-SNAPSHOT.ivy | 2 +- 4 files changed, 29 insertions(+), 8 deletions(-) diff --git a/helix-core/helix-core-1.0.5-SNAPSHOT.ivy b/helix-core/helix-core-1.0.5-SNAPSHOT.ivy index 630ab43fc..0a3ff82cd 100755 --- a/helix-core/helix-core-1.0.5-SNAPSHOT.ivy +++ b/helix-core/helix-core-1.0.5-SNAPSHOT.ivy @@ -52,7 +52,7 @@ under the License. <dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" rev="2.17.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"> <artifact name="log4j-slf4j-impl" ext="jar"/> </dependency> - <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.13" conf="compile->compile(default);runtime->runtime(default);default->default"/> + <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.5.9" conf="compile->compile(default);runtime->runtime(default);default->default"/> <dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="2.12.6.1" conf="compile->compile(default);runtime->runtime(default);default->default"/> <dependency org="com.fasterxml.jackson.core" name="jackson-core" rev="2.12.6" conf="compile->compile(default);runtime->runtime(default);default->default"/> <dependency org="commons-io" name="commons-io" rev="2.11.0" conf="compile->compile(default);runtime->runtime(default);default->default"/> diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index be729205c..c6b74239b 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -85,6 +85,7 @@ import org.slf4j.LoggerFactory; public class ZkClient implements Watcher { private static final Logger LOG = LoggerFactory.getLogger(ZkClient.class); + public static final long TTL_NOT_SET = -1L; private static final long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds // If number of children exceeds this limit, getChildren() should not retry on connection loss. @@ -1937,10 +1938,10 @@ public class ZkClient implements Watcher { new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null); return; } - doAsyncCreate(path, data, mode, startT, cb, parseExpectedSessionId(datat)); + doAsyncCreate(path, data, mode, TTL_NOT_SET, startT, cb, parseExpectedSessionId(datat)); } - private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode, + private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode, long ttl, final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb, final String expectedSessionId) { try { retryUntilConnected(() -> { @@ -1949,19 +1950,33 @@ public class ZkClient implements Watcher { GZipCompressionUtil.isCompressed(data)) { @Override protected void doRetry() { - doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb, expectedSessionId); + doAsyncCreate(path, data, mode, ttl, System.currentTimeMillis(), cb, expectedSessionId); } - }); + }, ttl); return null; }); } catch (RuntimeException e) { // Process callback to release caller from waiting cb.processResult(KeeperException.Code.APIERROR.intValue(), path, - new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null); + new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null, null); throw e; } } + public void asyncCreate(final String path, Object datat, final CreateMode mode, long ttl, + final ZkAsyncCallbacks.CreateCallbackHandler cb) { + final long startT = System.currentTimeMillis(); + final byte[] data; + try { + data = (datat == null ? null : serialize(datat, path)); + } catch (ZkMarshallingError e) { + cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path, + new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null, null); + return; + } + doAsyncCreate(path, data, mode, ttl, startT, cb, parseExpectedSessionId(datat)); + } + // Async Data Accessors public void asyncSetData(final String path, Object datat, final int version, final ZkAsyncCallbacks.SetDataCallbackHandler cb) { diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java index 506d23481..72e2b95b7 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor; import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.AsyncCallback.Create2Callback; import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.AsyncCallback.StringCallback; @@ -111,12 +112,17 @@ public class ZkAsyncCallbacks { } } - public static class CreateCallbackHandler extends DefaultCallback implements StringCallback { + public static class CreateCallbackHandler extends DefaultCallback implements StringCallback, Create2Callback { @Override public void processResult(int rc, String path, Object ctx, String name) { callback(rc, path, ctx); } + @Override + public void processResult(int rc, String path, Object ctx, String name, Stat stat) { + callback(rc, path, ctx); + } + @Override public void handle() { // TODO Auto-generated method stub diff --git a/zookeeper-api/zookeeper-api-1.0.5-SNAPSHOT.ivy b/zookeeper-api/zookeeper-api-1.0.5-SNAPSHOT.ivy index 17cb116be..065cdda8d 100644 --- a/zookeeper-api/zookeeper-api-1.0.5-SNAPSHOT.ivy +++ b/zookeeper-api/zookeeper-api-1.0.5-SNAPSHOT.ivy @@ -43,7 +43,7 @@ under the License. <dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" rev="2.17.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"> <artifact name="log4j-slf4j-impl" ext="jar"/> </dependency> - <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.13" conf="compile->compile(default);runtime->runtime(default);default->default"/> + <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.5.9" conf="compile->compile(default);runtime->runtime(default);default->default"/> <dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="2.12.6.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/> <dependency org="com.fasterxml.jackson.core" name="jackson-core" rev="2.12.6" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/> <dependency org="commons-cli" name="commons-cli" rev="1.2" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
