This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.14 by this push:
     new a68013e  fix: use correct lockpath, store Lock to heldLocks at 
ZkLedgerUnderreplicationManager#acquireUnderreplicatedLedger (#2855)
a68013e is described below

commit a68013e660eb9ff008369667ae13bd756a6902cf
Author: Yuri Mizushima <[email protected]>
AuthorDate: Tue Nov 2 15:58:57 2021 +0900

    fix: use correct lockpath, store Lock to heldLocks at 
ZkLedgerUnderreplicationManager#acquireUnderreplicatedLedger (#2855)
---
 .../meta/ZkLedgerUnderreplicationManager.java      | 57 ++++++++++++----------
 .../bookie/GcOverreplicatedLedgerTest.java         | 32 ++++++++++++
 2 files changed, 64 insertions(+), 25 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index 680264a..b340e07 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -33,6 +33,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -94,9 +95,9 @@ public class ZkLedgerUnderreplicationManager implements 
LedgerUnderreplicationMa
 
     private static class Lock {
         private final String lockZNode;
-        private final int ledgerZNodeVersion;
+        private final Optional<Integer> ledgerZNodeVersion;
 
-        Lock(String lockZNode, int ledgerZNodeVersion) {
+        Lock(String lockZNode, Optional<Integer> ledgerZNodeVersion) {
             this.lockZNode = lockZNode;
             this.ledgerZNodeVersion = ledgerZNodeVersion;
         }
@@ -105,13 +106,14 @@ public class ZkLedgerUnderreplicationManager implements 
LedgerUnderreplicationMa
             return lockZNode;
         }
 
-        int getLedgerZNodeVersion() {
+        Optional<Integer> getLedgerZNodeVersion() {
             return ledgerZNodeVersion;
         }
     }
     private final Map<Long, Lock> heldLocks = new ConcurrentHashMap<Long, 
Lock>();
     private final Pattern idExtractionPattern;
 
+    private final String rootPath;
     private final String basePath;
     private final String urLedgerPath;
     private final String urLockPath;
@@ -127,7 +129,8 @@ public class ZkLedgerUnderreplicationManager implements 
LedgerUnderreplicationMa
     public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, 
ZooKeeper zkc)
             throws KeeperException, InterruptedException, 
ReplicationException.CompatibilityException {
         this.conf = conf;
-        basePath = 
getBasePath(ZKMetadataDriverBase.resolveZkLedgersRootPath(conf));
+        rootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
+        basePath = getBasePath(rootPath);
         layoutZNode = basePath + '/' + BookKeeperConstants.LAYOUT_ZNODE;
         urLedgerPath = basePath
                 + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
@@ -389,24 +392,27 @@ public class ZkLedgerUnderreplicationManager implements 
LedgerUnderreplicationMa
         try {
             Lock l = heldLocks.get(ledgerId);
             if (l != null) {
-                zkc.delete(getUrLedgerZnode(ledgerId), 
l.getLedgerZNodeVersion());
+                final Optional<Integer> ledgerZNodeVersion = 
l.getLedgerZNodeVersion();
+                if (ledgerZNodeVersion.isPresent()) {
+                    zkc.delete(getUrLedgerZnode(ledgerId), 
ledgerZNodeVersion.get());
 
-                try {
-                    // clean up the hierarchy
-                    String[] parts = getUrLedgerZnode(ledgerId).split("/");
-                    for (int i = 1; i <= 4; i++) {
-                        String[] p = Arrays.copyOf(parts, parts.length - i);
-                        String path = Joiner.on("/").join(p);
-                        Stat s = zkc.exists(path, null);
-                        if (s != null) {
-                            zkc.delete(path, s.getVersion());
+                    try {
+                        // clean up the hierarchy
+                        String[] parts = getUrLedgerZnode(ledgerId).split("/");
+                        for (int i = 1; i <= 4; i++) {
+                            String[] p = Arrays.copyOf(parts, parts.length - 
i);
+                            String path = Joiner.on("/").join(p);
+                            Stat s = zkc.exists(path, null);
+                            if (s != null) {
+                                zkc.delete(path, s.getVersion());
+                            }
                         }
+                    } catch (KeeperException.NotEmptyException nee) {
+                        // This can happen when cleaning up the hierarchy.
+                        // It's safe to ignore, it simply means another
+                        // ledger in the same hierarchy has been marked as
+                        // underreplicated.
                     }
-                } catch (KeeperException.NotEmptyException nee) {
-                    // This can happen when cleaning up the hierarchy.
-                    // It's safe to ignore, it simply means another
-                    // ledger in the same hierarchy has been marked as
-                    // underreplicated.
                 }
             }
         } catch (KeeperException.NoNodeException nne) {
@@ -528,7 +534,7 @@ public class ZkLedgerUnderreplicationManager implements 
LedgerUnderreplicationMa
                     String lockPath = urLockPath + "/" + tryChild;
                     long ledgerId = getLedgerId(tryChild);
                     zkc.create(lockPath, LOCK_DATA, zkAcls, 
CreateMode.EPHEMERAL);
-                    heldLocks.put(ledgerId, new Lock(lockPath, 
stat.getVersion()));
+                    heldLocks.put(ledgerId, new Lock(lockPath, 
Optional.of(stat.getVersion())));
                     return ledgerId;
                 } catch (KeeperException.NodeExistsException nee) {
                     children.remove(tryChild);
@@ -782,19 +788,20 @@ public class ZkLedgerUnderreplicationManager implements 
LedgerUnderreplicationMa
     /**
      * Acquire the underreplicated ledger lock.
      */
-    public static void acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String 
zkLedgersRootPath,
+    public static String acquireUnderreplicatedLedgerLock(ZooKeeper zkc, 
String zkLedgersRootPath,
         long ledgerId, List<ACL> zkAcls)
             throws KeeperException, InterruptedException {
-        ZkUtils.createFullPathOptimistic(zkc, 
getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId),
-                LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
+        final String lockPath = 
getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId);
+        ZkUtils.createFullPathOptimistic(zkc, lockPath, LOCK_DATA, zkAcls, 
CreateMode.EPHEMERAL);
+        return lockPath;
     }
 
     @Override
     public void acquireUnderreplicatedLedger(long ledgerId)
             throws ReplicationException  {
         try {
-            acquireUnderreplicatedLedgerLock(zkc, 
getUrLedgerLockZnode(urLockPath, ledgerId), ledgerId,
-                    ZkUtils.getACLs(conf));
+            final String lockPath = acquireUnderreplicatedLedgerLock(zkc, 
rootPath, ledgerId, ZkUtils.getACLs(conf));
+            heldLocks.put(ledgerId, new Lock(lockPath, Optional.empty()));
         } catch (Exception e) {
             throw new ReplicationException.UnavailableException(
                     "Failed to acquire underreplicated ledger lock for " + 
ledgerId, e);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GcOverreplicatedLedgerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GcOverreplicatedLedgerTest.java
index d5b454b..67527d5 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GcOverreplicatedLedgerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GcOverreplicatedLedgerTest.java
@@ -25,11 +25,13 @@ import com.google.common.collect.Lists;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -38,12 +40,17 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManagerTestCase;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.SnapshotMap;
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.ZooDefs;
 import org.junit.Assert;
 import org.junit.Before;
@@ -84,6 +91,16 @@ public class GcOverreplicatedLedgerTest extends 
LedgerManagerTestCase {
 
         BookieId bookieNotInEnsemble = 
getBookieNotInEnsemble(newLedgerMetadata);
         ServerConfiguration bkConf = getBkConf(bookieNotInEnsemble);
+
+        @Cleanup
+        final MetadataBookieDriver metadataDriver = 
instantiateMetadataDriver(bkConf);
+        @Cleanup
+        final LedgerManagerFactory lmf = 
metadataDriver.getLedgerManagerFactory();
+        @Cleanup
+        final LedgerUnderreplicationManager lum = 
lmf.newLedgerUnderreplicationManager();
+
+        Assert.assertFalse(lum.isLedgerBeingReplicated(lh.getId()));
+
         bkConf.setGcOverreplicatedLedgerWaitTime(10, TimeUnit.MILLISECONDS);
 
         lh.close();
@@ -105,9 +122,24 @@ public class GcOverreplicatedLedgerTest extends 
LedgerManagerTestCase {
             }
         });
 
+        Assert.assertFalse(lum.isLedgerBeingReplicated(lh.getId()));
         Assert.assertFalse(activeLedgers.containsKey(lh.getId()));
     }
 
+    private static MetadataBookieDriver 
instantiateMetadataDriver(ServerConfiguration conf)
+            throws BookieException {
+        try {
+            final String metadataServiceUriStr = conf.getMetadataServiceUri();
+            final MetadataBookieDriver driver = 
MetadataDrivers.getBookieDriver(URI.create(metadataServiceUriStr));
+            driver.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+            return driver;
+        } catch (MetadataException me) {
+            throw new BookieException.MetadataStoreException("Failed to 
initialize metadata bookie driver", me);
+        } catch (ConfigurationException e) {
+            throw new BookieException.BookieIllegalOpException(e);
+        }
+    }
+
     @Test
     public void testNoGcOfLedger() throws Exception {
         LedgerHandle lh = bkc.createLedger(2, 2, DigestType.MAC, 
"".getBytes());

Reply via email to