This is an automated email from the ASF dual-hosted git repository.

kezhuw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new bac8ba9ba Fix PersistentWatcher not working with NamespaceFacade 
(#1262)
bac8ba9ba is described below

commit bac8ba9bab5e69f4531ccb90164eec39d2570eb5
Author: Kezhu Wang <[email protected]>
AuthorDate: Sat Apr 12 19:26:29 2025 +0800

    Fix PersistentWatcher not working with NamespaceFacade (#1262)
    
    `NamespaceFacade` does not support `getCuratorListenable`, while #520 uses
    it to listen for `CuratorEventType.CLOSING` to fix CURATOR-729.
    
    This commit exports `CuratorFrameworkBase::client` to retrieve the
    underlying framework client to listen for `CuratorEventType.CLOSING`.
    
    Fixes #1259.
---
 .../framework/imps/CuratorFrameworkBase.java       |  5 +++
 .../framework/imps/CuratorFrameworkImpl.java       |  5 +++
 .../framework/imps/DelegatingCuratorFramework.java |  5 +++
 .../framework/recipes/watch/PersistentWatcher.java |  4 ++-
 .../recipes/watch/TestPersistentWatcher.java       | 39 ++++++++++++++++++++++
 5 files changed, 57 insertions(+), 1 deletion(-)

diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java
index cc98a0957..c3009ac85 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java
@@ -58,6 +58,11 @@ import org.apache.zookeeper.ZooKeeper;
 public abstract class CuratorFrameworkBase implements CuratorFramework {
     abstract NamespaceImpl getNamespaceImpl();
 
+    /**
+     * Return the underlying client which is the one constructed from {@link 
org.apache.curator.framework.CuratorFrameworkFactory}.
+     */
+    public abstract CuratorFramework client();
+
     @Override
     public final CuratorFramework nonNamespaceView() {
         return usingNamespace(null);
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 0f1680f3e..ae4ba3a81 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -254,6 +254,11 @@ public final class CuratorFrameworkImpl extends 
CuratorFrameworkBase {
         // NOP
     }
 
+    @Override
+    public CuratorFramework client() {
+        return this;
+    }
+
     @Override
     public CuratorFrameworkState getState() {
         return state.get();
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java
index 5850242bf..df9ff0b61 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java
@@ -47,6 +47,11 @@ abstract class DelegatingCuratorFramework extends 
CuratorFrameworkBase {
         this.client = client;
     }
 
+    @Override
+    public final CuratorFramework client() {
+        return client.client();
+    }
+
     @Override
     public CuratorFrameworkState getState() {
         return client.getState();
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index e8d191c46..38487d237 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -27,6 +27,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorClosedException;
 import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.imps.CuratorFrameworkBase;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.framework.state.ConnectionStateListener;
@@ -80,7 +81,8 @@ public class PersistentWatcher implements Closeable {
     public void start() {
         Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "Already started");
         
client.getConnectionStateListenable().addListener(connectionStateListener);
-        client.getCuratorListenable().addListener(((ignored, event) -> {
+        // This could be a namespaced facade which does not support 
getCuratorListenable.
+        ((CuratorFrameworkBase) 
client).client().getCuratorListenable().addListener(((ignored, event) -> {
             if (event.getType() == CuratorEventType.CLOSING) {
                 onClientClosed();
             }
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index 902b18a0a..a94a99281 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -48,6 +48,45 @@ public class TestPersistentWatcher extends CuratorTestBase {
         internalTest(false);
     }
 
+    @Test
+    public void testNamespacedWatching() throws Exception {
+        BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1))) {
+            client.start();
+            // given: connected curator client
+            client.blockUntilConnected();
+
+            // given: started persistent watcher under namespaced facade
+            PersistentWatcher persistentWatcher = new 
PersistentWatcher(client.usingNamespace("top"), "/main", true);
+            persistentWatcher.getListenable().addListener(events::add);
+            persistentWatcher.start();
+
+            // when: create paths
+            client.create().forPath("/top/main");
+            client.create().forPath("/top/main/a");
+
+            // then: receive node watch events
+            WatchedEvent event1 = events.poll(5, TimeUnit.SECONDS);
+            assertNotNull(event1);
+            assertEquals(Watcher.Event.EventType.NodeCreated, 
event1.getType());
+            assertEquals("/main", event1.getPath());
+
+            WatchedEvent event2 = events.poll(5, TimeUnit.SECONDS);
+            assertNotNull(event2);
+            assertEquals(Watcher.Event.EventType.NodeCreated, 
event2.getType());
+            assertEquals("/main/a", event2.getPath());
+        }
+
+        // when: curator client closed
+        // then: listener get Closed notification
+        WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
+        assertNotNull(event);
+        assertEquals(Watcher.Event.EventType.None, event.getType());
+        assertEquals(Watcher.Event.KeeperState.Closed, event.getState());
+    }
+
     @Test
     public void testConcurrentClientClose() throws Exception {
         BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();

Reply via email to