This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 750547b3e61 [fix][ml] Make mlOwnershipChecker asynchronous so that it doesn't block/deadlock threads (#21333)
750547b3e61 is described below

commit 750547b3e611d4203172f1361dd7611fbcf55e0e
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Wed Oct 11 10:13:26 2023 +0300

    [fix][ml] Make mlOwnershipChecker asynchronous so that it doesn't block/deadlock threads (#21333)
    
    (cherry picked from commit eb9fa63d6bcaa4e1cbc4b87e36ead5a3ff6c44ae)
---
 .../bookkeeper/mledger/ManagedLedgerFactory.java   |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 57 ++++++++++++++--------
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  4 +-
 .../mledger/impl/ShadowManagedLedgerImpl.java      |  3 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  2 +-
 .../pulsar/broker/service/BrokerService.java       | 19 +++++---
 .../pendingack/impl/MLPendingAckStoreProvider.java |  2 +-
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  2 +-
 .../OwnerShipCacheForCurrentServerTest.java        |  2 +-
 10 files changed, 57 insertions(+), 38 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index b1427bab80b..e09fd84ea55 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -90,7 +90,7 @@ public interface ManagedLedgerFactory {
      *            opaque context
      */
     void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback,
-            Supplier<Boolean> mlOwnershipChecker, Object ctx);
+            Supplier<CompletableFuture<Boolean>> mlOwnershipChecker, Object ctx);
 
     /**
      * Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2abf6ca9d17..8db8a571439 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2682,32 +2682,47 @@ public class ManagedCursorImpl implements ManagedCursor {
                     }
 
                     @Override
-                    public void operationFailed(MetaStoreException e) {
-                        if (e instanceof MetaStoreException.BadVersionException) {
+                    public void operationFailed(MetaStoreException topLevelException) {
+                        if (topLevelException instanceof MetaStoreException.BadVersionException) {
                             log.warn("[{}] Failed to update cursor metadata for {} due to version conflict {}",
-                                    ledger.name, name, e.getMessage());
+                                    ledger.name, name, topLevelException.getMessage());
                             // it means previous owner of the ml might have updated the version incorrectly. So, check
                             // the ownership and refresh the version again.
-                            if (ledger.mlOwnershipChecker != null && ledger.mlOwnershipChecker.get()) {
-                                ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
-                                        new MetaStoreCallback<ManagedCursorInfo>() {
-                                            @Override
-                                            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                                                updateCursorLedgerStat(info, stat);
-                                            }
-
-                                            @Override
-                                            public void operationFailed(MetaStoreException e) {
-                                                if (log.isDebugEnabled()) {
-                                                    log.debug(
-                                                            "[{}] Failed to refresh cursor metadata-version for {} due "
-                                                            + "to {}", ledger.name, name, e.getMessage());
-                                                }
-                                            }
-                                        });
+                            if (ledger.mlOwnershipChecker != null) {
+                                ledger.mlOwnershipChecker.get().whenComplete((hasOwnership, t) -> {
+                                    if (t == null && hasOwnership) {
+                                        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
+                                                new MetaStoreCallback<>() {
+                                                    @Override
+                                                    public void operationComplete(ManagedCursorInfo info, Stat stat) {
+                                                        updateCursorLedgerStat(info, stat);
+                                                        // fail the top level call so that the caller can retry
+                                                        callback.operationFailed(topLevelException);
+                                                    }
+
+                                                    @Override
+                                                    public void operationFailed(MetaStoreException e) {
+                                                        if (log.isDebugEnabled()) {
+                                                            log.debug(
+                                                                    "[{}] Failed to refresh cursor metadata-version "
+                                                                            + "for {} due to {}", ledger.name, name,
+                                                                    e.getMessage());
+                                                        }
+                                                        // fail the top level call so that the caller can retry
+                                                        callback.operationFailed(topLevelException);
+                                                    }
+                                                });
+                                    } else {
+                                        // fail the top level call so that the caller can retry
+                                        callback.operationFailed(topLevelException);
+                                    }
+                                });
+                            } else {
+                                callback.operationFailed(topLevelException);
                             }
+                        } else {
+                            callback.operationFailed(topLevelException);
                         }
-                        callback.operationFailed(e);
                     }
                 });
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 1bb23912b5e..42a5216b3ce 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -330,7 +330,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
     @Override
     public void asyncOpen(final String name, final ManagedLedgerConfig config, final OpenLedgerCallback callback,
-            Supplier<Boolean> mlOwnershipChecker, final Object ctx) {
+            Supplier<CompletableFuture<Boolean>> mlOwnershipChecker, final Object ctx) {
         if (closed) {
             callback.openLedgerFailed(new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
             return;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c501ef0bfe9..72e0b2c9d06 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -231,7 +231,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     private static final Random random = new Random(System.currentTimeMillis());
     private long maximumRolloverTimeMs;
-    protected final Supplier<Boolean> mlOwnershipChecker;
+    protected final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker;
 
     volatile PositionImpl lastConfirmedEntry;
 
@@ -335,7 +335,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
     public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
             ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
-            final String name, final Supplier<Boolean> mlOwnershipChecker) {
+            final String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
         this.factory = factory;
         this.bookKeeper = bookKeeper;
         this.config = config;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index b33dd87543f..8b2742d9587 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.AsyncCallback;
@@ -50,7 +51,7 @@ public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
     public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper,
                                    MetaStore store, ManagedLedgerConfig config,
                                    OrderedScheduler scheduledExecutor,
-                                   String name, final Supplier<Boolean> mlOwnershipChecker) {
+                                   String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
         super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker);
         this.sourceMLName = config.getShadowSourceName();
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 6c4f21c3af2..34f65dfd00e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3399,7 +3399,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
             @Override
             public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
             }
-        }, checkOwnershipFlag ? () -> true : null, null);
+        }, checkOwnershipFlag ? () -> CompletableFuture.completedFuture(true) : null, null);
         latch.await();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9c67085f375..e08417a6498 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1797,7 +1797,7 @@ public class BrokerService implements Closeable {
                                 topicFuture.completeExceptionally(new PersistenceException(exception));
                             }
                         }
-                    }, () -> isTopicNsOwnedByBroker(topicName), null);
+                    }, () -> isTopicNsOwnedByBrokerAsync(topicName), null);
 
         }).exceptionally((exception) -> {
             log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception);
@@ -2189,13 +2189,16 @@ public class BrokerService implements Closeable {
         });
     }
 
-    public boolean isTopicNsOwnedByBroker(TopicName topicName) {
-        try {
-            return pulsar.getNamespaceService().isServiceUnitOwned(topicName);
-        } catch (Exception e) {
-            log.warn("Failed to check the ownership of the topic: {}, {}", topicName, e.getMessage());
-        }
-        return false;
+    public CompletableFuture<Boolean> isTopicNsOwnedByBrokerAsync(TopicName topicName) {
+        return pulsar.getNamespaceService().isServiceUnitOwnedAsync(topicName)
+                .handle((hasOwnership, t) -> {
+                    if (t == null) {
+                        return hasOwnership;
+                    } else {
+                        log.warn("Failed to check the ownership of the topic: {}, {}", topicName, t.getMessage());
+                        return false;
+                    }
+                });
     }
 
     public CompletableFuture<Void> checkTopicNsOwnership(final String topic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index ecc6599ce52..5308648b80c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -159,7 +159,7 @@ public class MLPendingAckStoreProvider implements TransactionPendingAckStoreProv
                                                 , originPersistentTopic.getName(), subscription.getName(), exception);
                                         pendingAckStoreFuture.completeExceptionally(exception);
                                     }
-                                }, () -> true, null);
+                                }, () -> CompletableFuture.completedFuture(true), null);
                     }).exceptionally(e -> {
                         Throwable t = FutureUtil.unwrapCompletionException(e);
                         log.error("[{}] [{}] Failed to get managedLedger config when init pending ack store!",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index fe0901bf2d5..c5111b47d20 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -183,7 +183,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
 
         //load the nameserver, but topic is not init.
         log.info("lookup:{}",admin.lookups().lookupTopic(topic));
-        assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBroker(topicName));
+        assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBrokerAsync(topicName).join());
         assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
         //make sure namespace policy reader is fully started.
         Awaitility.await().untilAsserted(()-> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
index e53d5c25bd2..22fa2c32b56 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
@@ -76,7 +76,7 @@ public class OwnerShipCacheForCurrentServerTest extends OwnerShipForCurrentServe
         int verifiedBrokerNum = 0;
         for (PulsarService pulsarService : this.getPulsarServiceList()) {
             BrokerService bs = pulsarService.getBrokerService();
-            if (bs.isTopicNsOwnedByBroker(TopicName.get(topicName))) {
+            if (bs.isTopicNsOwnedByBrokerAsync(TopicName.get(topicName)).join()) {
                 continue;
             }
             verifiedBrokerNum ++;

Reply via email to