rdhabalia commented on code in PR #22842:
URL: https://github.com/apache/pulsar/pull/22842#discussion_r1626782396


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -378,63 +378,77 @@ public void asyncOpen(final String name, final 
ManagedLedgerConfig config, final
         ledgers.computeIfAbsent(name, (mlName) -> {
             // Create the managed ledger
             CompletableFuture<ManagedLedgerImpl> future = new 
CompletableFuture<>();
-            BookKeeper bk = bookkeeperFactory.get(
-                    new 
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
-                            
config.getBookKeeperEnsemblePlacementPolicyProperties()));
-            final ManagedLedgerImpl newledger = config.getShadowSource() == 
null
-                    ? new ManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name, mlOwnershipChecker)
-                    : new ShadowManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name,
-                    mlOwnershipChecker);
-            PendingInitializeManagedLedger pendingLedger = new 
PendingInitializeManagedLedger(newledger);
-            pendingInitializeLedgers.put(name, pendingLedger);
-            newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
-                @Override
-                public void initializeComplete() {
-                    log.info("[{}] Successfully initialize managed ledger", 
name);
-                    pendingInitializeLedgers.remove(name, pendingLedger);
-                    future.complete(newledger);
-
-                    // May need to update the cursor position
-                    newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
-                    // May need to trigger offloading
-                    if (config.isTriggerOffloadOnTopicLoad()) {
-                        
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
-                    }
+            CompletableFuture<BookKeeper> bkFuture = 
createBookKeeperClient(config);
+            bkFuture.handle((bk, ex) -> {
+                if (ex != null) {
+                    future.completeExceptionally(ex);
+                    return null;
                 }
+                final ManagedLedgerImpl newledger = config.getShadowSource() 
== null
+                        ? new ManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name, mlOwnershipChecker)
+                        : new ShadowManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name,
+                        mlOwnershipChecker);
+                PendingInitializeManagedLedger pendingLedger = new 
PendingInitializeManagedLedger(newledger);
+                pendingInitializeLedgers.put(name, pendingLedger);
+                newledger.initialize(new 
ManagedLedgerInitializeLedgerCallback() {
+                    @Override
+                    public void initializeComplete() {
+                        log.info("[{}] Successfully initialize managed 
ledger", name);
+                        pendingInitializeLedgers.remove(name, pendingLedger);
+                        future.complete(newledger);
 
-                @Override
-                public void initializeFailed(ManagedLedgerException e) {
-                    if (config.isCreateIfMissing()) {
-                        log.error("[{}] Failed to initialize managed ledger: 
{}", name, e.getMessage());
+                        // May need to update the cursor position
+                        
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
+                        // May need to trigger offloading
+                        if (config.isTriggerOffloadOnTopicLoad()) {
+                            
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
+                        }
                     }
 
-                    // Clean the map if initialization fails
-                    ledgers.remove(name, future);
+                    @Override
+                    public void initializeFailed(ManagedLedgerException e) {
+                        if (config.isCreateIfMissing()) {
+                            log.error("[{}] Failed to initialize managed 
ledger: {}", name, e.getMessage());
+                        }
 
-                    if (pendingInitializeLedgers.remove(name, pendingLedger)) {
-                        pendingLedger.ledger.asyncClose(new CloseCallback() {
-                            @Override
-                            public void closeComplete(Object ctx) {
-                                // no-op
-                            }
+                        // Clean the map if initialization fails
+                        ledgers.remove(name, future);
 
-                            @Override
-                            public void closeFailed(ManagedLedgerException 
exception, Object ctx) {
-                                log.warn("[{}] Failed to a pending 
initialization managed ledger", name, exception);
-                            }
-                        }, null);
-                    }
+                        if (pendingInitializeLedgers.remove(name, 
pendingLedger)) {
+                            pendingLedger.ledger.asyncClose(new 
CloseCallback() {
+                                @Override
+                                public void closeComplete(Object ctx) {
+                                    // no-op
+                                }
 
-                    future.completeExceptionally(e);
-                }
-            }, null);
+                                @Override
+                                public void closeFailed(ManagedLedgerException 
exception, Object ctx) {
+                                    log.warn("[{}] Failed to a pending 
initialization managed ledger", name, exception);
+                                }
+                            }, null);
+                        }
+                        future.completeExceptionally(e);
+                    }
+                }, null);
+                return null;
+            });
             return future;
         }).thenAccept(ml -> callback.openLedgerComplete(ml, 
ctx)).exceptionally(exception -> {
             callback.openLedgerFailed((ManagedLedgerException) 
exception.getCause(), ctx);
             return null;
         });
     }
 
+    private CompletableFuture<BookKeeper> 
createBookKeeperClient(ManagedLedgerConfig config) {
+        CompletableFuture<BookKeeper> future = new CompletableFuture<>();
+        scheduledExecutor.execute(() -> {

Review Comment:
   > I think the culprit of the issue is in the 
BookkeeperFactoryForCustomEnsemblePlacementPolicy interface
   
   Yes. But that's something that depends on other system which is BookKeeper 
in this case, Upgrading bk version will not be possible for all Pulsar stable 
version. 
   Let's merge this patch and I can make bookkeeper enhancement to fix this 
issue for future version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to