This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 86e71c4bc42316da49aeb8527210d76ba45a8665 Author: Masahiro Sakamoto <[email protected]> AuthorDate: Thu Feb 9 12:22:30 2023 +0900 Fix issue where checkAllLedgers could get stuck when read throttling is enabled (#3655) Some time ago, I fixed a bug where the check of all ledgers that the auditor runs periodically got stuck (cf. https://github.com/apache/bookkeeper/pull/3214). That PR was merged, so we cherry-picked it and released it, but `checkAllLedgers` still got stuck when the `inFlightReadEntryNumInLedgerChecker` value was very small. The cause is that multiple `BookKeeperClientWorker-OrderedExecutor` threads are blocked waiting for the semaphore to be released. This semaphore is released by the same `BookKeeperClientWorker-OrderedExecutor` threads, so those threads should not run the `LedgerChecker#checkLedger` method themselves. ``` "BookKeeperClientWorker-OrderedExecutor-1-0" #60 prio=5 os_prio=0 tid=0x00007f680401d800 nid=0x75e8 waiting on condition [0x00007f684a6e9000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000a2121780> (a java.util.concurrent.Semaphore$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.Semaphore.acquire(Semaphore.java:467) at org.apache.bookkeeper.client.LedgerChecker.acquirePermit(LedgerChecker.java:163) at org.apache.bookkeeper.client.LedgerChecker.checkLedger(LedgerChecker.java:431) at org.apache.bookkeeper.replication.Auditor.lambda$null$4(Auditor.java:1216) at org.apache.bookkeeper.replication.Auditor$$Lambda$137/1553334572.openComplete(Unknown Source) at 
org.apache.bookkeeper.client.LedgerOpenOp.openComplete(LedgerOpenOp.java:232) at org.apache.bookkeeper.client.LedgerOpenOp$2.readLastConfirmedComplete(LedgerOpenOp.java:218) at org.apache.bookkeeper.client.LedgerHandle$14.getLacComplete(LedgerHandle.java:1718) at org.apache.bookkeeper.client.PendingReadLacOp.readLacComplete(PendingReadLacOp.java:154) at org.apache.bookkeeper.proto.PerChannelBookieClient$ReadLacCompletion$1.readLacComplete(PerChannelBookieClient.java:1801) at org.apache.bookkeeper.proto.PerChannelBookieClient$ReadLacCompletion.handleV3Response(PerChannelBookieClient.java:1840) at org.apache.bookkeeper.proto.PerChannelBookieClient$3.safeRun(PerChannelBookieClient.java:1473) at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:750) ``` In the `Auditor` class, have the `LedgerChecker#checkLedger` method run in a different thread from the `BookKeeperClientWorker-OrderedExecutor` threads. 
Master Issue: https://github.com/apache/bookkeeper/issues/3070 (cherry picked from commit 4ca4b4e4845ec1a3d5b17c96c99e97a62b5ec6c7) --- .../org/apache/bookkeeper/replication/Auditor.java | 39 ++++++++++++++++------ 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 6492027853..1bbae78bdc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -65,6 +65,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -143,6 +144,7 @@ public class Auditor implements AutoCloseable { private LedgerManager ledgerManager; private LedgerUnderreplicationManager ledgerUnderreplicationManager; private final ScheduledExecutorService executor; + private final ExecutorService ledgerCheckerExecutor; private List<String> knownBookies = new ArrayList<String>(); private final String bookieIdentifier; private volatile Future<?> auditTask; @@ -473,6 +475,14 @@ public class Auditor implements AutoCloseable { return t; } }); + ledgerCheckerExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "AuditorBookie-LedgerChecker-" + bookieIdentifier); + t.setDaemon(true); + return t; + } + }); } private void initialize(ServerConfiguration conf, BookKeeper bkc) @@ -1323,16 +1333,20 @@ public class Auditor implements AutoCloseable { localAdmin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx) -> { 
openLedgerNoRecoverySemaphore.release(); if (Code.OK == rc) { - checker.checkLedger(lh, - // the ledger handle will be closed after checkLedger is done. - new ProcessLostFragmentsCb(lh, callback), - conf.getAuditorLedgerVerificationPercentage()); - // we collect the following stats to get a measure of the - // distribution of a single ledger within the bk cluster - // the higher the number of fragments/bookies, the more distributed it is - numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments()); - numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies()); - numLedgersChecked.inc(); + // BookKeeperClientWorker-OrderedExecutor threads should not execute LedgerChecker#checkLedger + // as this can lead to deadlocks + ledgerCheckerExecutor.execute(() -> { + checker.checkLedger(lh, + // the ledger handle will be closed after checkLedger is done. + new ProcessLostFragmentsCb(lh, callback), + conf.getAuditorLedgerVerificationPercentage()); + // we collect the following stats to get a measure of the + // distribution of a single ledger within the bk cluster + // the higher the number of fragments/bookies, the more distributed it is + numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments()); + numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies()); + numLedgersChecked.inc(); + }); } else if (Code.NoSuchLedgerExistsOnMetadataServerException == rc) { if (LOG.isDebugEnabled()) { LOG.debug("Ledger {} was deleted before we could check it", ledgerId); @@ -2069,11 +2083,16 @@ public class Auditor implements AutoCloseable { public void shutdown() { LOG.info("Shutting down auditor"); executor.shutdown(); + ledgerCheckerExecutor.shutdown(); try { while (!executor.awaitTermination(30, TimeUnit.SECONDS)) { LOG.warn("Executor not shutting down, interrupting"); executor.shutdownNow(); } + while (!ledgerCheckerExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + LOG.warn("Executor for ledger checker not shutting down, interrupting"); + 
ledgerCheckerExecutor.shutdownNow(); + } if (ownAdmin) { admin.close(); }
