This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.14 by this push:
new a68013e fix: use correct lockpath, store Lock to heldLocks at
ZkLedgerUnderreplicationManager#acquireUnderreplicatedLedger (#2855)
a68013e is described below
commit a68013e660eb9ff008369667ae13bd756a6902cf
Author: Yuri Mizushima <[email protected]>
AuthorDate: Tue Nov 2 15:58:57 2021 +0900
fix: use correct lockpath, store Lock to heldLocks at
ZkLedgerUnderreplicationManager#acquireUnderreplicatedLedger (#2855)
---
.../meta/ZkLedgerUnderreplicationManager.java | 57 ++++++++++++----------
.../bookie/GcOverreplicatedLedgerTest.java | 32 ++++++++++++
2 files changed, 64 insertions(+), 25 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index 680264a..b340e07 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -33,6 +33,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -94,9 +95,9 @@ public class ZkLedgerUnderreplicationManager implements
LedgerUnderreplicationMa
private static class Lock {
private final String lockZNode;
- private final int ledgerZNodeVersion;
+ private final Optional<Integer> ledgerZNodeVersion;
- Lock(String lockZNode, int ledgerZNodeVersion) {
+ Lock(String lockZNode, Optional<Integer> ledgerZNodeVersion) {
this.lockZNode = lockZNode;
this.ledgerZNodeVersion = ledgerZNodeVersion;
}
@@ -105,13 +106,14 @@ public class ZkLedgerUnderreplicationManager implements
LedgerUnderreplicationMa
return lockZNode;
}
- int getLedgerZNodeVersion() {
+ Optional<Integer> getLedgerZNodeVersion() {
return ledgerZNodeVersion;
}
}
private final Map<Long, Lock> heldLocks = new ConcurrentHashMap<Long,
Lock>();
private final Pattern idExtractionPattern;
+ private final String rootPath;
private final String basePath;
private final String urLedgerPath;
private final String urLockPath;
@@ -127,7 +129,8 @@ public class ZkLedgerUnderreplicationManager implements
LedgerUnderreplicationMa
public ZkLedgerUnderreplicationManager(AbstractConfiguration conf,
ZooKeeper zkc)
throws KeeperException, InterruptedException,
ReplicationException.CompatibilityException {
this.conf = conf;
- basePath =
getBasePath(ZKMetadataDriverBase.resolveZkLedgersRootPath(conf));
+ rootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
+ basePath = getBasePath(rootPath);
layoutZNode = basePath + '/' + BookKeeperConstants.LAYOUT_ZNODE;
urLedgerPath = basePath
+ BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
@@ -389,24 +392,27 @@ public class ZkLedgerUnderreplicationManager implements
LedgerUnderreplicationMa
try {
Lock l = heldLocks.get(ledgerId);
if (l != null) {
- zkc.delete(getUrLedgerZnode(ledgerId),
l.getLedgerZNodeVersion());
+ final Optional<Integer> ledgerZNodeVersion =
l.getLedgerZNodeVersion();
+ if (ledgerZNodeVersion.isPresent()) {
+ zkc.delete(getUrLedgerZnode(ledgerId),
ledgerZNodeVersion.get());
- try {
- // clean up the hierarchy
- String[] parts = getUrLedgerZnode(ledgerId).split("/");
- for (int i = 1; i <= 4; i++) {
- String[] p = Arrays.copyOf(parts, parts.length - i);
- String path = Joiner.on("/").join(p);
- Stat s = zkc.exists(path, null);
- if (s != null) {
- zkc.delete(path, s.getVersion());
+ try {
+ // clean up the hierarchy
+ String[] parts = getUrLedgerZnode(ledgerId).split("/");
+ for (int i = 1; i <= 4; i++) {
+ String[] p = Arrays.copyOf(parts, parts.length -
i);
+ String path = Joiner.on("/").join(p);
+ Stat s = zkc.exists(path, null);
+ if (s != null) {
+ zkc.delete(path, s.getVersion());
+ }
}
+ } catch (KeeperException.NotEmptyException nee) {
+ // This can happen when cleaning up the hierarchy.
+ // It's safe to ignore, it simply means another
+ // ledger in the same hierarchy has been marked as
+ // underreplicated.
}
- } catch (KeeperException.NotEmptyException nee) {
- // This can happen when cleaning up the hierarchy.
- // It's safe to ignore, it simply means another
- // ledger in the same hierarchy has been marked as
- // underreplicated.
}
}
} catch (KeeperException.NoNodeException nne) {
@@ -528,7 +534,7 @@ public class ZkLedgerUnderreplicationManager implements
LedgerUnderreplicationMa
String lockPath = urLockPath + "/" + tryChild;
long ledgerId = getLedgerId(tryChild);
zkc.create(lockPath, LOCK_DATA, zkAcls,
CreateMode.EPHEMERAL);
- heldLocks.put(ledgerId, new Lock(lockPath,
stat.getVersion()));
+ heldLocks.put(ledgerId, new Lock(lockPath,
Optional.of(stat.getVersion())));
return ledgerId;
} catch (KeeperException.NodeExistsException nee) {
children.remove(tryChild);
@@ -782,19 +788,20 @@ public class ZkLedgerUnderreplicationManager implements
LedgerUnderreplicationMa
/**
* Acquire the underreplicated ledger lock.
*/
- public static void acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String
zkLedgersRootPath,
+ public static String acquireUnderreplicatedLedgerLock(ZooKeeper zkc,
String zkLedgersRootPath,
long ledgerId, List<ACL> zkAcls)
throws KeeperException, InterruptedException {
- ZkUtils.createFullPathOptimistic(zkc,
getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId),
- LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
+ final String lockPath =
getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId);
+ ZkUtils.createFullPathOptimistic(zkc, lockPath, LOCK_DATA, zkAcls,
CreateMode.EPHEMERAL);
+ return lockPath;
}
@Override
public void acquireUnderreplicatedLedger(long ledgerId)
throws ReplicationException {
try {
- acquireUnderreplicatedLedgerLock(zkc,
getUrLedgerLockZnode(urLockPath, ledgerId), ledgerId,
- ZkUtils.getACLs(conf));
+ final String lockPath = acquireUnderreplicatedLedgerLock(zkc,
rootPath, ledgerId, ZkUtils.getACLs(conf));
+ heldLocks.put(ledgerId, new Lock(lockPath, Optional.empty()));
} catch (Exception e) {
throw new ReplicationException.UnavailableException(
"Failed to acquire underreplicated ledger lock for " +
ledgerId, e);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GcOverreplicatedLedgerTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GcOverreplicatedLedgerTest.java
index d5b454b..67527d5 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GcOverreplicatedLedgerTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GcOverreplicatedLedgerTest.java
@@ -25,11 +25,13 @@ import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.UnknownHostException;
+import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -38,12 +40,17 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerManagerTestCase;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.SnapshotMap;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.zookeeper.ZooDefs;
import org.junit.Assert;
import org.junit.Before;
@@ -84,6 +91,16 @@ public class GcOverreplicatedLedgerTest extends
LedgerManagerTestCase {
BookieId bookieNotInEnsemble =
getBookieNotInEnsemble(newLedgerMetadata);
ServerConfiguration bkConf = getBkConf(bookieNotInEnsemble);
+
+ @Cleanup
+ final MetadataBookieDriver metadataDriver =
instantiateMetadataDriver(bkConf);
+ @Cleanup
+ final LedgerManagerFactory lmf =
metadataDriver.getLedgerManagerFactory();
+ @Cleanup
+ final LedgerUnderreplicationManager lum =
lmf.newLedgerUnderreplicationManager();
+
+ Assert.assertFalse(lum.isLedgerBeingReplicated(lh.getId()));
+
bkConf.setGcOverreplicatedLedgerWaitTime(10, TimeUnit.MILLISECONDS);
lh.close();
@@ -105,9 +122,24 @@ public class GcOverreplicatedLedgerTest extends
LedgerManagerTestCase {
}
});
+ Assert.assertFalse(lum.isLedgerBeingReplicated(lh.getId()));
Assert.assertFalse(activeLedgers.containsKey(lh.getId()));
}
+ private static MetadataBookieDriver
instantiateMetadataDriver(ServerConfiguration conf)
+ throws BookieException {
+ try {
+ final String metadataServiceUriStr = conf.getMetadataServiceUri();
+ final MetadataBookieDriver driver =
MetadataDrivers.getBookieDriver(URI.create(metadataServiceUriStr));
+ driver.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ return driver;
+ } catch (MetadataException me) {
+ throw new BookieException.MetadataStoreException("Failed to
initialize metadata bookie driver", me);
+ } catch (ConfigurationException e) {
+ throw new BookieException.BookieIllegalOpException(e);
+ }
+ }
+
@Test
public void testNoGcOfLedger() throws Exception {
LedgerHandle lh = bkc.createLedger(2, 2, DigestType.MAC,
"".getBytes());