This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 825fdd4222dd65ef3099f1a975a1555226297379 Author: Sijie Guo <si...@apache.org> AuthorDate: Thu Jul 16 06:20:16 2020 -0700 [Broker] Timeout opening managed ledger operation … (#7506) *Motivation* Currently, the broker has a timeout mechanism for loading topics. However, the underlying managed ledger library doesn't provide a timeout mechanism. This can lead to a situation where a TopicLoad operation times out after 30 seconds, but the CompletableFuture for opening the managed ledger is still kept in the managed ledger factory's cache. That CompletableFuture never completes, so all subsequent topic lookups fail: every attempt to load the topic gets the same pending future from the cache and never tries to re-open the managed ledger. *Modification* Introduce a timeout mechanism in the managed ledger factory. If a managed ledger is not opened within a given timeout period (the factory's metadata-operations timeout), its pending CompletableFuture is removed from the cache. This allows subsequent attempts to load the topic to try opening the managed ledger again. *Tests* This problem can be consistently reproduced in a chaos test in Kubernetes by killing k8s worker nodes. It can cause producers to get stuck forever until the owning broker pod is restarted. The change has been verified in a chaos testing environment. 
(cherry picked from commit 14e3b7ae05e84ca13eefa16026288a384a961e45) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 16 +++-- .../mledger/impl/ManagedLedgerFactoryImpl.java | 70 ++++++++++++++++++---- 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 3e0d583..26f1bf3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -64,6 +64,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; +import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; @@ -323,9 +324,9 @@ public class ManagedCursorImpl implements ManagedCursor { // a new ledger and write the position into it ledger.mbean.startCursorLedgerOpenOp(); long ledgerId = info.getCursorsLedgerId(); - bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), (rc, lh, ctx) -> { - if (log.isDebugEnabled()) { - log.debug("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc); + OpenCallback openCallback = (rc, lh, ctx) -> { + if (log.isInfoEnabled()) { + log.info("[{}] Opened ledger {} for consumer {}. 
rc={}", ledger.getName(), ledgerId, name, rc); } if (isBkErrorNotRecoverable(rc)) { log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, @@ -399,7 +400,14 @@ public class ManagedCursorImpl implements ManagedCursor { recoveredCursor(position, recoveredProperties, lh); callback.operationComplete(); }, null); - }, null); + }; + try { + bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), openCallback, null); + } catch (Throwable t) { + log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}", + ledger.getName(), ledgerId, name, t); + openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, null); + } } private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 517c62c..8b470c2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -50,6 +50,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; @@ -101,6 +102,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { protected final ManagedLedgerFactoryMBeanImpl mbean; protected final ConcurrentHashMap<String, 
CompletableFuture<ManagedLedgerImpl>> ledgers = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap<String, PendingInitializeManagedLedger> pendingInitializeLedgers = + new ConcurrentHashMap<>(); private final EntryCacheManager entryCacheManager; private long lastStatTimestamp = System.nanoTime(); @@ -111,6 +114,18 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private static final int StatsPeriodSeconds = 60; + private static class PendingInitializeManagedLedger { + + private final ManagedLedgerImpl ledger; + private final long createTimeMs; + + PendingInitializeManagedLedger(ManagedLedgerImpl ledger) { + this.ledger = ledger; + this.createTimeMs = System.currentTimeMillis(); + } + + } + public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, String zkConnection) throws Exception { this(bkClientConfiguration, zkConnection, new ManagedLedgerFactoryConfig()); } @@ -320,18 +335,32 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { // If the ledger state is bad, remove it from the map. CompletableFuture<ManagedLedgerImpl> existingFuture = ledgers.get(name); - if (existingFuture != null && existingFuture.isDone()) { - try { - ManagedLedgerImpl l = existingFuture.get(); - if (l.getState().equals(State.Fenced.toString()) || l.getState().equals(State.Closed.toString())) { - // Managed ledger is in unusable state. Recreate it. - log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it", name, + if (existingFuture != null) { + if (existingFuture.isDone()) { + try { + ManagedLedgerImpl l = existingFuture.get(); + if (l.getState().equals(State.Fenced.toString()) || l.getState().equals(State.Closed.toString())) { + // Managed ledger is in unusable state. Recreate it. + log.warn("[{}] Attempted to open ledger in {} state. 
Removing from the map to recreate it", name, l.getState()); - ledgers.remove(name, existingFuture); + ledgers.remove(name, existingFuture); + } + } catch (Exception e) { + // Unable to get the future + log.warn("[{}] Got exception while trying to retrieve ledger", name, e); } - } catch (Exception e) { - // Unable to get the future - log.warn("[{}] Got exception while trying to retrieve ledger", name, e); + } else { + PendingInitializeManagedLedger pendingLedger = pendingInitializeLedgers.get(name); + if (null != pendingLedger) { + long pendingMs = System.currentTimeMillis() - pendingLedger.createTimeMs; + if (pendingMs > TimeUnit.SECONDS.toMillis(config.getMetadataOperationsTimeoutSeconds())) { + log.warn("[{}] Managed ledger has been pending in initialize state more than {} milliseconds," + + " remove it from cache to retry ...", name, pendingMs); + ledgers.remove(name, existingFuture); + pendingInitializeLedgers.remove(name, pendingLedger); + } + } + } } @@ -345,16 +374,37 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { config.getBookKeeperEnsemblePlacementPolicyProperties())), store, config, scheduledExecutor, orderedExecutor, name, mlOwnershipChecker); + PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger); + pendingInitializeLedgers.put(name, pendingLedger); newledger.initialize(new ManagedLedgerInitializeLedgerCallback() { @Override public void initializeComplete() { + log.info("[{}] Successfully initialize managed ledger", name); + pendingInitializeLedgers.remove(name, pendingLedger); future.complete(newledger); } @Override public void initializeFailed(ManagedLedgerException e) { + log.error("[{}] Failed to initialize managed ledger: {}", name, e.getMessage()); + // Clean the map if initialization fails ledgers.remove(name, future); + + if (pendingInitializeLedgers.remove(name, pendingLedger)) { + pendingLedger.ledger.asyncClose(new CloseCallback() { + @Override + public void 
closeComplete(Object ctx) { + // no-op + } + + @Override + public void closeFailed(ManagedLedgerException exception, Object ctx) { + log.warn("[{}] Failed to a pending initialization managed ledger", name, exception); + } + }, null); + } + future.completeExceptionally(e); } }, null);