merlimat commented on a change in pull request #11198: URL: https://github.com/apache/pulsar/pull/11198#discussion_r663144657
########## File path: pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java ########## @@ -91,12 +95,38 @@ public ZKMetadataStore(ZooKeeper zkc) { this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent); } + @Override + protected void receivedSessionEvent(SessionEvent event) { + if (event == SessionEvent.SessionReestablished) { + // Recreate the persistent watch on the new session + zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE, + (rc, path, ctx) -> { + if (rc == Code.OK.intValue()) { + super.receivedSessionEvent(event); + } else { + log.error("Failed to recreate persistent watch on ZooKeeper: {}", Code.get(rc)); + sessionWatcher.setSessionInvalid(); + // On the reconnectable client, mark the session as expired to trigger a new reconnect and + // we will have the chance to set the watch again. + if (zkc instanceof ZooKeeperClient) { + ((ZooKeeperClient) zkc).process( Review comment: `ZooKeeperClient` class is from BK and it is the wrapper on top of `ZooKeeper` that handles creating a new session when the current session expires. The problem here is that when the session gets reestablished, we also need to ensure the persistent watch is recreated. If creating the watch fails, though, we need to retry creating the watch. In the meantime we would not be receiving watch notifications. For that here I'm down casting to the BK `ZooKeeperClient` and forcing it to recreate a new ZK session. When that will happen, we will then try again to add the watch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org