This is an automated email from the ASF dual-hosted git repository. heesung pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 750547b3e61 [fix][ml] Make mlOwnershipChecker asynchronous so that it doesn't block/deadlock threads (#21333) 750547b3e61 is described below commit 750547b3e611d4203172f1361dd7611fbcf55e0e Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Wed Oct 11 10:13:26 2023 +0300 [fix][ml] Make mlOwnershipChecker asynchronous so that it doesn't block/deadlock threads (#21333) (cherry picked from commit eb9fa63d6bcaa4e1cbc4b87e36ead5a3ff6c44ae) --- .../bookkeeper/mledger/ManagedLedgerFactory.java | 2 +- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 57 ++++++++++++++-------- .../mledger/impl/ManagedLedgerFactoryImpl.java | 2 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 +- .../mledger/impl/ShadowManagedLedgerImpl.java | 3 +- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +- .../pulsar/broker/service/BrokerService.java | 19 +++++--- .../pendingack/impl/MLPendingAckStoreProvider.java | 2 +- .../pulsar/broker/admin/TopicPoliciesTest.java | 2 +- .../OwnerShipCacheForCurrentServerTest.java | 2 +- 10 files changed, 57 insertions(+), 38 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java index b1427bab80b..e09fd84ea55 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java @@ -90,7 +90,7 @@ public interface ManagedLedgerFactory { * opaque context */ void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback, - Supplier<Boolean> mlOwnershipChecker, Object ctx); + Supplier<CompletableFuture<Boolean>> mlOwnershipChecker, Object ctx); /** * Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 2abf6ca9d17..8db8a571439 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2682,32 +2682,47 @@ public class ManagedCursorImpl implements ManagedCursor { } @Override - public void operationFailed(MetaStoreException e) { - if (e instanceof MetaStoreException.BadVersionException) { + public void operationFailed(MetaStoreException topLevelException) { + if (topLevelException instanceof MetaStoreException.BadVersionException) { log.warn("[{}] Failed to update cursor metadata for {} due to version conflict {}", - ledger.name, name, e.getMessage()); + ledger.name, name, topLevelException.getMessage()); // it means previous owner of the ml might have updated the version incorrectly. So, check // the ownership and refresh the version again. - if (ledger.mlOwnershipChecker != null && ledger.mlOwnershipChecker.get()) { - ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, - new MetaStoreCallback<ManagedCursorInfo>() { - @Override - public void operationComplete(ManagedCursorInfo info, Stat stat) { - updateCursorLedgerStat(info, stat); - } - - @Override - public void operationFailed(MetaStoreException e) { - if (log.isDebugEnabled()) { - log.debug( - "[{}] Failed to refresh cursor metadata-version for {} due " - + "to {}", ledger.name, name, e.getMessage()); - } - } - }); + if (ledger.mlOwnershipChecker != null) { + ledger.mlOwnershipChecker.get().whenComplete((hasOwnership, t) -> { + if (t == null && hasOwnership) { + ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, + new MetaStoreCallback<>() { + @Override + public void operationComplete(ManagedCursorInfo info, Stat stat) { + updateCursorLedgerStat(info, stat); + // fail the top level call so that the caller can retry + callback.operationFailed(topLevelException); + } + + @Override + public void operationFailed(MetaStoreException e) { + if (log.isDebugEnabled()) { + log.debug( + "[{}] Failed to refresh cursor metadata-version " + + "for {} due to {}", ledger.name, name, + e.getMessage()); + } + // fail the top level call so that the caller can retry + callback.operationFailed(topLevelException); + } + }); + } else { + // fail the top level call so that the caller can retry + callback.operationFailed(topLevelException); + } + }); + } else { + callback.operationFailed(topLevelException); } + } else { + callback.operationFailed(topLevelException); } - callback.operationFailed(e); } }); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 1bb23912b5e..42a5216b3ce 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -330,7 +330,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { @Override public void asyncOpen(final String name, final ManagedLedgerConfig config, final OpenLedgerCallback callback, - Supplier<Boolean> mlOwnershipChecker, final Object ctx) { + Supplier<CompletableFuture<Boolean>> mlOwnershipChecker, final Object ctx) { if (closed) { callback.openLedgerFailed(new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx); return; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c501ef0bfe9..72e0b2c9d06 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -231,7 +231,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private static final Random random = new Random(System.currentTimeMillis()); private long maximumRolloverTimeMs; - protected final Supplier<Boolean> mlOwnershipChecker; + protected final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker; volatile PositionImpl lastConfirmedEntry; @@ -335,7 +335,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, - final String name, final Supplier<Boolean> mlOwnershipChecker) { + final String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) { this.factory = factory; this.bookKeeper = bookKeeper; this.config = config; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index b33dd87543f..8b2742d9587 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.AsyncCallback; @@ -50,7 +51,7 @@ public class ShadowManagedLedgerImpl extends ManagedLedgerImpl { public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, - String name, final Supplier<Boolean> mlOwnershipChecker) { + String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) { super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker); this.sourceMLName = config.getShadowSourceName(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 6c4f21c3af2..34f65dfd00e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3399,7 +3399,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { @Override public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { } - }, checkOwnershipFlag ? () -> true : null, null); + }, checkOwnershipFlag ? () -> CompletableFuture.completedFuture(true) : null, null); latch.await(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9c67085f375..e08417a6498 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1797,7 +1797,7 @@ public class BrokerService implements Closeable { topicFuture.completeExceptionally(new PersistenceException(exception)); } } - }, () -> isTopicNsOwnedByBroker(topicName), null); + }, () -> isTopicNsOwnedByBrokerAsync(topicName), null); }).exceptionally((exception) -> { log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception); @@ -2189,13 +2189,16 @@ public class BrokerService implements Closeable { }); } - public boolean isTopicNsOwnedByBroker(TopicName topicName) { - try { - return pulsar.getNamespaceService().isServiceUnitOwned(topicName); - } catch (Exception e) { - log.warn("Failed to check the ownership of the topic: {}, {}", topicName, e.getMessage()); - } - return false; + public CompletableFuture<Boolean> isTopicNsOwnedByBrokerAsync(TopicName topicName) { + return pulsar.getNamespaceService().isServiceUnitOwnedAsync(topicName) + .handle((hasOwnership, t) -> { + if (t == null) { + return hasOwnership; + } else { + log.warn("Failed to check the ownership of the topic: {}, {}", topicName, t.getMessage()); + return false; + } + }); } public CompletableFuture<Void> checkTopicNsOwnership(final String topic) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java index ecc6599ce52..5308648b80c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java @@ -159,7 +159,7 @@ public class MLPendingAckStoreProvider implements TransactionPendingAckStoreProv , originPersistentTopic.getName(), subscription.getName(), exception); pendingAckStoreFuture.completeExceptionally(exception); } - }, () -> true, null); + }, () -> CompletableFuture.completedFuture(true), null); }).exceptionally(e -> { Throwable t = FutureUtil.unwrapCompletionException(e); log.error("[{}] [{}] Failed to get managedLedger config when init pending ack store!", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index fe0901bf2d5..c5111b47d20 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -183,7 +183,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { //load the nameserver, but topic is not init. log.info("lookup:{}",admin.lookups().lookupTopic(topic)); - assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBroker(topicName)); + assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBrokerAsync(topicName).join()); assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic)); //make sure namespace policy reader is fully started. Awaitility.await().untilAsserted(()-> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java index e53d5c25bd2..22fa2c32b56 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java @@ -76,7 +76,7 @@ public class OwnerShipCacheForCurrentServerTest extends OwnerShipForCurrentServe int verifiedBrokerNum = 0; for (PulsarService pulsarService : this.getPulsarServiceList()) { BrokerService bs = pulsarService.getBrokerService(); - if (bs.isTopicNsOwnedByBroker(TopicName.get(topicName))) { + if (bs.isTopicNsOwnedByBrokerAsync(TopicName.get(topicName)).join()) { continue; } verifiedBrokerNum ++;