This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 825fdd4222dd65ef3099f1a975a1555226297379
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu Jul 16 06:20:16 2020 -0700

    [Broker] Timeout opening managed ledger operation … (#7506)
    
    *Motivation*
    
    Currently, the broker has a timeout mechanism for loading topics. However, the
    underlying managed ledger library doesn't provide one. This can lead to a situation
    where a TopicLoad operation times out after 30 seconds, but the CompletableFuture for
    opening the managed ledger is still kept in the managed ledger factory's cache, and
    that future never completes. As a result, all subsequent topic lookups fail, because
    no later attempt to load the topic will try to re-open the managed ledger.
    
    *Modification*
    
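    Introduce a timeout mechanism in the managed ledger factory. If a managed ledger is
    still not opened after the metadata operations timeout has elapsed, the next attempt
    to open it removes the pending CompletableFuture from the cache. This allows
    subsequent attempts to load the topic to try opening the managed ledger again.
    In addition, ManagedCursorImpl now catches exceptions thrown by the cursor ledger
    open call, logs them, and completes the open callback with
    UnexpectedConditionException instead of leaving the callback uncalled.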
    
    *Tests*
    
    This problem can be consistently reproduced in a chaos test in Kubernetes by killing
    k8s worker nodes. It can cause producers to get stuck forever, until the owning broker
    pod is restarted. The change has been verified in a chaos testing environment.
    
    (cherry picked from commit 14e3b7ae05e84ca13eefa16026288a384a961e45)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 16 +++--
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 70 ++++++++++++++++++----
 2 files changed, 72 insertions(+), 14 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 3e0d583..26f1bf3 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -64,6 +64,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -323,9 +324,9 @@ public class ManagedCursorImpl implements ManagedCursor {
         // a new ledger and write the position into it
         ledger.mbean.startCursorLedgerOpenOp();
         long ledgerId = info.getCursorsLedgerId();
-        bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), 
(rc, lh, ctx) -> {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Opened ledger {} for consumer {}. rc={}", 
ledger.getName(), ledgerId, name, rc);
+        OpenCallback openCallback = (rc, lh, ctx) -> {
+            if (log.isInfoEnabled()) {
+                log.info("[{}] Opened ledger {} for consumer {}. rc={}", 
ledger.getName(), ledgerId, name, rc);
             }
             if (isBkErrorNotRecoverable(rc)) {
                 log.error("[{}] Error opening metadata ledger {} for consumer 
{}: {}", ledger.getName(), ledgerId, name,
@@ -399,7 +400,14 @@ public class ManagedCursorImpl implements ManagedCursor {
                 recoveredCursor(position, recoveredProperties, lh);
                 callback.operationComplete();
             }, null);
-        }, null);
+        };
+        try {
+            bookkeeper.asyncOpenLedger(ledgerId, digestType, 
config.getPassword(), openCallback, null);
+        } catch (Throwable t) {
+            log.error("[{}] Encountered error on opening cursor ledger {} for 
cursor {}",
+                ledger.getName(), ledgerId, name, t);
+            
openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, 
null);
+        }
     }
 
     private void 
recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> 
individualDeletedMessagesList) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 517c62c..8b470c2 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -50,6 +50,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
@@ -101,6 +102,8 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     protected final ManagedLedgerFactoryMBeanImpl mbean;
 
     protected final ConcurrentHashMap<String, 
CompletableFuture<ManagedLedgerImpl>> ledgers = new ConcurrentHashMap<>();
+    protected final ConcurrentHashMap<String, PendingInitializeManagedLedger> 
pendingInitializeLedgers =
+        new ConcurrentHashMap<>();
     private final EntryCacheManager entryCacheManager;
 
     private long lastStatTimestamp = System.nanoTime();
@@ -111,6 +114,18 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
 
     private static final int StatsPeriodSeconds = 60;
 
+    private static class PendingInitializeManagedLedger {
+
+        private final ManagedLedgerImpl ledger;
+        private final long createTimeMs;
+
+        PendingInitializeManagedLedger(ManagedLedgerImpl ledger) {
+            this.ledger = ledger;
+            this.createTimeMs = System.currentTimeMillis();
+        }
+
+    }
+
     public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, 
String zkConnection) throws Exception {
         this(bkClientConfiguration, zkConnection, new 
ManagedLedgerFactoryConfig());
     }
@@ -320,18 +335,32 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
 
         // If the ledger state is bad, remove it from the map.
         CompletableFuture<ManagedLedgerImpl> existingFuture = 
ledgers.get(name);
-        if (existingFuture != null && existingFuture.isDone()) {
-            try {
-                ManagedLedgerImpl l = existingFuture.get();
-                if (l.getState().equals(State.Fenced.toString()) || 
l.getState().equals(State.Closed.toString())) {
-                    // Managed ledger is in unusable state. Recreate it.
-                    log.warn("[{}] Attempted to open ledger in {} state. 
Removing from the map to recreate it", name,
+        if (existingFuture != null) {
+            if (existingFuture.isDone()) {
+                try {
+                    ManagedLedgerImpl l = existingFuture.get();
+                    if (l.getState().equals(State.Fenced.toString()) || 
l.getState().equals(State.Closed.toString())) {
+                        // Managed ledger is in unusable state. Recreate it.
+                        log.warn("[{}] Attempted to open ledger in {} state. 
Removing from the map to recreate it", name,
                             l.getState());
-                    ledgers.remove(name, existingFuture);
+                        ledgers.remove(name, existingFuture);
+                    }
+                } catch (Exception e) {
+                    // Unable to get the future
+                    log.warn("[{}] Got exception while trying to retrieve 
ledger", name, e);
                 }
-            } catch (Exception e) {
-                // Unable to get the future
-                log.warn("[{}] Got exception while trying to retrieve ledger", 
name, e);
+            } else {
+                PendingInitializeManagedLedger pendingLedger = 
pendingInitializeLedgers.get(name);
+                if (null != pendingLedger) {
+                    long pendingMs = System.currentTimeMillis() - 
pendingLedger.createTimeMs;
+                    if (pendingMs > 
TimeUnit.SECONDS.toMillis(config.getMetadataOperationsTimeoutSeconds())) {
+                        log.warn("[{}] Managed ledger has been pending in 
initialize state more than {} milliseconds,"
+                            + " remove it from cache to retry ...", name, 
pendingMs);
+                        ledgers.remove(name, existingFuture);
+                        pendingInitializeLedgers.remove(name, pendingLedger);
+                    }
+                }
+
             }
         }
 
@@ -345,16 +374,37 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                                     
config.getBookKeeperEnsemblePlacementPolicyProperties())),
                     store, config, scheduledExecutor,
                     orderedExecutor, name, mlOwnershipChecker);
+            PendingInitializeManagedLedger pendingLedger = new 
PendingInitializeManagedLedger(newledger);
+            pendingInitializeLedgers.put(name, pendingLedger);
             newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
                 @Override
                 public void initializeComplete() {
+                    log.info("[{}] Successfully initialize managed ledger", 
name);
+                    pendingInitializeLedgers.remove(name, pendingLedger);
                     future.complete(newledger);
                 }
 
                 @Override
                 public void initializeFailed(ManagedLedgerException e) {
+                    log.error("[{}] Failed to initialize managed ledger: {}", 
name, e.getMessage());
+
                     // Clean the map if initialization fails
                     ledgers.remove(name, future);
+
+                    if (pendingInitializeLedgers.remove(name, pendingLedger)) {
+                        pendingLedger.ledger.asyncClose(new CloseCallback() {
+                            @Override
+                            public void closeComplete(Object ctx) {
+                                // no-op
+                            }
+
+                            @Override
+                            public void closeFailed(ManagedLedgerException 
exception, Object ctx) {
+                                log.warn("[{}] Failed to a pending 
initialization managed ledger", name, exception);
+                            }
+                        }, null);
+                    }
+
                     future.completeExceptionally(e);
                 }
             }, null);

Reply via email to