merlimat commented on a change in pull request #11198:
URL: https://github.com/apache/pulsar/pull/11198#discussion_r663144657



##########
File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
##########
@@ -91,12 +95,38 @@ public ZKMetadataStore(ZooKeeper zkc) {
         this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
     }
 
+    @Override
+    protected void receivedSessionEvent(SessionEvent event) {
+        if (event == SessionEvent.SessionReestablished) {
+            // Recreate the persistent watch on the new session
+            zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE,
+                    (rc, path, ctx) -> {
+                        if (rc == Code.OK.intValue()) {
+                            super.receivedSessionEvent(event);
+                        } else {
+                            log.error("Failed to recreate persistent watch on ZooKeeper: {}", Code.get(rc));
+                            sessionWatcher.setSessionInvalid();
+                            // On the reconnectable client, mark the session as expired to trigger a new reconnect and
+                            // we will have the chance to set the watch again.
+                            if (zkc instanceof ZooKeeperClient) {
+                                ((ZooKeeperClient) zkc).process(

Review comment:
   `ZooKeeperClient` is a BookKeeper (BK) class: a wrapper around `ZooKeeper` that creates a new session when the current one expires.

   The problem here is that when the session is reestablished, we also need to ensure the persistent watch is recreated.

   If creating the watch fails, though, we need to retry, and in the meantime we would not receive any watch notifications. That is why I'm downcasting to the BK `ZooKeeperClient` here and forcing it to create a new ZK session. Once that happens, we will try again to add the watch.
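
   To make the retry loop easier to follow, here is a rough, dependency-free sketch of the idea. It is only a simulation: `FakeReconnectableClient`, `FakeStore`, `addPersistentWatch` and `forceSessionExpiry` are hypothetical names invented for illustration, not Pulsar, BK, or ZooKeeper APIs, and the real callback runs asynchronously rather than inline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical simulation only; none of these types are Pulsar, BK, or ZooKeeper classes.
class WatchRetrySketch {

    /** Stand-in for a reconnectable client that opens a new session after an expiry. */
    static class FakeReconnectableClient {
        private int failuresLeft;                 // how many watch attempts will fail
        int sessionsCreated = 1;                  // the initial session counts as the first
        Runnable onSessionReestablished = () -> { };

        FakeReconnectableClient(int failuresLeft) {
            this.failuresLeft = failuresLeft;
        }

        /** Simulated addWatch: reports 0 on success, a non-zero code on failure. */
        void addPersistentWatch(IntConsumer callback) {
            if (failuresLeft > 0) {
                failuresLeft--;
                callback.accept(-4); // arbitrary non-zero failure code for the simulation
            } else {
                callback.accept(0);
            }
        }

        /** Simulated forced expiry: a new session is created and the listener is notified. */
        void forceSessionExpiry() {
            sessionsCreated++;
            onSessionReestablished.run();
        }
    }

    /** Stand-in for the metadata store that owns the persistent watch. */
    static class FakeStore {
        final FakeReconnectableClient client;
        final List<String> events = new ArrayList<>();
        boolean watchActive = false;

        FakeStore(FakeReconnectableClient client) {
            this.client = client;
            client.onSessionReestablished = this::onSessionReestablished;
        }

        void onSessionReestablished() {
            watchActive = false; // the watch from the previous session is gone
            client.addPersistentWatch(rc -> {
                if (rc == 0) {
                    watchActive = true;
                    events.add("watch-recreated");
                } else {
                    // Could not add the watch: force a new session so we get another attempt.
                    events.add("watch-failed");
                    client.forceSessionExpiry();
                }
            });
        }
    }

    public static void main(String[] args) {
        FakeReconnectableClient client = new FakeReconnectableClient(2);
        FakeStore store = new FakeStore(client);
        store.onSessionReestablished();
        System.out.println(store.events + " sessions=" + client.sessionsCreated
                + " watchActive=" + store.watchActive);
    }
}
```

   With two simulated failures, the store ends up on the third session with the watch in place. Each failed attempt costs a full session re-creation, which is the trade-off of reusing the reconnect path as the retry mechanism.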
