gaoran10 commented on code in PR #21686:
URL: https://github.com/apache/pulsar/pull/21686#discussion_r1418679536


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##########
@@ -4152,4 +4156,53 @@ private long calculatePendingTaskCount(OrderedScheduler orderedScheduler) {
         }
         return taskCounter;
     }
+
+    @Test
+    public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exception {
+        ManagedLedgerConfig config = spy(new ManagedLedgerConfig());
+        ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testNoCleanupOffloadLedger", config));
+
+        // mock the ledger offloader
+        LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class);
+        when(config.getLedgerOffloader()).thenReturn(ledgerOffloader);
+        when(ledgerOffloader.getOffloadDriverName()).thenReturn("mock");
+
+        // There will be two put calls to the metadata store: the first prepares the offload
+        // and the second completes it. This case tests the metadata store throwing an
+        // exception when the offload is being completed.
+        AtomicInteger metadataPutCallCount = new AtomicInteger(0);
+        metadataStore.failConditional(new MetadataStoreException("mock error"),
+            (key, value) -> key.equals(FaultInjectionMetadataStore.OperationType.PUT) &&
+                metadataPutCallCount.incrementAndGet() == 2);
+
+        // prepare the arguments for the offloadLoop method
+        CompletableFuture<PositionImpl> future = new CompletableFuture<>();
+        Queue<LedgerInfo> ledgersToOffload = new LinkedList<>();
+        LedgerInfo ledgerInfo = LedgerInfo.getDefaultInstance().toBuilder().setLedgerId(1).setEntries(10).build();
+        ledgersToOffload.add(ledgerInfo);
+        PositionImpl firstUnoffloaded = new PositionImpl(1, 0);
+        Optional<Throwable> firstError = Optional.empty();
+
+        // mock the read handle to make the offload successful
+        CompletableFuture<ReadHandle> readHandle = new CompletableFuture<>();
+        readHandle.complete(mock(ReadHandle.class));
+        when(ml.getLedgerHandle(eq(ledgerInfo.getLedgerId()))).thenReturn(readHandle);
+        when(ledgerOffloader.offload(any(), any(), anyMap())).thenReturn(CompletableFuture.completedFuture(null));
+
+        ml.ledgers.put(ledgerInfo.getLedgerId(), ledgerInfo);
+
+        // do the offload
+        ml.offloadLoop(future, ledgersToOffload, firstUnoffloaded, firstError);
+
+        // wait for the offload to complete
+        try {
+            future.join();
+        } catch (Exception e) {
+            // ignore
+        }
+
+        // the ledger deletion shouldn't happen

Review Comment:
   Do we need to verify that the offload actually encounters the metadata exception?
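
One possible way to do that, sketched with the JDK only: instead of swallowing whatever `future.join()` throws, capture the cause and assert on it. The names `OffloadFailureCheck`, `failureCause`, and `MockMetadataStoreException` are hypothetical stand-ins so the snippet runs on its own; in the real test the check would presumably be against `MetadataStoreException`, and whether `offloadLoop` wraps that exception before completing the future is an assumption that would need checking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class OffloadFailureCheck {

    // Hypothetical stand-in for Pulsar's MetadataStoreException (JDK-only sketch).
    static class MockMetadataStoreException extends Exception {
        MockMetadataStoreException(String message) {
            super(message);
        }
    }

    // Returns the cause of the failure, or null if the future completed normally.
    static Throwable failureCause(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            // join() wraps the original exception; unwrap it when a cause is present.
            return e.getCause() != null ? e.getCause() : e;
        }
    }

    public static void main(String[] args) {
        // Simulate an offload future that failed on the second metadata put.
        CompletableFuture<Object> future = new CompletableFuture<>();
        future.completeExceptionally(new MockMetadataStoreException("mock error"));

        Throwable cause = failureCause(future);
        if (!(cause instanceof MockMetadataStoreException)) {
            throw new AssertionError("expected the metadata exception, got: " + cause);
        }
        System.out.println("offload failed with: " + cause.getMessage());
    }
}
```

With a check like this, the test would fail if the injected fault never fired, rather than passing because no ledger deletion happened for an unrelated reason.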



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
