This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new 19cece4f6 Add exception for register ZK one time watcher when 
_usePersistWatcher flag is on (#2465)
19cece4f6 is described below

commit 19cece4f62476a21b427d3ede620100fdeabe446
Author: xyuanlu <[email protected]>
AuthorDate: Tue May 9 17:54:51 2023 -0700

    Add exception for register ZK one time watcher when _usePersistWatcher flag 
is on (#2465)
    
    When ZkClient registers a one-time listener on a path that already has a 
persist watcher registered, ZK will overwrite the persist watcher and only 
trigger an event once when a change happens. In Helix ZkClient, we should 
disable one-time watcher registration when _usePersistWatcher is set to true.
---
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 16 +++++
 .../zookeeper/impl/TestZooKeeperConnection.java    | 68 ++++++++++++++++++++++
 .../TestZkClientPersistWatcher.java                | 64 +++++++++++++++-----
 3 files changed, 132 insertions(+), 16 deletions(-)

diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 41c23a6d1..fb94e5436 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -1360,6 +1360,7 @@ public class ZkClient implements Watcher {
   }
 
   protected List<String> getChildren(final String path, final boolean watch) {
+    validateNativeZkWatcherType(watch);
     long startT = System.currentTimeMillis();
 
     try {
@@ -1424,6 +1425,7 @@ public class ZkClient implements Watcher {
   }
 
   protected boolean exists(final String path, final boolean watch) {
+    validateNativeZkWatcherType(watch);
     long startT = System.currentTimeMillis();
     try {
       boolean exists = retryUntilConnected(new Callable<Boolean>() {
@@ -1453,6 +1455,7 @@ public class ZkClient implements Watcher {
   }
 
   private Stat getStat(final String path, final boolean watch) {
+    validateNativeZkWatcherType(watch);
     long startT = System.currentTimeMillis();
     final Stat stat;
     try {
@@ -2152,6 +2155,7 @@ public class ZkClient implements Watcher {
 
   @SuppressWarnings("unchecked")
   public <T extends Object> T readData(final String path, final Stat stat, 
final boolean watch) {
+    validateNativeZkWatcherType(watch);
     long startT = System.currentTimeMillis();
     byte[] data = null;
     try {
@@ -3047,4 +3051,16 @@ public class ZkClient implements Watcher {
       _persistListenerMutex.unlock();
     }
   }
+
+  /*
+    Rejects one-time watch registration while _usePersistWatcher is on: throws
+    IllegalArgumentException when both _usePersistWatcher and watch are true, and
+    does nothing otherwise. When ZkClient
+    is subscribed as a persist watcher, resubscribing the same object as a 
one-time watcher will
+    overwrite the persist watcher, causing subsequent events to be missed.
+   */
+  private void validateNativeZkWatcherType(boolean watch) {
+    if (_usePersistWatcher && watch) {
+      throw new IllegalArgumentException(
+          "Can not subscribe one time watcher when ZkClient is using 
PersistWatcher");
+    }
+  }
 }
diff --git 
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestZooKeeperConnection.java
 
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestZooKeeperConnection.java
new file mode 100644
index 000000000..1ea51e2f9
--- /dev/null
+++ 
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestZooKeeperConnection.java
@@ -0,0 +1,68 @@
+package org.apache.helix.zookeeper.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.zookeeper.zkclient.IZkConnection;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZooKeeperConnection extends ZkTestBase {
+  final int count = 100;
+  // Single-element array so the inner watcher class can increment the counter.
+  final int[] get_count = {0};
+  // One data-change event plus one child-change event is expected per iteration.
+  CountDownLatch countDownLatch = new CountDownLatch(count * 2);
+
+  /**
+   * Tests persist watchers' behavior in
+   * {@link org.apache.helix.zookeeper.zkclient.ZkConnection}.
+   * 1. Register a persist watcher on a path, create 100 child ZNodes and edit the
+   *    ZNode 100 times. Expecting 200 events.
+   * 2. Register one-time watchers on the same path, make the same kind of changes
+   *    again and count the total number of events.
+   */
+  @Test
+  void testPersistWatcher() throws Exception {
+    Watcher watcher1 = new PersistWatcher();
+    ZkClient zkClient = new org.apache.helix.zookeeper.impl.client.ZkClient(ZK_ADDR);
+    try {
+      IZkConnection zkConnection = zkClient.getConnection();
+      String path = "/testPersistWatcher";
+      zkConnection.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      // Register a persist listener on the path, change the ZNode 100 times and
+      // create 100 child ZNodes, expecting 200 events.
+      zkConnection.addWatch(path, watcher1, AddWatchMode.PERSISTENT);
+      for (int i = 0; i < count; ++i) {
+        zkConnection.writeData(path, "datat".getBytes(), -1);
+        zkConnection.create(path + "/c1_" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+      }
+      Assert.assertTrue(countDownLatch.await(50000, TimeUnit.MILLISECONDS));
+
+      // Register one-time watchers on the path and count the total number of events.
+      // ZK will overwrite the persist watcher and only trigger one event for the
+      // child change and one for the data change.
+      zkConnection.readData(path, null, true);
+      zkConnection.getChildren(path, true);
+      for (int i = 0; i < 200; ++i) {
+        zkConnection.writeData(path, ("datat" + i).getBytes(), -1);
+        zkConnection.create(path + "/c2_" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+      }
+      // 202 = the 200 events from the first phase + 1 data event + 1 child event
+      // delivered to the one-time watchers in the second phase.
+      Assert.assertTrue(
+          TestHelper.verify(() -> get_count[0] == 202, TestHelper.WAIT_DURATION));
+      System.out.println("testPersistWatcher received event count: " + get_count[0]);
+    } finally {
+      // Release the ZK connection even when an assertion or ZK call above fails.
+      zkClient.close();
+    }
+  }
+
+  /** Watcher that counts every event it receives and counts down the shared latch. */
+  class PersistWatcher implements Watcher {
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      get_count[0]++;
+      countDownLatch.countDown();
+    }
+  }
+
+}
\ No newline at end of file
diff --git 
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java
 
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
similarity index 61%
rename from 
zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java
rename to 
zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
index 1a13fc3ba..3e09d1f43 100644
--- 
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientPersistWatcher.java
+++ 
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
@@ -1,4 +1,4 @@
-package org.apache.helix.zookeeper.impl.client;
+package org.apache.helix.zookeeper.zkclient;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -24,9 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.zookeeper.impl.ZkTestBase;
-import org.apache.helix.zookeeper.impl.ZkTestHelper;
-import org.apache.helix.zookeeper.zkclient.IZkChildListener;
-import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.SerializableSerializer;
 import org.apache.zookeeper.CreateMode;
@@ -38,10 +36,11 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
 
   @Test
   void testZkClientDataChange() throws Exception {
-    ZkClient.Builder builder = new ZkClient.Builder();
-    builder.setZkServer(ZkTestBase.ZK_ADDR)
-        .setMonitorRootPathOnly(false).setUsePersistWatcher(true);
-    ZkClient zkClient = builder.build();
+    org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
+        new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
+    builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
+        .setUsePersistWatcher(true);
+    org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
     zkClient.setZkSerializer(new BasicZkSerializer(new 
SerializableSerializer()));
     int count = 1000;
     final int[] event_count = {0};
@@ -51,7 +50,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
       @Override
       public void handleDataChange(String dataPath, Object data) throws 
Exception {
         countDownLatch1.countDown();
-        event_count[0]++ ;
+        event_count[0]++;
       }
 
       @Override
@@ -71,10 +70,11 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
 
   @Test(dependsOnMethods = "testZkClientDataChange")
   void testZkClientChildChange() throws Exception {
-    ZkClient.Builder builder = new ZkClient.Builder();
-    builder.setZkServer(ZkTestBase.ZK_ADDR)
-        .setMonitorRootPathOnly(false).setUsePersistWatcher(true);
-    ZkClient zkClient = builder.build();
+    org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
+        new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
+    builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
+        .setUsePersistWatcher(true);
+    org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
     zkClient.setZkSerializer(new BasicZkSerializer(new 
SerializableSerializer()));
     int count = 100;
     final int[] event_count = {0};
@@ -94,18 +94,50 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
       public void handleChildChange(String parentPath, List<String> 
currentChilds)
           throws Exception {
         countDownLatch2.countDown();
-        event_count[0]++ ;
+        event_count[0]++;
       }
     };
     zkClient.subscribeChildChanges(path, childListener);
     zkClient.subscribeChildChanges(path, childListener2);
     zkClient.create(path, "datat", CreateMode.PERSISTENT);
-    for(int i=0; i<count; ++i) {
-      zkClient.create(path + "/child" +i , "datat", CreateMode.PERSISTENT);
+    for (int i = 0; i < count; ++i) {
+      zkClient.create(path + "/child" + i, "datat", CreateMode.PERSISTENT);
     }
     Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS));
     Assert.assertTrue(countDownLatch2.await(15000, TimeUnit.MILLISECONDS));
     zkClient.close();
   }
 
+  @Test
+  void testSubscribeOneTimeChangeWhenUsingPersistWatcher() {
+    org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
+        new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
+    builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
+        .setUsePersistWatcher(true);
+    ZkClient zkClient = builder.build();
+    zkClient.setZkSerializer(new BasicZkSerializer(new 
SerializableSerializer()));
+
+    String path = "/testSubscribeOneTimeChangeWhenUsingPersistWatcher";
+    zkClient.create(path, "datat", CreateMode.PERSISTENT);
+    try {
+      zkClient.exists(path, true);
+      Assert.fail("Should throw exception when subscribe one time listener");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getClass().getName(), 
"java.lang.IllegalArgumentException");
+    }
+
+    try {
+      zkClient.readData(path, null,  true);
+      Assert.fail("Should throw exception when subscribe one time listener");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getClass().getName(), 
"java.lang.IllegalArgumentException");
+    }
+
+    try {
+      zkClient.getChildren(path, true);
+      Assert.fail("Should throw exception when subscribe one time listener");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getClass().getName(), 
"java.lang.IllegalArgumentException");
+    }
+  }
 }
\ No newline at end of file

Reply via email to