This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 855cf9b7c589bc7e757e1b9b35a0eb0401942b22
Author: Yan Zhao <[email protected]>
AuthorDate: Tue Jul 26 17:32:40 2022 +0800

    Fix autoRecover memory leak. (#3361)
    
    (cherry picked from commit da1b29a510fa924f612d22f36b3c73d61d7953d2)
---
 .../bookkeeper/meta/AbstractZkLedgerManager.java   | 42 ++++++++++++++++++++++
 .../meta/AbstractZkLedgerManagerTest.java          |  7 ++++
 .../zookeeper/MockZooKeeperTestCase.java           | 32 +++++++++++++++++
 3 files changed, 81 insertions(+)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index cda93704e0..0d4d6ee425 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -51,6 +51,7 @@ import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
@@ -154,6 +155,30 @@ public abstract class AbstractZkLedgerManager implements 
LedgerManager, Watcher
         }
     }
 
+    /**
+     * CancelWatchLedgerMetadataTask class.
+     */
+    protected class CancelWatchLedgerMetadataTask implements Runnable {
+
+        final long ledgerId;
+
+        CancelWatchLedgerMetadataTask(long ledgerId) {
+            this.ledgerId = ledgerId;
+        }
+
+        @Override
+        public void run() {
+            Set<LedgerMetadataListener> listeners = 
AbstractZkLedgerManager.this.listeners.get(ledgerId);
+            if (!CollectionUtils.isEmpty(listeners)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Still watch ledgerId: {}, ignore this unwatch 
task.", ledgerId);
+                }
+                return;
+            }
+            cancelMetadataWatch(ledgerId, AbstractZkLedgerManager.this);
+        }
+    }
+
     /**
      * ZooKeeper-based Ledger Manager Constructor.
      *
@@ -420,11 +445,28 @@ public abstract class AbstractZkLedgerManager implements 
LedgerManager, Watcher
                 }
                 if (listenerSet.isEmpty()) {
                     listeners.remove(ledgerId, listenerSet);
+                    new CancelWatchLedgerMetadataTask(ledgerId).run();
                 }
             }
         }
     }
 
+    private void cancelMetadataWatch(long ledgerId, Watcher watcher) {
+        zk.removeWatches(getLedgerPath(ledgerId), watcher, WatcherType.Data, 
true, new VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object o) {
+                if (rc != KeeperException.Code.OK.intValue()) {
+                    LOG.error("Cancel watch ledger {} metadata failed.", 
ledgerId,
+                            
KeeperException.create(KeeperException.Code.get(rc), path));
+                    return;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cancel watch ledger {} metadata succeed.", 
ledgerId);
+                }
+            }
+        }, null);
+    }
+
     @Override
     public CompletableFuture<Versioned<LedgerMetadata>> 
readLedgerMetadata(long ledgerId) {
         return readLedgerMetadata(ledgerId, null);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
index 5c6a514a7e..720ed3a594 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -824,9 +824,16 @@ public class AbstractZkLedgerManagerTest extends 
MockZooKeeperTestCase {
             ledgerStr, true,
             KeeperException.Code.OK.intValue(), serDe.serialize(metadata), 
stat);
 
+        mockZkRemoveWatcher();
+
         // unregister the listener
         ledgerManager.unregisterLedgerMetadataListener(ledgerId, listener);
         assertFalse(ledgerManager.listeners.containsKey(ledgerId));
+        assertFalse(watchers.containsKey(ledgerStr));
+        verify(mockZk, 
times(1)).removeWatches(eq(ledgerManager.getLedgerPath(ledgerId)),
+                any(Watcher.class), any(Watcher.WatcherType.class), 
any(Boolean.class),
+                any(VoidCallback.class), any());
+
 
         // notify the watcher event
         notifyWatchedEvent(
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
index 4190bf5c42..fdabc5e0d4 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
@@ -40,6 +40,7 @@ import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
@@ -83,6 +84,20 @@ public abstract class MockZooKeeperTestCase {
         watcherSet.add(watcher);
     }
 
+    private void removeWatcher(String path, Watcher watcher) {
+        if (watcher == null) {
+            return;
+        }
+        Set<Watcher> watcherSet = watchers.get(path);
+        if (null == watcherSet) {
+            return;
+        }
+        watcherSet.remove(watcher);
+        if (watcherSet.isEmpty()) {
+            watchers.remove(path);
+        }
+    }
+
     protected void mockZkUtilsAsyncCreateFullPathOptimistic(
         String expectedLedgerPath,
         CreateMode expectedCreateMode,
@@ -187,7 +202,24 @@ public abstract class MockZooKeeperTestCase {
             expectedWatcher ? any(Watcher.class) : eq(null),
             any(DataCallback.class),
             any());
+    }
 
+    protected void mockZkRemoveWatcher () throws Exception {
+        doAnswer(invocationOnMock -> {
+            String path = invocationOnMock.getArgument(0);
+            Watcher watcher = invocationOnMock.getArgument(1);
+            VoidCallback callback = invocationOnMock.getArgument(4);
+            removeWatcher(path, watcher);
+
+            callback.processResult(KeeperException.Code.OK.intValue(), path, 
null);
+            return null;
+        }).when(mockZk).removeWatches(
+                any(String.class),
+                any(Watcher.class),
+                any(Watcher.WatcherType.class),
+                any(Boolean.class),
+                any(VoidCallback.class),
+                any());
     }
 
     protected void mockZkSetData(

Reply via email to