This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d5e151f7233 [FLINK-33053][zookeeper] Manually remove the leader watcher after retriever closed to avoid the watcher leak at zookeeper server side
d5e151f7233 is described below

commit d5e151f72336abcc13082fe4bb3e05fd5a785e86
Author: Yangze Guo <guoyan...@bytedance.com>
AuthorDate: Thu Sep 14 14:50:54 2023 +0800

    [FLINK-33053][zookeeper] Manually remove the leader watcher after retriever closed to avoid the watcher leak at zookeeper server side
---
 .../leaderretrieval/ZooKeeperLeaderRetrievalDriver.java  | 16 ++++++++++++++++
 .../org/apache/flink/runtime/util/ZooKeeperUtils.java    |  3 ++-
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
index ec1908e3185..a97198617a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
@@ -30,6 +30,8 @@ import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cac
 import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache;
 import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionState;
 import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.Watcher;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +40,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ObjectInputStream;
 import java.util.UUID;
 
+import static 
org.apache.flink.runtime.util.ZooKeeperUtils.RESOURCE_MANAGER_NODE;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -122,6 +125,19 @@ public class ZooKeeperLeaderRetrievalDriver implements 
LeaderRetrievalDriver {
         
client.getConnectionStateListenable().removeListener(connectionStateListener);
 
         cache.close();
+
+        try {
+            if (client.getZookeeperClient().isConnected()
+                    && 
!connectionInformationPath.contains(RESOURCE_MANAGER_NODE)) {
+                client.watchers()
+                        .removeAll()
+                        .ofType(Watcher.WatcherType.Any)
+                        .forPath(connectionInformationPath);
+            }
+        } catch (KeeperException.NoWatcherException e) {
+            // Ignore the no watcher exception as it's just a safety net to fix the watcher leak issue.
+            // For more details, please refer to FLINK-33053.
+        }
     }
 
     private void retrieveLeaderInformationFromZooKeeper() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 3eff5ae64fb..38e7d92548b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -101,7 +101,8 @@ public class ZooKeeperUtils {
     /** The prefix of the completed checkpoint file. */
     public static final String HA_STORAGE_COMPLETED_CHECKPOINT = 
"completedCheckpoint";
 
-    private static final String RESOURCE_MANAGER_NODE = "resource_manager";
+    /** The prefix of the resource manager node. */
+    public static final String RESOURCE_MANAGER_NODE = "resource_manager";
 
     private static final String DISPATCHER_NODE = "dispatcher";
 

Reply via email to