This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 217f1f011b66677d6d78ff9d42837c864ae10104 Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Jun 5 17:09:32 2024 -0700 [improve] Refactored BK ClientFactory to return futures (#22853) --- .../mledger/impl/ManagedLedgerFactoryImpl.java | 213 ++++++++++----------- .../mledger/impl/ManagedLedgerOfflineBacklog.java | 20 +- .../pulsar/broker/BookKeeperClientFactory.java | 19 +- .../pulsar/broker/BookKeeperClientFactoryImpl.java | 28 +-- .../pulsar/broker/ManagedLedgerClientFactory.java | 39 ++-- .../bucket/BookkeeperBucketSnapshotStorage.java | 2 +- .../service/schema/BookkeeperSchemaStorage.java | 2 +- .../apache/pulsar/compaction/CompactorTool.java | 2 +- .../broker/MockedBookKeeperClientFactory.java | 18 +- .../testcontext/MockBookKeeperClientFactory.java | 15 +- .../pulsar/compaction/CompactedTopicTest.java | 6 +- .../pulsar/compaction/CompactionRetentionTest.java | 2 +- .../apache/pulsar/compaction/CompactionTest.java | 2 +- .../apache/pulsar/compaction/CompactorTest.java | 2 +- .../compaction/ServiceUnitStateCompactionTest.java | 2 +- .../compaction/TopicCompactionServiceTest.java | 2 +- 16 files changed, 188 insertions(+), 186 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 5ce84b3ed85..a0929044a6a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -160,7 +160,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper, ManagedLedgerFactoryConfig config) throws Exception { - this(metadataStore, (policyConfig) -> bookKeeper, config); + this(metadataStore, (policyConfig) -> CompletableFuture.completedFuture(bookKeeper), config); } public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, @@ -232,8 +232,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { } @Override - public BookKeeper get(EnsemblePlacementPolicyConfig policy) { - return bkClient; + public CompletableFuture<BookKeeper> get(EnsemblePlacementPolicyConfig policy) { + return CompletableFuture.completedFuture(bkClient); } } @@ -377,52 +377,59 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { ledgers.computeIfAbsent(name, (mlName) -> { // Create the managed ledger CompletableFuture<ManagedLedgerImpl> future = new CompletableFuture<>(); - BookKeeper bk = bookkeeperFactory.get( - new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(), - config.getBookKeeperEnsemblePlacementPolicyProperties())); - final ManagedLedgerImpl newledger = config.getShadowSource() == null - ? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, mlOwnershipChecker) - : new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, - mlOwnershipChecker); - PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger); - pendingInitializeLedgers.put(name, pendingLedger); - newledger.initialize(new ManagedLedgerInitializeLedgerCallback() { - @Override - public void initializeComplete() { - log.info("[{}] Successfully initialize managed ledger", name); - pendingInitializeLedgers.remove(name, pendingLedger); - future.complete(newledger); + bookkeeperFactory.get( + new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(), + config.getBookKeeperEnsemblePlacementPolicyProperties())) + .thenAccept(bk -> { + final ManagedLedgerImpl newledger = config.getShadowSource() == null + ? new ManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, + mlOwnershipChecker) + : new ShadowManagedLedgerImpl(this, bk, store, config, scheduledExecutor, name, + mlOwnershipChecker); + PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger); + pendingInitializeLedgers.put(name, pendingLedger); + newledger.initialize(new ManagedLedgerInitializeLedgerCallback() { + @Override + public void initializeComplete() { + log.info("[{}] Successfully initialize managed ledger", name); + pendingInitializeLedgers.remove(name, pendingLedger); + future.complete(newledger); - // May need to update the cursor position - newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); - } + // May need to update the cursor position + newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); + } - @Override - public void initializeFailed(ManagedLedgerException e) { - if (config.isCreateIfMissing()) { - log.error("[{}] Failed to initialize managed ledger: {}", name, e.getMessage()); - } + @Override + public void initializeFailed(ManagedLedgerException e) { + if (config.isCreateIfMissing()) { + log.error("[{}] Failed to initialize managed ledger: {}", name, e.getMessage()); + } - // Clean the map if initialization fails - ledgers.remove(name, future); + // Clean the map if initialization fails + ledgers.remove(name, future); - if (pendingInitializeLedgers.remove(name, pendingLedger)) { - pendingLedger.ledger.asyncClose(new CloseCallback() { - @Override - public void closeComplete(Object ctx) { - // no-op - } + if (pendingInitializeLedgers.remove(name, pendingLedger)) { + pendingLedger.ledger.asyncClose(new CloseCallback() { + @Override + public void closeComplete(Object ctx) { + // no-op + } - @Override - public void closeFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}] Failed to a pending initialization managed ledger", name, exception); + @Override + public void closeFailed(ManagedLedgerException exception, Object ctx) { + log.warn("[{}] Failed to a pending initialization managed ledger", name, + exception); + } + }, null); + } + + future.completeExceptionally(e); } }, null); - } - - future.completeExceptionally(e); - } - }, null); + }).exceptionally(ex -> { + future.completeExceptionally(ex); + return null; + }); return future; }).thenAccept(ml -> callback.openLedgerComplete(ml, ctx)).exceptionally(exception -> { callback.openLedgerFailed((ManagedLedgerException) exception.getCause(), ctx); @@ -438,20 +445,22 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { callback.openReadOnlyManagedLedgerFailed( new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx); } - ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this, - bookkeeperFactory - .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(), - config.getBookKeeperEnsemblePlacementPolicyProperties())), - store, config, scheduledExecutor, managedLedgerName); - roManagedLedger.initialize().thenRun(() -> { - log.info("[{}] Successfully initialize Read-only managed ledger", managedLedgerName); - callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx); - - }).exceptionally(e -> { - log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e); - callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(), ctx); - return null; - }); + + bookkeeperFactory + .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(), + config.getBookKeeperEnsemblePlacementPolicyProperties())) + .thenCompose(bk -> { + ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this, bk, + store, config, scheduledExecutor, managedLedgerName); + return roManagedLedger.initialize().thenApply(v -> roManagedLedger); + }).thenAccept(roManagedLedger -> { + log.info("[{}] Successfully initialize Read-only managed ledger", managedLedgerName); + callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx); + }).exceptionally(e -> { + log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e); + callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(), ctx); + return null; + }); } @Override @@ -573,49 +582,35 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { ledgerFuture.completeExceptionally(new ManagedLedgerException.ManagedLedgerFactoryClosedException()); } } - CompletableFuture<Void> bookkeeperFuture = new CompletableFuture<>(); - futures.add(bookkeeperFuture); - futures.add(CompletableFuture.runAsync(() -> { - if (isBookkeeperManaged) { - try { - BookKeeper bookkeeper = bookkeeperFactory.get(); - if (bookkeeper != null) { - bookkeeper.close(); - } - bookkeeperFuture.complete(null); - } catch (Throwable throwable) { - bookkeeperFuture.completeExceptionally(throwable); - } - } else { - bookkeeperFuture.complete(null); - } - if (!ledgers.isEmpty()) { - log.info("Force closing {} ledgers.", ledgers.size()); - //make sure all callbacks is called. - ledgers.forEach(((ledgerName, ledgerFuture) -> { - if (!ledgerFuture.isDone()) { - ledgerFuture.completeExceptionally( - new ManagedLedgerException.ManagedLedgerFactoryClosedException()); - } else { - ManagedLedgerImpl managedLedger = ledgerFuture.getNow(null); - if (managedLedger == null) { - return; - } - try { - managedLedger.close(); - } catch (Throwable throwable) { - log.warn("[{}] Got exception when closing managed ledger: {}", managedLedger.getName(), - throwable); + CompletableFuture<BookKeeper> bookkeeperFuture = isBookkeeperManaged + ? bookkeeperFactory.get() + : CompletableFuture.completedFuture(null); + return bookkeeperFuture + .thenRun(() -> { + log.info("Closing {} ledgers.", ledgers.size()); + //make sure all callbacks is called. + ledgers.forEach(((ledgerName, ledgerFuture) -> { + if (!ledgerFuture.isDone()) { + ledgerFuture.completeExceptionally( + new ManagedLedgerException.ManagedLedgerFactoryClosedException()); + } else { + ManagedLedgerImpl managedLedger = ledgerFuture.getNow(null); + if (managedLedger == null) { + return; + } + try { + managedLedger.close(); + } catch (Throwable throwable) { + log.warn("[{}] Got exception when closing managed ledger: {}", managedLedger.getName(), + throwable); + } } - } - })); - } - })); - return FutureUtil.waitForAll(futures).thenAcceptAsync(__ -> { - //wait for tasks in scheduledExecutor executed. - scheduledExecutor.shutdownNow(); - entryCacheManager.clear(); - }); + })); + }).thenAcceptAsync(__ -> { + //wait for tasks in scheduledExecutor executed. + scheduledExecutor.shutdownNow(); + entryCacheManager.clear(); + }); } @Override @@ -856,14 +851,14 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { asyncGetManagedLedgerInfo(managedLedgerName, new ManagedLedgerInfoCallback() { @Override public void getInfoComplete(ManagedLedgerInfo info, Object ctx) { - BookKeeper bkc = getBookKeeper(); - - // First delete all cursors resources - List<CompletableFuture<Void>> futures = info.cursors.entrySet().stream() - .map(e -> deleteCursor(bkc, managedLedgerName, e.getKey(), e.getValue())) - .collect(Collectors.toList()); - Futures.waitForAll(futures).thenRun(() -> { - deleteManagedLedgerData(bkc, managedLedgerName, info, mlConfigFuture, callback, ctx); + getBookKeeper().thenCompose(bk -> { + // First delete all cursors resources + List<CompletableFuture<Void>> futures = info.cursors.entrySet().stream() + .map(e -> deleteCursor(bk, managedLedgerName, e.getKey(), e.getValue())) + .collect(Collectors.toList()); + return Futures.waitForAll(futures).thenApply(v -> bk); + }).thenAccept(bk -> { + deleteManagedLedgerData(bk, managedLedgerName, info, mlConfigFuture, callback, ctx); }).exceptionally(ex -> { callback.deleteLedgerFailed(new ManagedLedgerException(ex), ctx); return null; @@ -1048,7 +1043,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { return this.mbean; } - public BookKeeper getBookKeeper() { + public CompletableFuture<BookKeeper> getBookKeeper() { return bookkeeperFactory.get(); } @@ -1057,7 +1052,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { * */ public interface BookkeeperFactoryForCustomEnsemblePlacementPolicy { - default BookKeeper get() { + default CompletableFuture<BookKeeper> get() { return get(null); } @@ -1068,7 +1063,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { * @param ensemblePlacementPolicyMetadata * @return */ - BookKeeper get(EnsemblePlacementPolicyConfig ensemblePlacementPolicyMetadata); + CompletableFuture<BookKeeper> get(EnsemblePlacementPolicyConfig ensemblePlacementPolicyMetadata); } private static final Logger log = LoggerFactory.getLogger(ManagedLedgerFactoryImpl.class); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java index a271d439e06..81cd94e5bf9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java @@ -140,7 +140,7 @@ public class ManagedLedgerOfflineBacklog { final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception { String managedLedgerName = topicName.getPersistenceNamingEncoding(); MetaStore store = factory.getMetaStore(); - BookKeeper bk = factory.getBookKeeper(); + final CountDownLatch mlMetaCounter = new CountDownLatch(1); store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing */, @@ -180,12 +180,16 @@ public class ManagedLedgerOfflineBacklog { if (log.isDebugEnabled()) { log.debug("[{}] Opening ledger {}", managedLedgerName, id); } - try { - bk.asyncOpenLedgerNoRecovery(id, digestType, password, opencb, null); - } catch (Exception e) { - log.warn("[{}] Failed to open ledger {}: {}", managedLedgerName, id, e); - mlMetaCounter.countDown(); - } + + factory.getBookKeeper() + .thenAccept(bk -> { + bk.asyncOpenLedgerNoRecovery(id, digestType, password, opencb, null); + }).exceptionally(ex -> { + log.warn("[{}] Failed to open ledger {}: {}", managedLedgerName, id, ex); + opencb.openComplete(-1, null, null); + mlMetaCounter.countDown(); + return null; + }); } else { log.warn("[{}] Ledger list empty", managedLedgerName); mlMetaCounter.countDown(); @@ -217,7 +221,7 @@ public class ManagedLedgerOfflineBacklog { } String managedLedgerName = topicName.getPersistenceNamingEncoding(); MetaStore store = factory.getMetaStore(); - BookKeeper bk = factory.getBookKeeper(); + BookKeeper bk = factory.getBookKeeper().get(); final CountDownLatch allCursorsCounter = new CountDownLatch(1); final long errorInReadingCursor = -1; ConcurrentOpenHashMap<String, Long> ledgerRetryMap = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java index 95923baac02..5ab1a01838d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java @@ -19,9 +19,9 @@ package org.apache.pulsar.broker; import io.netty.channel.EventLoopGroup; -import java.io.IOException; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.EnsemblePlacementPolicy; import org.apache.bookkeeper.stats.StatsLogger; @@ -31,13 +31,16 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; * Provider of a new BookKeeper client instance. */ public interface BookKeeperClientFactory { - BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup, - Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, - Map<String, Object> ensemblePlacementPolicyProperties) throws IOException; + CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store, + EventLoopGroup eventLoopGroup, + Optional<Class<? extends EnsemblePlacementPolicy>> policyClass, + Map<String, Object> ensemblePlacementPolicyProperties); + + CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store, + EventLoopGroup eventLoopGroup, + Optional<Class<? extends EnsemblePlacementPolicy>> policyClass, + Map<String, Object> ensemblePlacementPolicyProperties, + StatsLogger statsLogger); - BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup, - Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, - Map<String, Object> ensemblePlacementPolicyProperties, - StatsLogger statsLogger) throws IOException; void close(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index e5293cee24e..45299d9ed05 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -29,6 +29,7 @@ import io.netty.channel.EventLoopGroup; import java.io.IOException; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; @@ -53,19 +54,19 @@ import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver; public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory { @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, - EventLoopGroup eventLoopGroup, - Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, - Map<String, Object> properties) throws IOException { - return create(conf, store, eventLoopGroup, ensemblePlacementPolicyClass, properties, + public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store, + EventLoopGroup eventLoopGroup, + Optional<Class<? extends EnsemblePlacementPolicy>> policyClass, + Map<String, Object> properties) { + return create(conf, store, eventLoopGroup, policyClass, properties, NullStatsLogger.INSTANCE); } @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, + public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup, Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, - Map<String, Object> properties, StatsLogger statsLogger) throws IOException { + Map<String, Object> properties, StatsLogger statsLogger) { PulsarMetadataClientDriver.init(); ClientConfiguration bkConf = createBkClientConfiguration(store, conf); @@ -77,11 +78,14 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory { } else { setDefaultEnsemblePlacementPolicy(bkConf, conf, store); } - try { - return getBookKeeperBuilder(conf, eventLoopGroup, statsLogger, bkConf).build(); - } catch (InterruptedException | BKException e) { - throw new IOException(e); - } + + return CompletableFuture.supplyAsync(() -> { + try { + return getBookKeeperBuilder(conf, eventLoopGroup, statsLogger, bkConf).build(); + } catch (InterruptedException | BKException | IOException e) { + throw new RuntimeException(e); + } + }); } @VisibleForTesting diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index 8861b12f0c1..6ed95f167a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -18,12 +18,14 @@ */ package org.apache.pulsar.broker; +import com.github.benmanes.caffeine.cache.AsyncCache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; import io.netty.channel.EventLoopGroup; import java.io.IOException; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.conf.ClientConfiguration; @@ -48,8 +50,8 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { private ManagedLedgerFactory managedLedgerFactory; private BookKeeper defaultBkClient; - private final Map<EnsemblePlacementPolicyConfig, BookKeeper> - bkEnsemblePolicyToBkClientMap = new ConcurrentHashMap<>(); + private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper> + bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().buildAsync(); private StatsProvider statsProvider = new NullStatsProvider(); public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, @@ -89,27 +91,20 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_managedLedger_client"); this.defaultBkClient = - bookkeeperProvider.create(conf, metadataStore, eventLoopGroup, Optional.empty(), null, statsLogger); + bookkeeperProvider.create(conf, metadataStore, eventLoopGroup, Optional.empty(), null, statsLogger) + .get(); BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory = ( EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) -> { - BookKeeper bkClient = null; - // find or create bk-client in cache for a specific ensemblePlacementPolicy - if (ensemblePlacementPolicyConfig != null && ensemblePlacementPolicyConfig.getPolicyClass() != null) { - bkClient = bkEnsemblePolicyToBkClientMap.computeIfAbsent(ensemblePlacementPolicyConfig, (key) -> { - try { - return bookkeeperProvider.create(conf, metadataStore, eventLoopGroup, - Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()), - ensemblePlacementPolicyConfig.getProperties(), statsLogger); - } catch (Exception e) { - log.error("Failed to initialize bk-client for policy {}, properties {}", - ensemblePlacementPolicyConfig.getPolicyClass(), - ensemblePlacementPolicyConfig.getProperties(), e); - } - return this.defaultBkClient; - }); + if (ensemblePlacementPolicyConfig == null || ensemblePlacementPolicyConfig.getPolicyClass() == null) { + return CompletableFuture.completedFuture(defaultBkClient); } - return bkClient != null ? bkClient : defaultBkClient; + + // find or create bk-client in cache for a specific ensemblePlacementPolicy + return bkEnsemblePolicyToBkClientMap.get(ensemblePlacementPolicyConfig, + (config, executor) -> bookkeeperProvider.create(conf, metadataStore, eventLoopGroup, + Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()), + ensemblePlacementPolicyConfig.getProperties(), statsLogger)); }; try { @@ -136,7 +131,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { @VisibleForTesting public Map<EnsemblePlacementPolicyConfig, BookKeeper> getBkEnsemblePolicyToBookKeeperMap() { - return bkEnsemblePolicyToBkClientMap; + return bkEnsemblePolicyToBkClientMap.synchronous().asMap(); } @Override @@ -164,7 +159,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { // factory, however that might be introducing more unknowns. log.warn("Encountered exceptions on closing bookkeeper client", ree); } - bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> { + bkEnsemblePolicyToBkClientMap.synchronous().asMap().forEach((policy, bk) -> { try { if (bk != null) { bk.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index e99f39b382f..8dcfe8d39a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -107,7 +107,7 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage { pulsar.getIoEventLoopGroup(), Optional.empty(), null - ); + ).get(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index acdd906f6b8..99f0249b304 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -110,7 +110,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage { pulsar.getIoEventLoopGroup(), Optional.empty(), null - ); + ).join(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java index 3225f7294d5..7d35c2c0f7b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java @@ -165,7 +165,7 @@ public class CompactorTool { new DefaultThreadFactory("compactor-io")); @Cleanup - BookKeeper bk = bkClientFactory.create(brokerConfig, store, eventLoopGroup, Optional.empty(), null); + BookKeeper bk = bkClientFactory.create(brokerConfig, store, eventLoopGroup, Optional.empty(), null).get(); @Cleanup PulsarClient pulsar = createClient(brokerConfig); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java index 6d65687a501..887e35e2774 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java @@ -19,9 +19,9 @@ package org.apache.pulsar.broker; import io.netty.channel.EventLoopGroup; -import java.io.IOException; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.EnsemblePlacementPolicy; @@ -51,19 +51,19 @@ public class MockedBookKeeperClientFactory implements BookKeeperClientFactory { } @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, - EventLoopGroup eventLoopGroup, - Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, - Map<String, Object> properties) throws IOException { - return mockedBk; + public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store, + EventLoopGroup eventLoopGroup, + Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, + Map<String, Object> properties) { + return CompletableFuture.completedFuture(mockedBk); } @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, + public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup, Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, - Map<String, Object> properties, StatsLogger statsLogger) throws IOException { - return mockedBk; + Map<String, Object> properties, StatsLogger statsLogger) { + return CompletableFuture.completedFuture(mockedBk); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java index fd457687323..5f02fd7af48 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.testcontext; import io.netty.channel.EventLoopGroup; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.EnsemblePlacementPolicy; import org.apache.bookkeeper.stats.StatsLogger; @@ -39,21 +40,21 @@ class MockBookKeeperClientFactory implements BookKeeperClientFactory { } @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, - EventLoopGroup eventLoopGroup, - Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, - Map<String, Object> properties) { + public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store, + EventLoopGroup eventLoopGroup, + Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, + Map<String, Object> properties) { // Always return the same instance (so that we don't loose the mock BK content on broker restart - return mockBookKeeper; + return CompletableFuture.completedFuture(mockBookKeeper); } @Override - public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, + public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup, Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, Map<String, Object> properties, StatsLogger statsLogger) { // Always return the same instance (so that we don't loose the mock BK content on broker restart - return mockBookKeeper; + return CompletableFuture.completedFuture(mockBookKeeper); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index e955a433ad5..3cca85aa2f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -163,7 +163,7 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { public void testEntryLookup() throws Exception { @Cleanup BookKeeper bk = pulsar.getBookKeeperClientFactory().create( - this.conf, null, null, Optional.empty(), null); + this.conf, null, null, Optional.empty(), null).get(); Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> compactedLedgerData = buildCompactedLedger(bk, 500); @@ -219,7 +219,7 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { public void testCleanupOldCompactedTopicLedger() throws Exception { @Cleanup BookKeeper bk = pulsar.getBookKeeperClientFactory().create( - this.conf, null, null, Optional.empty(), null); + this.conf, null, null, Optional.empty(), null).get(); LedgerHandle oldCompactedLedger = bk.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, @@ -849,7 +849,7 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { public void testCompactWithConcurrentGetCompactionHorizonAndCompactedTopicContext() throws Exception { @Cleanup BookKeeper bk = pulsar.getBookKeeperClientFactory().create( - this.conf, null, null, Optional.empty(), null); + this.conf, null, null, Optional.empty(), null).get(); Mockito.doAnswer(invocation -> { Thread.sleep(1500); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java index 98bf2b819c2..ac1ba6bc814 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java @@ -78,7 +78,7 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest { compactionScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); - bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null); + bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null).get(); compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index f0010096b1e..081831b0300 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -123,7 +123,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { compactionScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); - bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null); + bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null).get(); compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index debc3dd5e3f..16945a60f5d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -100,7 +100,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { compactionScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build()); bk = pulsar.getBookKeeperClientFactory().create( - this.conf, null, null, Optional.empty(), null); + this.conf, null, null, Optional.empty(), null).get(); compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java index b3a48f40547..91402168108 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java @@ -155,7 +155,7 @@ public class ServiceUnitStateCompactionTest extends MockedPulsarServiceBaseTest compactionScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); - bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null); + bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null).get(); schema = Schema.JSON(ServiceUnitStateData.class); strategy = new ServiceUnitStateCompactionStrategy(); strategy.checkBrokers(false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java index d84d1ccc9ea..ba77ce5bd9d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java @@ -72,7 +72,7 @@ public class TopicCompactionServiceTest extends MockedPulsarServiceBaseTest { compactionScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build()); bk = pulsar.getBookKeeperClientFactory().create( - this.conf, null, null, Optional.empty(), null); + this.conf, null, null, Optional.empty(), null).get(); compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); }