This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 577121fd5d02613d72f2e4e6ece2ef2b66490706 Author: Matteo Merli <[email protected]> AuthorDate: Mon Oct 18 08:46:07 2021 -0700 Eliminate direct ZK access in ScanAndCompareGarbageCollector (#2833) * Eliminate direct ZK access in ScanAndCompareGarbageCollector * Removed unused imports * Fixed zk ACLs * Addressed comments * Fixed checkstyle --- .../bookie/ScanAndCompareGarbageCollector.java | 82 +++++++++++++--------- .../meta/LedgerUnderreplicationManager.java | 6 ++ .../meta/ZkLedgerUnderreplicationManager.java | 27 ++++--- 3 files changed, 69 insertions(+), 46 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java index cff0250..82f8924 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java @@ -25,6 +25,7 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import com.google.common.collect.Sets; import java.io.IOException; +import java.net.URI; import java.util.List; import java.util.NavigableSet; import java.util.Set; @@ -36,22 +37,22 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManager.LedgerRange; import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; -import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; -import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; +import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.MetadataBookieDriver; +import org.apache.bookkeeper.meta.MetadataDrivers; +import org.apache.bookkeeper.meta.exceptions.MetadataException; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.versioning.Versioned; -import org.apache.bookkeeper.zookeeper.ZooKeeperClient; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.ACL; +import org.apache.commons.configuration.ConfigurationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,26 +69,22 @@ import org.slf4j.LoggerFactory; * <b>globalActiveLedgers</b>, do garbage collection on them. * </ul> * </p> - * - * <p>TODO: eliminate the direct usage of zookeeper here {@link https://github.com/apache/bookkeeper/issues/1331} */ public class ScanAndCompareGarbageCollector implements GarbageCollector { static final Logger LOG = LoggerFactory.getLogger(ScanAndCompareGarbageCollector.class); - static final int MAX_CONCURRENT_ZK_REQUESTS = 1000; + static final int MAX_CONCURRENT_METADATA_REQUESTS = 1000; private final LedgerManager ledgerManager; private final CompactableLedgerStorage ledgerStorage; private final ServerConfiguration conf; private final BookieId selfBookieAddress; - private ZooKeeper zk = null; private boolean enableGcOverReplicatedLedger; private final long gcOverReplicatedLedgerIntervalMillis; private long lastOverReplicatedLedgerGcTimeMillis; - private final String zkServers; - private final String zkLedgersRootPath; private final boolean verifyMetadataOnGc; private int activeLedgerCounter; + private StatsLogger statsLogger; public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, CompactableLedgerStorage ledgerStorage, ServerConfiguration conf, StatsLogger statsLogger) throws IOException { @@ -95,13 +92,13 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector { this.ledgerStorage = ledgerStorage; this.conf = conf; this.selfBookieAddress = Bookie.getBookieId(conf); + this.statsLogger = statsLogger; + this.gcOverReplicatedLedgerIntervalMillis = conf.getGcOverreplicatedLedgerWaitTimeMillis(); this.lastOverReplicatedLedgerGcTimeMillis = System.currentTimeMillis(); if (gcOverReplicatedLedgerIntervalMillis > 0) { this.enableGcOverReplicatedLedger = true; } - this.zkServers = ZKMetadataDriverBase.resolveZkServers(conf); - this.zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf); LOG.info("Over Replicated Ledger Deletion : enabled=" + enableGcOverReplicatedLedger + ", interval=" + gcOverReplicatedLedgerIntervalMillis); @@ -132,8 +129,6 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector { boolean checkOverreplicatedLedgers = (enableGcOverReplicatedLedger && curTime - lastOverReplicatedLedgerGcTimeMillis > gcOverReplicatedLedgerIntervalMillis); if (checkOverreplicatedLedgers) { - zk = ZooKeeperClient.newBuilder().connectString(zkServers) - .sessionTimeoutMs(conf.getZkTimeout()).build(); // remove all the overreplicated ledgers from the local bookie Set<Long> overReplicatedLedgers = removeOverReplicatedledgers(bkActiveLedgers, garbageCleaner); if (overReplicatedLedgers.isEmpty()) { @@ -215,37 +210,36 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector { } catch (Throwable t) { // ignore exception, collecting garbage next time LOG.warn("Exception when iterating over the metadata", t); - } finally { - if (zk != null) { - try { - zk.close(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Error closing zk session", e); - } - zk = null; - } } } private Set<Long> removeOverReplicatedledgers(Set<Long> bkActiveledgers, final GarbageCleaner garbageCleaner) - throws InterruptedException, KeeperException { - final List<ACL> zkAcls = ZkUtils.getACLs(conf); + throws Exception { final Set<Long> overReplicatedLedgers = Sets.newHashSet(); - final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_ZK_REQUESTS); + final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_METADATA_REQUESTS); final CountDownLatch latch = new CountDownLatch(bkActiveledgers.size()); + // instantiate zookeeper client to initialize ledger manager + + @Cleanup + MetadataBookieDriver metadataDriver = instantiateMetadataDriver(conf, statsLogger); + + @Cleanup + LedgerManagerFactory lmf = metadataDriver.getLedgerManagerFactory(); + + @Cleanup + LedgerUnderreplicationManager lum = lmf.newLedgerUnderreplicationManager(); + for (final Long ledgerId : bkActiveledgers) { try { // check if the ledger is being replicated already by the replication worker - if (ZkLedgerUnderreplicationManager.isLedgerBeingReplicated(zk, zkLedgersRootPath, ledgerId)) { + if (lum.isLedgerBeingReplicated(ledgerId)) { latch.countDown(); continue; } // we try to acquire the underreplicated ledger lock to not let the bookie replicate the ledger that is // already being checked for deletion, since that might change the ledger ensemble to include the // current bookie again and, in that case, we cannot remove the ledger from local storage - ZkLedgerUnderreplicationManager.acquireUnderreplicatedLedgerLock(zk, zkLedgersRootPath, ledgerId, - zkAcls); + lum.acquireUnderreplicatedLedger(ledgerId); semaphore.acquire(); ledgerManager.readLedgerMetadata(ledgerId) .whenComplete((metadata, exception) -> { @@ -273,8 +267,7 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector { semaphore.release(); latch.countDown(); try { - ZkLedgerUnderreplicationManager.releaseUnderreplicatedLedgerLock( - zk, zkLedgersRootPath, ledgerId); + lum.releaseUnderreplicatedLedger(ledgerId); } catch (Throwable t) { LOG.error("Exception when removing underreplicated lock for ledger {}", ledgerId, t); @@ -290,4 +283,23 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector { bkActiveledgers.removeAll(overReplicatedLedgers); return overReplicatedLedgers; } + + private static MetadataBookieDriver instantiateMetadataDriver(ServerConfiguration conf, StatsLogger statsLogger) + throws BookieException { + try { + String metadataServiceUriStr = conf.getMetadataServiceUri(); + MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(URI.create(metadataServiceUriStr)); + driver.initialize( + conf, + () -> { + }, + statsLogger); + return driver; + } catch (MetadataException me) { + throw new BookieException.MetadataStoreException("Failed to initialize metadata bookie driver", me); + } catch (ConfigurationException e) { + throw new BookieException.BookieIllegalOpException(e); + } + } + } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java index 0718fb5..ac468d3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java @@ -45,6 +45,11 @@ public interface LedgerUnderreplicationManager extends AutoCloseable { } /** + * Check whether the ledger is being replicated by any bookie. + */ + boolean isLedgerBeingReplicated(long ledgerId) throws ReplicationException; + + /** * Mark a ledger as underreplicated with missing bookies. The replication should then * check which fragements are underreplicated and rereplicate them. * @@ -105,6 +110,7 @@ public interface LedgerUnderreplicationManager extends AutoCloseable { long pollLedgerToRereplicate() throws ReplicationException.UnavailableException; + void acquireUnderreplicatedLedger(long ledgerId) throws ReplicationException; /** * Release a previously acquired ledger. This allows others to acquire the ledger. diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java index 266570f..680264a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java @@ -770,10 +770,13 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa /** * Check whether the ledger is being replicated by any bookie. */ - public static boolean isLedgerBeingReplicated(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId) - throws KeeperException, - InterruptedException { - return zkc.exists(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), false) != null; + @Override + public boolean isLedgerBeingReplicated(long ledgerId) throws ReplicationException { + try { + return zkc.exists(getUrLedgerLockZnode(urLockPath, ledgerId), false) != null; + } catch (Exception e) { + throw new ReplicationException.UnavailableException("Failed to check ledger lock", e); + } } /** @@ -786,13 +789,15 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa LOCK_DATA, zkAcls, CreateMode.EPHEMERAL); } - /** - * Release the underreplicated ledger lock if it exists. - */ - public static void releaseUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId) - throws InterruptedException, KeeperException { - if (isLedgerBeingReplicated(zkc, zkLedgersRootPath, ledgerId)) { - zkc.delete(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), -1); + @Override + public void acquireUnderreplicatedLedger(long ledgerId) + throws ReplicationException { + try { + acquireUnderreplicatedLedgerLock(zkc, getUrLedgerLockZnode(urLockPath, ledgerId), ledgerId, + ZkUtils.getACLs(conf)); + } catch (Exception e) { + throw new ReplicationException.UnavailableException( + "Failed to acquire underreplicated ledger lock for " + ledgerId, e); } }
