This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 86e71c4bc42316da49aeb8527210d76ba45a8665
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Thu Feb 9 12:22:30 2023 +0900

    Fix issue where checkAllLedgers could get stuck when read throttling is enabled (#3655)
    
    Some time ago, I fixed a bug where the periodic check of all ledgers run by
    the auditor got stuck (cf. https://github.com/apache/bookkeeper/pull/3214).
    That PR was merged, and we cherry-picked and released it, but
    `checkAllLedgers` still got stuck when `inFlightReadEntryNumInLedgerChecker`
    was set to a very small value.
    
    The cause is that multiple `BookKeeperClientWorker-OrderedExecutor` threads
    are blocked waiting for the semaphore to be released. This semaphore is
    released by those same `BookKeeperClientWorker-OrderedExecutor` threads, so
    once they are all blocked in `acquire`, no thread is left to release a permit
    and the check deadlocks. Therefore these threads must not run the
    `LedgerChecker#checkLedger` method.
    ```
    "BookKeeperClientWorker-OrderedExecutor-1-0" #60 prio=5 os_prio=0 
tid=0x00007f680401d800 nid=0x75e8 waiting on condition [0x00007f684a6e9000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x00000000a2121780> (a 
java.util.concurrent.Semaphore$NonfairSync)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
            at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
            at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
            at java.util.concurrent.Semaphore.acquire(Semaphore.java:467)
            at 
org.apache.bookkeeper.client.LedgerChecker.acquirePermit(LedgerChecker.java:163)
            at 
org.apache.bookkeeper.client.LedgerChecker.checkLedger(LedgerChecker.java:431)
            at 
org.apache.bookkeeper.replication.Auditor.lambda$null$4(Auditor.java:1216)
            at 
org.apache.bookkeeper.replication.Auditor$$Lambda$137/1553334572.openComplete(Unknown
 Source)
            at 
org.apache.bookkeeper.client.LedgerOpenOp.openComplete(LedgerOpenOp.java:232)
            at 
org.apache.bookkeeper.client.LedgerOpenOp$2.readLastConfirmedComplete(LedgerOpenOp.java:218)
            at 
org.apache.bookkeeper.client.LedgerHandle$14.getLacComplete(LedgerHandle.java:1718)
            at 
org.apache.bookkeeper.client.PendingReadLacOp.readLacComplete(PendingReadLacOp.java:154)
            at 
org.apache.bookkeeper.proto.PerChannelBookieClient$ReadLacCompletion$1.readLacComplete(PerChannelBookieClient.java:1801)
            at 
org.apache.bookkeeper.proto.PerChannelBookieClient$ReadLacCompletion.handleV3Response(PerChannelBookieClient.java:1840)
            at 
org.apache.bookkeeper.proto.PerChannelBookieClient$3.safeRun(PerChannelBookieClient.java:1473)
            at 
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:750)
    ```
    
    In the `Auditor` class, run the `LedgerChecker#checkLedger` method on a
    dedicated single-thread executor instead of on the
    `BookKeeperClientWorker-OrderedExecutor` threads.
    
    Master Issue: https://github.com/apache/bookkeeper/issues/3070
    
    (cherry picked from commit 4ca4b4e4845ec1a3d5b17c96c99e97a62b5ec6c7)
---
 .../org/apache/bookkeeper/replication/Auditor.java | 39 ++++++++++++++++------
 1 file changed, 29 insertions(+), 10 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 6492027853..1bbae78bdc 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -65,6 +65,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -143,6 +144,7 @@ public class Auditor implements AutoCloseable {
     private LedgerManager ledgerManager;
     private LedgerUnderreplicationManager ledgerUnderreplicationManager;
     private final ScheduledExecutorService executor;
+    private final ExecutorService ledgerCheckerExecutor;
     private List<String> knownBookies = new ArrayList<String>();
     private final String bookieIdentifier;
     private volatile Future<?> auditTask;
@@ -473,6 +475,14 @@ public class Auditor implements AutoCloseable {
                     return t;
                 }
             });
+        ledgerCheckerExecutor = Executors.newSingleThreadExecutor(new 
ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread t = new Thread(r, "AuditorBookie-LedgerChecker-" + 
bookieIdentifier);
+                t.setDaemon(true);
+                return t;
+            }
+        });
     }
 
     private void initialize(ServerConfiguration conf, BookKeeper bkc)
@@ -1323,16 +1333,20 @@ public class Auditor implements AutoCloseable {
                 localAdmin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx) 
-> {
                     openLedgerNoRecoverySemaphore.release();
                     if (Code.OK == rc) {
-                        checker.checkLedger(lh,
-                                // the ledger handle will be closed after 
checkLedger is done.
-                                new ProcessLostFragmentsCb(lh, callback),
-                                conf.getAuditorLedgerVerificationPercentage());
-                        // we collect the following stats to get a measure of 
the
-                        // distribution of a single ledger within the bk 
cluster
-                        // the higher the number of fragments/bookies, the 
more distributed it is
-                        
numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
-                        
numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
-                        numLedgersChecked.inc();
+                        // BookKeeperClientWorker-OrderedExecutor threads 
should not execute LedgerChecker#checkLedger
+                        // as this can lead to deadlocks
+                        ledgerCheckerExecutor.execute(() -> {
+                            checker.checkLedger(lh,
+                                    // the ledger handle will be closed after 
checkLedger is done.
+                                    new ProcessLostFragmentsCb(lh, callback),
+                                    
conf.getAuditorLedgerVerificationPercentage());
+                            // we collect the following stats to get a measure 
of the
+                            // distribution of a single ledger within the bk 
cluster
+                            // the higher the number of fragments/bookies, the 
more distributed it is
+                            
numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
+                            
numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
+                            numLedgersChecked.inc();
+                        });
                     } else if 
(Code.NoSuchLedgerExistsOnMetadataServerException == rc) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Ledger {} was deleted before we could 
check it", ledgerId);
@@ -2069,11 +2083,16 @@ public class Auditor implements AutoCloseable {
     public void shutdown() {
         LOG.info("Shutting down auditor");
         executor.shutdown();
+        ledgerCheckerExecutor.shutdown();
         try {
             while (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                 LOG.warn("Executor not shutting down, interrupting");
                 executor.shutdownNow();
             }
+            while (!ledgerCheckerExecutor.awaitTermination(30, 
TimeUnit.SECONDS)) {
+                LOG.warn("Executor for ledger checker not shutting down, 
interrupting");
+                ledgerCheckerExecutor.shutdownNow();
+            }
             if (ownAdmin) {
                 admin.close();
             }

Reply via email to