This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d5e151f7233 [FLINK-33053][zookeeper] Manually remove the leader watcher after retriever closed to avoid the watcher leak at zookeeper server side d5e151f7233 is described below commit d5e151f72336abcc13082fe4bb3e05fd5a785e86 Author: Yangze Guo <guoyan...@bytedance.com> AuthorDate: Thu Sep 14 14:50:54 2023 +0800 [FLINK-33053][zookeeper] Manually remove the leader watcher after retriever closed to avoid the watcher leak at zookeeper server side --- .../leaderretrieval/ZooKeeperLeaderRetrievalDriver.java | 16 ++++++++++++++++ .../org/apache/flink/runtime/util/ZooKeeperUtils.java | 3 ++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java index ec1908e3185..a97198617a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java @@ -30,6 +30,8 @@ import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cac import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionState; import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +40,7 @@ import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.util.UUID; +import static org.apache.flink.runtime.util.ZooKeeperUtils.RESOURCE_MANAGER_NODE; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -122,6 +125,19 @@ public class ZooKeeperLeaderRetrievalDriver implements LeaderRetrievalDriver { client.getConnectionStateListenable().removeListener(connectionStateListener); cache.close(); + + try { + if (client.getZookeeperClient().isConnected() + && !connectionInformationPath.contains(RESOURCE_MANAGER_NODE)) { + client.watchers() + .removeAll() + .ofType(Watcher.WatcherType.Any) + .forPath(connectionInformationPath); + } + } catch (KeeperException.NoWatcherException e) { + // Ignore the no watcher exception as it's just a safetynet to fix watcher leak issue. + // For more details, please refer to FLINK-33053. + } } private void retrieveLeaderInformationFromZooKeeper() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 3eff5ae64fb..38e7d92548b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -101,7 +101,8 @@ public class ZooKeeperUtils { /** The prefix of the completed checkpoint file. */ public static final String HA_STORAGE_COMPLETED_CHECKPOINT = "completedCheckpoint"; - private static final String RESOURCE_MANAGER_NODE = "resource_manager"; + /** The prefix of the resource manager node. */ + public static final String RESOURCE_MANAGER_NODE = "resource_manager"; private static final String DISPATCHER_NODE = "dispatcher";