This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d74010c271a [improve] Refactored BK ClientFactory to return futures 
(#22853)
d74010c271a is described below

commit d74010c271abfb0a77a4dacf0ab072a957afeb5a
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Jun 5 17:09:32 2024 -0700

    [improve] Refactored BK ClientFactory to return futures (#22853)
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 223 ++++++++++-----------
 .../mledger/impl/ManagedLedgerOfflineBacklog.java  |  20 +-
 .../pulsar/broker/BookKeeperClientFactory.java     |  19 +-
 .../pulsar/broker/BookKeeperClientFactoryImpl.java |  28 +--
 .../pulsar/broker/ManagedLedgerClientFactory.java  |  39 ++--
 .../bucket/BookkeeperBucketSnapshotStorage.java    |   2 +-
 .../service/schema/BookkeeperSchemaStorage.java    |   2 +-
 .../apache/pulsar/compaction/CompactorTool.java    |   2 +-
 .../broker/MockedBookKeeperClientFactory.java      |  18 +-
 .../testcontext/MockBookKeeperClientFactory.java   |  15 +-
 .../pulsar/compaction/CompactedTopicTest.java      |   6 +-
 .../pulsar/compaction/CompactionRetentionTest.java |   2 +-
 .../apache/pulsar/compaction/CompactionTest.java   |   2 +-
 .../apache/pulsar/compaction/CompactorTest.java    |   2 +-
 .../compaction/ServiceUnitStateCompactionTest.java |   2 +-
 .../compaction/TopicCompactionServiceTest.java     |   2 +-
 16 files changed, 193 insertions(+), 191 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index d867f2f4c02..ed803a81462 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -161,7 +161,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, 
BookKeeper bookKeeper,
                                     ManagedLedgerFactoryConfig config)
             throws Exception {
-        this(metadataStore, (policyConfig) -> bookKeeper, config);
+        this(metadataStore, (policyConfig) -> 
CompletableFuture.completedFuture(bookKeeper), config);
     }
 
     public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
@@ -233,8 +233,8 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         }
 
         @Override
-        public BookKeeper get(EnsemblePlacementPolicyConfig policy) {
-            return bkClient;
+        public CompletableFuture<BookKeeper> get(EnsemblePlacementPolicyConfig 
policy) {
+            return CompletableFuture.completedFuture(bkClient);
         }
     }
 
@@ -378,56 +378,63 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         ledgers.computeIfAbsent(name, (mlName) -> {
             // Create the managed ledger
             CompletableFuture<ManagedLedgerImpl> future = new 
CompletableFuture<>();
-            BookKeeper bk = bookkeeperFactory.get(
-                    new 
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
-                            
config.getBookKeeperEnsemblePlacementPolicyProperties()));
-            final ManagedLedgerImpl newledger = config.getShadowSource() == 
null
-                    ? new ManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name, mlOwnershipChecker)
-                    : new ShadowManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name,
-                    mlOwnershipChecker);
-            PendingInitializeManagedLedger pendingLedger = new 
PendingInitializeManagedLedger(newledger);
-            pendingInitializeLedgers.put(name, pendingLedger);
-            newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
-                @Override
-                public void initializeComplete() {
-                    log.info("[{}] Successfully initialize managed ledger", 
name);
-                    pendingInitializeLedgers.remove(name, pendingLedger);
-                    future.complete(newledger);
-
-                    // May need to update the cursor position
-                    newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
-                    // May need to trigger offloading
-                    if (config.isTriggerOffloadOnTopicLoad()) {
-                        
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
-                    }
-                }
-
-                @Override
-                public void initializeFailed(ManagedLedgerException e) {
-                    if (config.isCreateIfMissing()) {
-                        log.error("[{}] Failed to initialize managed ledger: 
{}", name, e.getMessage());
-                    }
-
-                    // Clean the map if initialization fails
-                    ledgers.remove(name, future);
-
-                    if (pendingInitializeLedgers.remove(name, pendingLedger)) {
-                        pendingLedger.ledger.asyncClose(new CloseCallback() {
+            bookkeeperFactory.get(
+                            new 
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                                    
config.getBookKeeperEnsemblePlacementPolicyProperties()))
+                    .thenAccept(bk -> {
+                        final ManagedLedgerImpl newledger = 
config.getShadowSource() == null
+                                ? new ManagedLedgerImpl(this, bk, store, 
config, scheduledExecutor, name,
+                                mlOwnershipChecker)
+                                : new ShadowManagedLedgerImpl(this, bk, store, 
config, scheduledExecutor, name,
+                                mlOwnershipChecker);
+                        PendingInitializeManagedLedger pendingLedger = new 
PendingInitializeManagedLedger(newledger);
+                        pendingInitializeLedgers.put(name, pendingLedger);
+                        newledger.initialize(new 
ManagedLedgerInitializeLedgerCallback() {
                             @Override
-                            public void closeComplete(Object ctx) {
-                                // no-op
+                            public void initializeComplete() {
+                                log.info("[{}] Successfully initialize managed 
ledger", name);
+                                pendingInitializeLedgers.remove(name, 
pendingLedger);
+                                future.complete(newledger);
+
+                                // May need to update the cursor position
+                                
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
+                                // May need to trigger offloading
+                                if (config.isTriggerOffloadOnTopicLoad()) {
+                                    
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
+                                }
                             }
 
                             @Override
-                            public void closeFailed(ManagedLedgerException 
exception, Object ctx) {
-                                log.warn("[{}] Failed to a pending 
initialization managed ledger", name, exception);
+                            public void 
initializeFailed(ManagedLedgerException e) {
+                                if (config.isCreateIfMissing()) {
+                                    log.error("[{}] Failed to initialize 
managed ledger: {}", name, e.getMessage());
+                                }
+
+                                // Clean the map if initialization fails
+                                ledgers.remove(name, future);
+
+                                if (pendingInitializeLedgers.remove(name, 
pendingLedger)) {
+                                    pendingLedger.ledger.asyncClose(new 
CloseCallback() {
+                                        @Override
+                                        public void closeComplete(Object ctx) {
+                                            // no-op
+                                        }
+
+                                        @Override
+                                        public void 
closeFailed(ManagedLedgerException exception, Object ctx) {
+                                            log.warn("[{}] Failed to a pending 
initialization managed ledger", name,
+                                                    exception);
+                                        }
+                                    }, null);
+                                }
+
+                                future.completeExceptionally(e);
                             }
                         }, null);
-                    }
-
-                    future.completeExceptionally(e);
-                }
-            }, null);
+                    }).exceptionally(ex -> {
+                        future.completeExceptionally(ex);
+                        return null;
+                    });
             return future;
         }).thenAccept(ml -> callback.openLedgerComplete(ml, 
ctx)).exceptionally(exception -> {
             callback.openLedgerFailed((ManagedLedgerException) 
exception.getCause(), ctx);
@@ -443,20 +450,22 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
             callback.openReadOnlyManagedLedgerFailed(
                     new 
ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
         }
-        ReadOnlyManagedLedgerImpl roManagedLedger = new 
ReadOnlyManagedLedgerImpl(this,
-                bookkeeperFactory
-                        .get(new 
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
-                                
config.getBookKeeperEnsemblePlacementPolicyProperties())),
-                store, config, scheduledExecutor, managedLedgerName);
-        roManagedLedger.initialize().thenRun(() -> {
-            log.info("[{}] Successfully initialize Read-only managed ledger", 
managedLedgerName);
-            callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx);
-
-        }).exceptionally(e -> {
-            log.error("[{}] Failed to initialize Read-only managed ledger", 
managedLedgerName, e);
-            callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) 
e.getCause(), ctx);
-            return null;
-        });
+
+        bookkeeperFactory
+                .get(new 
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                        
config.getBookKeeperEnsemblePlacementPolicyProperties()))
+                .thenCompose(bk -> {
+                    ReadOnlyManagedLedgerImpl roManagedLedger = new 
ReadOnlyManagedLedgerImpl(this, bk,
+                            store, config, scheduledExecutor, 
managedLedgerName);
+                    return roManagedLedger.initialize().thenApply(v -> 
roManagedLedger);
+                }).thenAccept(roManagedLedger -> {
+                    log.info("[{}] Successfully initialize Read-only managed 
ledger", managedLedgerName);
+                    
callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx);
+                }).exceptionally(e -> {
+                    log.error("[{}] Failed to initialize Read-only managed 
ledger", managedLedgerName, e);
+                    
callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(), 
ctx);
+                    return null;
+                });
     }
 
     @Override
@@ -578,49 +587,35 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                 ledgerFuture.completeExceptionally(new 
ManagedLedgerException.ManagedLedgerFactoryClosedException());
             }
         }
-        CompletableFuture<Void> bookkeeperFuture = new CompletableFuture<>();
-        futures.add(bookkeeperFuture);
-        futures.add(CompletableFuture.runAsync(() -> {
-            if (isBookkeeperManaged) {
-                try {
-                    BookKeeper bookkeeper = bookkeeperFactory.get();
-                    if (bookkeeper != null) {
-                        bookkeeper.close();
-                    }
-                    bookkeeperFuture.complete(null);
-                } catch (Throwable throwable) {
-                    bookkeeperFuture.completeExceptionally(throwable);
-                }
-            } else {
-                bookkeeperFuture.complete(null);
-            }
-            if (!ledgers.isEmpty()) {
-                log.info("Force closing {} ledgers.", ledgers.size());
-                //make sure all callbacks is called.
-                ledgers.forEach(((ledgerName, ledgerFuture) -> {
-                    if (!ledgerFuture.isDone()) {
-                        ledgerFuture.completeExceptionally(
-                                new 
ManagedLedgerException.ManagedLedgerFactoryClosedException());
-                    } else {
-                        ManagedLedgerImpl managedLedger = 
ledgerFuture.getNow(null);
-                        if (managedLedger == null) {
-                            return;
-                        }
-                        try {
-                            managedLedger.close();
-                        } catch (Throwable throwable) {
-                            log.warn("[{}] Got exception when closing managed 
ledger: {}", managedLedger.getName(),
-                                    throwable);
+        CompletableFuture<BookKeeper> bookkeeperFuture = isBookkeeperManaged
+                ? bookkeeperFactory.get()
+                : CompletableFuture.completedFuture(null);
+        return bookkeeperFuture
+                .thenRun(() -> {
+                    log.info("Closing {} ledgers.", ledgers.size());
+                    //make sure all callbacks is called.
+                    ledgers.forEach(((ledgerName, ledgerFuture) -> {
+                        if (!ledgerFuture.isDone()) {
+                            ledgerFuture.completeExceptionally(
+                                    new 
ManagedLedgerException.ManagedLedgerFactoryClosedException());
+                        } else {
+                            ManagedLedgerImpl managedLedger = 
ledgerFuture.getNow(null);
+                            if (managedLedger == null) {
+                                return;
+                            }
+                            try {
+                                managedLedger.close();
+                            } catch (Throwable throwable) {
+                                log.warn("[{}] Got exception when closing 
managed ledger: {}", managedLedger.getName(),
+                                        throwable);
+                            }
                         }
-                    }
-                }));
-            }
-        }));
-        return FutureUtil.waitForAll(futures).thenAcceptAsync(__ -> {
-            //wait for tasks in scheduledExecutor executed.
-            scheduledExecutor.shutdownNow();
-            entryCacheManager.clear();
-        });
+                    }));
+                }).thenAcceptAsync(__ -> {
+                    //wait for tasks in scheduledExecutor executed.
+                    scheduledExecutor.shutdownNow();
+                    entryCacheManager.clear();
+                });
     }
 
     @Override
@@ -861,14 +856,14 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         asyncGetManagedLedgerInfo(managedLedgerName, new 
ManagedLedgerInfoCallback() {
             @Override
             public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
-                BookKeeper bkc = getBookKeeper();
-
-                // First delete all cursors resources
-                List<CompletableFuture<Void>> futures = 
info.cursors.entrySet().stream()
-                        .map(e -> deleteCursor(bkc, managedLedgerName, 
e.getKey(), e.getValue()))
-                        .collect(Collectors.toList());
-                Futures.waitForAll(futures).thenRun(() -> {
-                    deleteManagedLedgerData(bkc, managedLedgerName, info, 
mlConfigFuture, callback, ctx);
+                getBookKeeper().thenCompose(bk -> {
+                    // First delete all cursors resources
+                    List<CompletableFuture<Void>> futures = 
info.cursors.entrySet().stream()
+                            .map(e -> deleteCursor(bk, managedLedgerName, 
e.getKey(), e.getValue()))
+                            .collect(Collectors.toList());
+                    return Futures.waitForAll(futures).thenApply(v -> bk);
+                }).thenAccept(bk -> {
+                    deleteManagedLedgerData(bk, managedLedgerName, info, 
mlConfigFuture, callback, ctx);
                 }).exceptionally(ex -> {
                     callback.deleteLedgerFailed(new 
ManagedLedgerException(ex), ctx);
                     return null;
@@ -1053,7 +1048,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         return this.mbean;
     }
 
-    public BookKeeper getBookKeeper() {
+    public CompletableFuture<BookKeeper> getBookKeeper() {
         return bookkeeperFactory.get();
     }
 
@@ -1062,7 +1057,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
      *
      */
     public interface BookkeeperFactoryForCustomEnsemblePlacementPolicy {
-        default BookKeeper get() {
+        default CompletableFuture<BookKeeper> get() {
             return get(null);
         }
 
@@ -1073,7 +1068,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
          * @param ensemblePlacementPolicyMetadata
          * @return
          */
-        BookKeeper get(EnsemblePlacementPolicyConfig 
ensemblePlacementPolicyMetadata);
+        CompletableFuture<BookKeeper> get(EnsemblePlacementPolicyConfig 
ensemblePlacementPolicyMetadata);
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerFactoryImpl.class);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
index a271d439e06..81cd94e5bf9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
@@ -140,7 +140,7 @@ public class ManagedLedgerOfflineBacklog {
             final NavigableMap<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception {
         String managedLedgerName = topicName.getPersistenceNamingEncoding();
         MetaStore store = factory.getMetaStore();
-        BookKeeper bk = factory.getBookKeeper();
+
         final CountDownLatch mlMetaCounter = new CountDownLatch(1);
 
         store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing 
*/,
@@ -180,12 +180,16 @@ public class ManagedLedgerOfflineBacklog {
                             if (log.isDebugEnabled()) {
                                 log.debug("[{}] Opening ledger {}", 
managedLedgerName, id);
                             }
-                            try {
-                                bk.asyncOpenLedgerNoRecovery(id, digestType, 
password, opencb, null);
-                            } catch (Exception e) {
-                                log.warn("[{}] Failed to open ledger {}: {}", 
managedLedgerName, id, e);
-                                mlMetaCounter.countDown();
-                            }
+
+                            factory.getBookKeeper()
+                                    .thenAccept(bk -> {
+                                        bk.asyncOpenLedgerNoRecovery(id, 
digestType, password, opencb, null);
+                                    }).exceptionally(ex -> {
+                                        log.warn("[{}] Failed to open ledger 
{}: {}", managedLedgerName, id, ex);
+                                        opencb.openComplete(-1, null, null);
+                                        mlMetaCounter.countDown();
+                                        return null;
+                                    });
                         } else {
                             log.warn("[{}] Ledger list empty", 
managedLedgerName);
                             mlMetaCounter.countDown();
@@ -217,7 +221,7 @@ public class ManagedLedgerOfflineBacklog {
         }
         String managedLedgerName = topicName.getPersistenceNamingEncoding();
         MetaStore store = factory.getMetaStore();
-        BookKeeper bk = factory.getBookKeeper();
+        BookKeeper bk = factory.getBookKeeper().get();
         final CountDownLatch allCursorsCounter = new CountDownLatch(1);
         final long errorInReadingCursor = -1;
         ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
index 95923baac02..5ab1a01838d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
@@ -19,9 +19,9 @@
 package org.apache.pulsar.broker;
 
 import io.netty.channel.EventLoopGroup;
-import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -31,13 +31,16 @@ import 
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
  * Provider of a new BookKeeper client instance.
  */
 public interface BookKeeperClientFactory {
-    BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, 
EventLoopGroup eventLoopGroup,
-                      Optional<Class<? extends EnsemblePlacementPolicy>> 
ensemblePlacementPolicyClass,
-                      Map<String, Object> ensemblePlacementPolicyProperties) 
throws IOException;
+    CompletableFuture<BookKeeper> create(ServiceConfiguration conf, 
MetadataStoreExtended store,
+                                         EventLoopGroup eventLoopGroup,
+                                         Optional<Class<? extends 
EnsemblePlacementPolicy>> policyClass,
+                                         Map<String, Object> 
ensemblePlacementPolicyProperties);
+
+    CompletableFuture<BookKeeper> create(ServiceConfiguration conf, 
MetadataStoreExtended store,
+                                         EventLoopGroup eventLoopGroup,
+                                         Optional<Class<? extends 
EnsemblePlacementPolicy>> policyClass,
+                                         Map<String, Object> 
ensemblePlacementPolicyProperties,
+                                         StatsLogger statsLogger);
 
-    BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, 
EventLoopGroup eventLoopGroup,
-                      Optional<Class<? extends EnsemblePlacementPolicy>> 
ensemblePlacementPolicyClass,
-                      Map<String, Object> ensemblePlacementPolicyProperties,
-                      StatsLogger statsLogger) throws IOException;
     void close();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index e5293cee24e..45299d9ed05 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -29,6 +29,7 @@ import io.netty.channel.EventLoopGroup;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
@@ -53,19 +54,19 @@ import 
org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
 public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
 
     @Override
-    public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended 
store,
-                             EventLoopGroup eventLoopGroup,
-                             Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
-                             Map<String, Object> properties) throws 
IOException {
-        return create(conf, store, eventLoopGroup, 
ensemblePlacementPolicyClass, properties,
+    public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, 
MetadataStoreExtended store,
+                                                EventLoopGroup eventLoopGroup,
+                                                Optional<Class<? extends 
EnsemblePlacementPolicy>> policyClass,
+                                                Map<String, Object> 
properties) {
+        return create(conf, store, eventLoopGroup, policyClass, properties,
                 NullStatsLogger.INSTANCE);
     }
 
     @Override
-    public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended 
store,
+    public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, 
MetadataStoreExtended store,
                              EventLoopGroup eventLoopGroup,
                              Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
-                             Map<String, Object> properties, StatsLogger 
statsLogger) throws IOException {
+                             Map<String, Object> properties, StatsLogger 
statsLogger) {
         PulsarMetadataClientDriver.init();
 
         ClientConfiguration bkConf = createBkClientConfiguration(store, conf);
@@ -77,11 +78,14 @@ public class BookKeeperClientFactoryImpl implements 
BookKeeperClientFactory {
         } else {
             setDefaultEnsemblePlacementPolicy(bkConf, conf, store);
         }
-        try {
-            return getBookKeeperBuilder(conf, eventLoopGroup, statsLogger, 
bkConf).build();
-        } catch (InterruptedException | BKException e) {
-            throw new IOException(e);
-        }
+
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                return getBookKeeperBuilder(conf, eventLoopGroup, statsLogger, 
bkConf).build();
+            } catch (InterruptedException | BKException | IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 8861b12f0c1..6ed95f167a1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.broker;
 
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.EventLoopGroup;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -48,8 +50,8 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
 
     private ManagedLedgerFactory managedLedgerFactory;
     private BookKeeper defaultBkClient;
-    private final Map<EnsemblePlacementPolicyConfig, BookKeeper>
-            bkEnsemblePolicyToBkClientMap = new ConcurrentHashMap<>();
+    private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper>
+            bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().buildAsync();
     private StatsProvider statsProvider = new NullStatsProvider();
 
     public void initialize(ServiceConfiguration conf, MetadataStoreExtended 
metadataStore,
@@ -89,27 +91,20 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
         StatsLogger statsLogger = 
statsProvider.getStatsLogger("pulsar_managedLedger_client");
 
         this.defaultBkClient =
-                bookkeeperProvider.create(conf, metadataStore, eventLoopGroup, 
Optional.empty(), null, statsLogger);
+                bookkeeperProvider.create(conf, metadataStore, eventLoopGroup, 
Optional.empty(), null, statsLogger)
+                        .get();
 
         BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory = (
                 EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) 
-> {
-            BookKeeper bkClient = null;
-            // find or create bk-client in cache for a specific 
ensemblePlacementPolicy
-            if (ensemblePlacementPolicyConfig != null && 
ensemblePlacementPolicyConfig.getPolicyClass() != null) {
-                bkClient = 
bkEnsemblePolicyToBkClientMap.computeIfAbsent(ensemblePlacementPolicyConfig, 
(key) -> {
-                    try {
-                        return bookkeeperProvider.create(conf, metadataStore, 
eventLoopGroup,
-                                
Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()),
-                                ensemblePlacementPolicyConfig.getProperties(), 
statsLogger);
-                    } catch (Exception e) {
-                        log.error("Failed to initialize bk-client for policy 
{}, properties {}",
-                                ensemblePlacementPolicyConfig.getPolicyClass(),
-                                ensemblePlacementPolicyConfig.getProperties(), 
e);
-                    }
-                    return this.defaultBkClient;
-                });
+            if (ensemblePlacementPolicyConfig == null || 
ensemblePlacementPolicyConfig.getPolicyClass() == null) {
+                return CompletableFuture.completedFuture(defaultBkClient);
             }
-            return bkClient != null ? bkClient : defaultBkClient;
+
+            // find or create bk-client in cache for a specific 
ensemblePlacementPolicy
+            return 
bkEnsemblePolicyToBkClientMap.get(ensemblePlacementPolicyConfig,
+                    (config, executor) -> bookkeeperProvider.create(conf, 
metadataStore, eventLoopGroup,
+                            
Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()),
+                            ensemblePlacementPolicyConfig.getProperties(), 
statsLogger));
         };
 
         try {
@@ -136,7 +131,7 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
 
     @VisibleForTesting
     public Map<EnsemblePlacementPolicyConfig, BookKeeper> 
getBkEnsemblePolicyToBookKeeperMap() {
-        return bkEnsemblePolicyToBkClientMap;
+        return bkEnsemblePolicyToBkClientMap.synchronous().asMap();
     }
 
     @Override
@@ -164,7 +159,7 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
                 // factory, however that might be introducing more unknowns.
                 log.warn("Encountered exceptions on closing bookkeeper 
client", ree);
             }
-            bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
+            
bkEnsemblePolicyToBkClientMap.synchronous().asMap().forEach((policy, bk) -> {
                 try {
                     if (bk != null) {
                         bk.close();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index e99f39b382f..8dcfe8d39a8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -107,7 +107,7 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
                 pulsar.getIoEventLoopGroup(),
                 Optional.empty(),
                 null
-        );
+        ).get();
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index acdd906f6b8..99f0249b304 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -110,7 +110,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
             pulsar.getIoEventLoopGroup(),
             Optional.empty(),
             null
-        );
+        ).join();
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 3225f7294d5..7d35c2c0f7b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -165,7 +165,7 @@ public class CompactorTool {
                 new DefaultThreadFactory("compactor-io"));
 
         @Cleanup
-        BookKeeper bk = bkClientFactory.create(brokerConfig, store, 
eventLoopGroup, Optional.empty(), null);
+        BookKeeper bk = bkClientFactory.create(brokerConfig, store, 
eventLoopGroup, Optional.empty(), null).get();
 
         @Cleanup
         PulsarClient pulsar = createClient(brokerConfig);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
index 6d65687a501..887e35e2774 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
@@ -19,9 +19,9 @@
 package org.apache.pulsar.broker;
 
 import io.netty.channel.EventLoopGroup;
-import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
@@ -51,19 +51,19 @@ public class MockedBookKeeperClientFactory implements 
BookKeeperClientFactory {
     }
 
     @Override
-    public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended 
store,
-                             EventLoopGroup eventLoopGroup,
-                             Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
-                             Map<String, Object> properties) throws 
IOException {
-        return mockedBk;
+    public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, 
MetadataStoreExtended store,
+                                                EventLoopGroup eventLoopGroup,
+                                                Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                                                Map<String, Object> 
properties) {
+        return CompletableFuture.completedFuture(mockedBk);
     }
 
     @Override
-    public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended 
store,
+    public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, 
MetadataStoreExtended store,
                              EventLoopGroup eventLoopGroup,
                              Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
-                             Map<String, Object> properties, StatsLogger 
statsLogger) throws IOException {
-        return mockedBk;
+                             Map<String, Object> properties, StatsLogger 
statsLogger) {
+        return CompletableFuture.completedFuture(mockedBk);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
index fd457687323..5f02fd7af48 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.testcontext;
 import io.netty.channel.EventLoopGroup;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -39,21 +40,21 @@ class MockBookKeeperClientFactory implements 
BookKeeperClientFactory {
     }
 
     @Override
-    public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended 
store,
-                             EventLoopGroup eventLoopGroup,
-                             Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
-                             Map<String, Object> properties) {
+    public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, 
MetadataStoreExtended store,
+                                    EventLoopGroup eventLoopGroup,
+                                    Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                                    Map<String, Object> properties) {
         // Always return the same instance (so that we don't loose the mock BK 
content on broker restart
-        return mockBookKeeper;
+        return CompletableFuture.completedFuture(mockBookKeeper);
     }
 
     @Override
-    public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended 
store,
+    public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, 
MetadataStoreExtended store,
                              EventLoopGroup eventLoopGroup,
                              Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
                              Map<String, Object> properties, StatsLogger 
statsLogger) {
         // Always return the same instance (so that we don't loose the mock BK 
content on broker restart
-        return mockBookKeeper;
+        return CompletableFuture.completedFuture(mockBookKeeper);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index e955a433ad5..3cca85aa2f1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -163,7 +163,7 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
     public void testEntryLookup() throws Exception {
         @Cleanup
         BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
-                this.conf, null, null, Optional.empty(), null);
+                this.conf, null, null, Optional.empty(), null).get();
 
         Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, 
Long>>> compactedLedgerData
             = buildCompactedLedger(bk, 500);
@@ -219,7 +219,7 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
     public void testCleanupOldCompactedTopicLedger() throws Exception {
         @Cleanup
         BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
-                this.conf, null, null, Optional.empty(), null);
+                this.conf, null, null, Optional.empty(), null).get();
 
         LedgerHandle oldCompactedLedger = bk.createLedger(1, 1,
                 Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
@@ -849,7 +849,7 @@ public class CompactedTopicTest extends 
MockedPulsarServiceBaseTest {
     public void 
testCompactWithConcurrentGetCompactionHorizonAndCompactedTopicContext() throws 
Exception {
         @Cleanup
         BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
-                this.conf, null, null, Optional.empty(), null);
+                this.conf, null, null, Optional.empty(), null).get();
 
         Mockito.doAnswer(invocation -> {
             Thread.sleep(1500);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index 98bf2b819c2..ac1ba6bc814 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -78,7 +78,7 @@ public class CompactionRetentionTest extends 
MockedPulsarServiceBaseTest {
 
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
-        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, 
Optional.empty(), null);
+        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, 
Optional.empty(), null).get();
         compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index f0010096b1e..081831b0300 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -123,7 +123,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
-        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, 
Optional.empty(), null);
+        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, 
Optional.empty(), null).get();
         compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index debc3dd5e3f..16945a60f5d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -100,7 +100,7 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
         bk = pulsar.getBookKeeperClientFactory().create(
-                this.conf, null, null, Optional.empty(), null);
+                this.conf, null, null, Optional.empty(), null).get();
         compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
index b3a48f40547..91402168108 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
@@ -155,7 +155,7 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
 
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
-        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, 
Optional.empty(), null);
+        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, 
Optional.empty(), null).get();
         schema = Schema.JSON(ServiceUnitStateData.class);
         strategy = new ServiceUnitStateCompactionStrategy();
         strategy.checkBrokers(false);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
index d84d1ccc9ea..ba77ce5bd9d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
@@ -72,7 +72,7 @@ public class TopicCompactionServiceTest extends 
MockedPulsarServiceBaseTest {
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
         bk = pulsar.getBookKeeperClientFactory().create(
-                this.conf, null, null, Optional.empty(), null);
+                this.conf, null, null, Optional.empty(), null).get();
         compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
     }
 


Reply via email to