This is an automated email from the ASF dual-hosted git repository.
kezhuw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new bac8ba9ba Fix PersistentWatcher not working with NamespaceFacade
(#1262)
bac8ba9ba is described below
commit bac8ba9bab5e69f4531ccb90164eec39d2570eb5
Author: Kezhu Wang <[email protected]>
AuthorDate: Sat Apr 12 19:26:29 2025 +0800
Fix PersistentWatcher not working with NamespaceFacade (#1262)
`NamespaceFacade` does not support `getCuratorListenable`, while #520 uses
it to listen for `CuratorEventType.CLOSING` to fix CURATOR-729.
This commit exports `CuratorFrameworkBase::client` to retrieve the
underlying framework client to listen for `CuratorEventType.CLOSING`.
Fixes #1259.
---
.../framework/imps/CuratorFrameworkBase.java | 5 +++
.../framework/imps/CuratorFrameworkImpl.java | 5 +++
.../framework/imps/DelegatingCuratorFramework.java | 5 +++
.../framework/recipes/watch/PersistentWatcher.java | 4 ++-
.../recipes/watch/TestPersistentWatcher.java | 39 ++++++++++++++++++++++
5 files changed, 57 insertions(+), 1 deletion(-)
diff --git
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java
index cc98a0957..c3009ac85 100644
---
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java
+++
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java
@@ -58,6 +58,11 @@ import org.apache.zookeeper.ZooKeeper;
public abstract class CuratorFrameworkBase implements CuratorFramework {
abstract NamespaceImpl getNamespaceImpl();
+ /**
+ * Return the underlying client which is the one constructed from {@link
org.apache.curator.framework.CuratorFrameworkFactory}.
+ */
+ public abstract CuratorFramework client();
+
@Override
public final CuratorFramework nonNamespaceView() {
return usingNamespace(null);
diff --git
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 0f1680f3e..ae4ba3a81 100644
---
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -254,6 +254,11 @@ public final class CuratorFrameworkImpl extends
CuratorFrameworkBase {
// NOP
}
+ @Override
+ public CuratorFramework client() {
+ return this;
+ }
+
@Override
public CuratorFrameworkState getState() {
return state.get();
diff --git
a/curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java
b/curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java
index 5850242bf..df9ff0b61 100644
---
a/curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java
+++
b/curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java
@@ -47,6 +47,11 @@ abstract class DelegatingCuratorFramework extends
CuratorFrameworkBase {
this.client = client;
}
+ @Override
+ public final CuratorFramework client() {
+ return client.client();
+ }
+
@Override
public CuratorFrameworkState getState() {
return client.getState();
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index e8d191c46..38487d237 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -27,6 +27,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorClosedException;
import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.imps.CuratorFrameworkBase;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.state.ConnectionStateListener;
@@ -80,7 +81,8 @@ public class PersistentWatcher implements Closeable {
public void start() {
Preconditions.checkState(state.compareAndSet(State.LATENT,
State.STARTED), "Already started");
client.getConnectionStateListenable().addListener(connectionStateListener);
- client.getCuratorListenable().addListener(((ignored, event) -> {
+ // This could be a namespaced facade which does not support
getCuratorListenable.
+ ((CuratorFrameworkBase)
client).client().getCuratorListenable().addListener(((ignored, event) -> {
if (event.getType() == CuratorEventType.CLOSING) {
onClientClosed();
}
diff --git
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index 902b18a0a..a94a99281 100644
---
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -48,6 +48,45 @@ public class TestPersistentWatcher extends CuratorTestBase {
internalTest(false);
}
+ @Test
+ public void testNamespacedWatching() throws Exception {
+ BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(),
timing.connection(), new RetryOneTime(1))) {
+ client.start();
+ // given: connected curator client
+ client.blockUntilConnected();
+
+ // given: started persistent watcher under namespaced facade
+ PersistentWatcher persistentWatcher = new
PersistentWatcher(client.usingNamespace("top"), "/main", true);
+ persistentWatcher.getListenable().addListener(events::add);
+ persistentWatcher.start();
+
+ // when: create paths
+ client.create().forPath("/top/main");
+ client.create().forPath("/top/main/a");
+
+ // then: receive node watch events
+ WatchedEvent event1 = events.poll(5, TimeUnit.SECONDS);
+ assertNotNull(event1);
+ assertEquals(Watcher.Event.EventType.NodeCreated,
event1.getType());
+ assertEquals("/main", event1.getPath());
+
+ WatchedEvent event2 = events.poll(5, TimeUnit.SECONDS);
+ assertNotNull(event2);
+ assertEquals(Watcher.Event.EventType.NodeCreated,
event2.getType());
+ assertEquals("/main/a", event2.getPath());
+ }
+
+ // when: curator client closed
+ // then: listener get Closed notification
+ WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
+ assertNotNull(event);
+ assertEquals(Watcher.Event.EventType.None, event.getType());
+ assertEquals(Watcher.Event.KeeperState.Closed, event.getState());
+ }
+
@Test
public void testConcurrentClientClose() throws Exception {
BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();