littleorca opened a new issue #11412: URL: https://github.com/apache/pulsar/issues/11412
**Describe the bug**
If `ManagedLedgerFactoryImpl.shutdown()` is invoked while `ManagedLedgerFactoryImpl.open()` is inside `MetaStoreImpl.getManagedLedgerInfo()`, waiting for the future returned by `store.get()` to complete, the executor is shut down. When the future later completes, its next stage cannot run because the executor rejects the task. Unfortunately, the `exceptionally()` handler also needs the executor, so the failure callback is never called and `open()` never returns.

**To Reproduce**
Refer to the following test:

```java
public class ManagedLedgerTest {

    @Test
    public void openEncounteredShutdown() throws Exception {
        final String ledgerName = UUID.randomUUID().toString();
        final long version = 0;
        final long createTimeMillis = System.currentTimeMillis();

        // Metadata store whose get() completes only after 10 s, long after shutdown() is called.
        MetadataStore metadataStore = mock(MetadataStore.class);
        given(metadataStore.get(any())).willAnswer(inv -> {
            String path = inv.getArgumentAt(0, String.class);
            if (path == null) {
                throw new IllegalArgumentException("Path is null.");
            }
            if (path.endsWith(ledgerName)) { // ledger
                MLDataFormats.ManagedLedgerInfo.Builder mli = MLDataFormats.ManagedLedgerInfo.newBuilder()
                        .addLedgerInfo(0, MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder()
                                .setLedgerId(0)
                                .setEntries(0)
                                .setTimestamp(System.currentTimeMillis()));
                Stat stat = new Stat(path, version, createTimeMillis, createTimeMillis);
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("store.get return with ml");
                    return Optional.of(new GetResult(mli.build().toByteArray(), stat));
                });
            } else if (path.contains(ledgerName)) { // cursor
                MLDataFormats.ManagedCursorInfo.Builder mci = MLDataFormats.ManagedCursorInfo.newBuilder()
                        .setCursorsLedgerId(-1)
                        .setMarkDeleteLedgerId(0)
                        .setMarkDeleteLedgerId(-1);
                Stat stat = new Stat(path, version, createTimeMillis, createTimeMillis);
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("store.get return with mc");
                    return Optional.of(new GetResult(mci.build().toByteArray(), stat));
                });
            } else {
                throw new IllegalArgumentException("Invalid path: " + path);
            }
        });
        given(metadataStore.put(anyString(), any(), any())).willAnswer(inv -> {
            Optional<Long> expectedVersion = inv.getArgumentAt(2, Optional.class);
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return new Stat(inv.getArgumentAt(0, String.class), expectedVersion.orElse(0L) + 1,
                        createTimeMillis, System.currentTimeMillis());
            });
        });
        given(metadataStore.getChildren(anyString())).willAnswer(inv -> {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return Collections.singletonList("cursor");
            });
        });

        // Mocked BookKeeper client that answers open/create requests after a short delay.
        BookKeeper bookKeeper = mock(BookKeeper.class);
        LedgerHandle ledgerHandle = mock(LedgerHandle.class);
        LedgerHandle newLedgerHandle = mock(LedgerHandle.class);
        OrderedExecutor executor = OrderedExecutor.newBuilder().name("Test").build();
        given(bookKeeper.getMainWorkerPool()).willReturn(executor);
        doAnswer(inv -> {
            Thread.sleep(100);
            AsyncCallback.OpenCallback cb = inv.getArgumentAt(3, AsyncCallback.OpenCallback.class);
            cb.openComplete(0, ledgerHandle, inv.getArgumentAt(4, Object.class));
            return null;
        }).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(), any());
        doAnswer(inv -> {
            Thread.sleep(100);
            AsyncCallback.CreateCallback cb = inv.getArgumentAt(5, AsyncCallback.CreateCallback.class);
            cb.createComplete(0, newLedgerHandle, inv.getArgumentAt(6, Object.class));
            return null;
        }).when(bookKeeper).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(),
                any()/*callback*/, any(), any());

        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bookKeeper);

        // Call open() on another thread, then shut the factory down while store.get() is still pending.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CompletableFuture.runAsync(() -> {
            try {
                ManagedLedger ml = factory.open(ledgerName);
            } catch (Throwable e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        });

        Thread.sleep(5000);
        System.out.println("Shutdown...");
        factory.shutdown();

        if (!countDownLatch.await(60, TimeUnit.SECONDS)) {
            fail("open() not returned in time.");
        }
    }
}
```

**Expected behavior**
`ManagedLedgerFactoryImpl.open()`/`asyncOpen()` should fail with an error instead of hanging forever.

**Additional context**
VERSION: v2.8.0
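The mechanism behind the hang can be illustrated in isolation with plain `java.util.concurrent`, without Pulsar or BookKeeper. The following is a minimal sketch, not the actual `MetaStoreImpl` code: the class `ShutdownRaceSketch`, the method `failureCallbackRuns`, and the `fallBackToCallerThread` flag are hypothetical names introduced only for this illustration. It assumes the failure path dispatches the callback on the same executor that was shut down, as described in the report. The inline fallback is just one conceivable mitigation, not necessarily how the project should fix it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownRaceSketch {

    /**
     * Simulates a pending store.get() whose executor is shut down before it completes.
     * Returns true if the failure callback was invoked, false if it was silently lost.
     * All names here are hypothetical; this is not Pulsar code.
     */
    public static boolean failureCallbackRuns(boolean fallBackToCallerThread) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> storeGet = new CompletableFuture<>(); // stands in for store.get()
        AtomicBoolean callbackRan = new AtomicBoolean(false);
        Runnable failureCallback = () -> callbackRan.set(true);

        storeGet
            // Next stage is bound to the executor; it is rejected once the executor is shut down.
            .thenAcceptAsync(value -> { }, executor)
            // The handler runs with the rejection, but dispatches the callback on the same executor.
            .exceptionally(ex -> {
                try {
                    executor.execute(failureCallback);
                } catch (RejectedExecutionException rejected) {
                    if (!fallBackToCallerThread) {
                        // The exception is swallowed into the returned future; the callback is lost.
                        throw rejected;
                    }
                    // Assumed mitigation: run the callback on the completing thread instead.
                    failureCallback.run();
                }
                return null;
            });

        executor.shutdown();           // corresponds to factory.shutdown()
        storeGet.complete("metadata"); // store.get() finishes after the shutdown
        return callbackRan.get();
    }

    public static void main(String[] args) {
        System.out.println("without fallback: " + failureCallbackRuns(false));
        System.out.println("with fallback:    " + failureCallbackRuns(true));
    }
}
```

Without the fallback the callback is never invoked, which is the situation in which a caller blocked in `open()` waits forever. With the fallback the callback still fires, so the caller would at least observe a failure.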