littleorca opened a new issue #11412:
URL: https://github.com/apache/pulsar/issues/11412


   **Describe the bug**
   If ManagedLedgerFactoryImpl.shutdown() is invoked while
   ManagedLedgerFactoryImpl.open() is inside MetaStoreImpl.getManagedLedgerInfo(),
   waiting for the future returned by store.get() to complete, the executor is
   shut down. When the future later completes, its next stage cannot run because
   the executor rejects the task. Unfortunately, the exceptionally() handler also
   requires the executor, so the failure callback is never called and open()
   never returns.
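
The failure mode can be reproduced without Pulsar. The following is a minimal sketch using only JDK classes. `ShutdownRejectionDemo`, `Outcome`, and `storeGet` are hypothetical names, and the chain only mirrors the pattern described here (an async stage and an error handler that both dispatch to the same executor); it is not the actual MetaStoreImpl code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class ShutdownRejectionDemo {

    /** What happened during one run of the scenario. */
    public static final class Outcome {
        public final boolean failureCallbackRan;
        public final Throwable handlerSaw;
        public final boolean chainFailed;

        Outcome(boolean failureCallbackRan, Throwable handlerSaw, boolean chainFailed) {
            this.failureCallbackRan = failureCallbackRan;
            this.handlerSaw = handlerSaw;
            this.chainFailed = chainFailed;
        }
    }

    public static Outcome run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Hypothetical stand-in for the pending future returned by store.get().
        CompletableFuture<String> storeGet = new CompletableFuture<>();
        AtomicBoolean failureCallbackRan = new AtomicBoolean(false);
        AtomicReference<Throwable> handlerSaw = new AtomicReference<>();

        CompletableFuture<Void> chain = storeGet
                .thenAcceptAsync(value -> { /* success path, never reached */ }, executor)
                .exceptionally(ex -> {
                    // Unwrap CompletionException to record the root cause.
                    Throwable t = ex;
                    while (t instanceof CompletionException && t.getCause() != null) {
                        t = t.getCause();
                    }
                    handlerSaw.set(t);
                    // Assumption: the failure callback is also handed to the executor,
                    // so this submission is rejected as well and the callback is lost.
                    executor.execute(() -> failureCallbackRan.set(true));
                    return null;
                });

        executor.shutdown();            // shutdown arrives while the get is pending
        storeGet.complete("ledger-info"); // the pending get completes afterwards

        return new Outcome(failureCallbackRan.get(), handlerSaw.get(),
                chain.isCompletedExceptionally());
    }

    public static void main(String[] args) {
        Outcome o = run();
        System.out.println("failure callback ran: " + o.failureCallbackRan);
        System.out.println("handler saw: " + o.handlerSaw);
        System.out.println("chain failed: " + o.chainFailed);
    }
}
```

In this sketch the only place the failure surfaces is the future returned by `exceptionally()`, which nobody observes, so a caller waiting on the callback would block indefinitely.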
   
   **To Reproduce**
   Refer to the following test:
   
   ```java
   public class ManagedLedgerTest {
   
       @Test
       public void openEncounteredShutdown() throws Exception {
           final String ledgerName = UUID.randomUUID().toString();
           final long version = 0;
           final long createTimeMillis = System.currentTimeMillis();
   
           MetadataStore metadataStore = mock(MetadataStore.class);
           given(metadataStore.get(any())).willAnswer(inv -> {
               String path = inv.getArgumentAt(0, String.class);
               if (path == null) {
                   throw new IllegalArgumentException("Path is null.");
               }
               if (path.endsWith(ledgerName)) { // ledger
                   MLDataFormats.ManagedLedgerInfo.Builder mli = MLDataFormats.ManagedLedgerInfo.newBuilder()
                           .addLedgerInfo(0, MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder()
                                   .setLedgerId(0)
                                   .setEntries(0)
                                   .setTimestamp(System.currentTimeMillis()));
                   Stat stat = new Stat(path, version, createTimeMillis, createTimeMillis);
                   return CompletableFuture.supplyAsync(() -> {
                       try {
                           Thread.sleep(10000);
                       } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
                       System.out.println("store.get return with ml");
                       return Optional.of(new GetResult(mli.build().toByteArray(), stat));
                   });
   
               } else if (path.contains(ledgerName)) { // cursor
                   MLDataFormats.ManagedCursorInfo.Builder mci = MLDataFormats.ManagedCursorInfo.newBuilder()
                           .setCursorsLedgerId(-1)
                           .setMarkDeleteLedgerId(0)
                           .setMarkDeleteEntryId(-1);
                   Stat stat = new Stat(path, version, createTimeMillis, createTimeMillis);
                   return CompletableFuture.supplyAsync(() -> {
                       try {
                           Thread.sleep(10000);
                       } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
                       System.out.println("store.get return with mc");
                       return Optional.of(new GetResult(mci.build().toByteArray(), stat));
                   });
   
               } else {
                   throw new IllegalArgumentException("Invalid path: " + path);
               }
           });
           given(metadataStore.put(anyString(), any(), any())).willAnswer(inv -> {
               Optional<Long> expectedVersion = inv.getArgumentAt(2, Optional.class);
               return CompletableFuture.supplyAsync(() -> {
                   try {
                       Thread.sleep(100);
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
                   return new Stat(inv.getArgumentAt(0, String.class),
                           expectedVersion.orElse(0L) + 1, createTimeMillis, System.currentTimeMillis());
               });
           });
           given(metadataStore.getChildren(anyString())).willAnswer(inv -> {
               return CompletableFuture.supplyAsync(() -> {
                   try {
                       Thread.sleep(100);
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
                   return Collections.singletonList("cursor");
               });
           });
   
           BookKeeper bookKeeper = mock(BookKeeper.class);
           LedgerHandle ledgerHandle = mock(LedgerHandle.class);
           LedgerHandle newLedgerHandle = mock(LedgerHandle.class);
           OrderedExecutor executor = OrderedExecutor.newBuilder().name("Test").build();
           given(bookKeeper.getMainWorkerPool()).willReturn(executor);
           doAnswer(inv -> {
               Thread.sleep(100);
               AsyncCallback.OpenCallback cb = inv.getArgumentAt(3, AsyncCallback.OpenCallback.class);
               cb.openComplete(0, ledgerHandle, inv.getArgumentAt(4, Object.class));
               return null;
           }).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(), any());
           doAnswer(inv -> {
               Thread.sleep(100);
               AsyncCallback.CreateCallback cb = inv.getArgumentAt(5, AsyncCallback.CreateCallback.class);
               cb.createComplete(0, newLedgerHandle, inv.getArgumentAt(6, Object.class));
               return null;
           }).when(bookKeeper).asyncCreateLedger(anyInt(), anyInt(), anyInt(),
                   any(), any(), any()/*callback*/, any(), any());
   
           ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bookKeeper);
           CountDownLatch countDownLatch = new CountDownLatch(1);
           CompletableFuture.runAsync(() -> {
               try {
                   ManagedLedger ml = factory.open(ledgerName);
               } catch (Throwable e) {
                   e.printStackTrace();
               } finally {
                   countDownLatch.countDown();
               }
           });
   
           Thread.sleep(5000);
           System.out.println("Shutdown...");
           factory.shutdown();
   
           if (!countDownLatch.await(60, TimeUnit.SECONDS)) {
               fail("open() not returned in time.");
           }
       }
   }
   ```
   
   **Expected behavior**
   ManagedLedgerFactoryImpl.open()/asyncOpen() should fail with an error
   instead of hanging forever.
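
One possible way to get that behavior, shown only as a sketch and not as the project's fix: when the executor rejects the task that would deliver the failure callback, run the callback on the calling thread instead. `FailFastDispatch`, `executeOrRunInline`, and the message strings are hypothetical names used for illustration.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;

class FailFastDispatch {

    /** Runs the task on the executor, or on the calling thread if the executor rejects it. */
    public static void executeOrRunInline(Executor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (RejectedExecutionException e) {
            // The executor is shut down: deliver the callback inline rather than dropping it.
            task.run();
        }
    }

    public static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // simulate the factory having been shut down already
        AtomicReference<String> outcome = new AtomicReference<>("callback never ran");
        executeOrRunInline(executor, () -> outcome.set("failure callback delivered"));
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off is that the callback loses any ordering guarantee the executor provided, which may be acceptable for a failure reported during shutdown.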
   
   **Additional context**
   VERSION: v2.8.0
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


