This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 16e6c462211 [fix][offload] Don't cleanup data when offload met 
MetaStore exception (#21686)
16e6c462211 is described below

commit 16e6c462211bd17bc2f67e2b657babedf534feab
Author: Yong Zhang <zhangyong1025...@gmail.com>
AuthorDate: Fri Dec 8 04:20:55 2023 +0800

    [fix][offload] Don't cleanup data when offload met MetaStore exception 
(#21686)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 55 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 9cec660b917..fdcf18f0e4a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3162,7 +3162,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
-    private void offloadLoop(CompletableFuture<PositionImpl> promise, 
Queue<LedgerInfo> ledgersToOffload,
+    void offloadLoop(CompletableFuture<PositionImpl> promise, 
Queue<LedgerInfo> ledgersToOffload,
             PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
         State currentState = getState();
         if (currentState == State.Closed) {
@@ -3210,6 +3210,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                                             log.error("[{}] Failed to update 
offloaded metadata for the ledgerId {}, "
                                                             + "the offloaded 
data will not be cleaned up",
                                                     name, ledgerId, exception);
+                                            return;
                                         } else {
                                             log.error("[{}] Failed to offload 
data for the ledgerId {}, "
                                                             + "clean up the 
offloaded data",
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index ca4e1d10a6c..311e6a23699 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -21,7 +21,9 @@ package org.apache.bookkeeper.mledger.impl;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
@@ -55,10 +57,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
@@ -4151,4 +4155,55 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         }
         return taskCounter;
     }
+
+    @Test
+    public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() 
throws Exception {
+        ManagedLedgerConfig config = spy(new ManagedLedgerConfig());
+        ManagedLedgerImpl ml = spy((ManagedLedgerImpl) 
factory.open("testNoCleanupOffloadLedger", config));
+
+        // mock the ledger offloader
+        LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class);
+        when(config.getLedgerOffloader()).thenReturn(ledgerOffloader);
+        when(ledgerOffloader.getOffloadDriverName()).thenReturn("mock");
+
+        // There will be two put calls to the metadata store: the first one 
prepares the offload,
+        // and the second one completes the offload. This case tests that, 
when completing the offload,
+        // the metadata store fails with an exception.
+        AtomicInteger metadataPutCallCount = new AtomicInteger(0);
+        metadataStore.failConditional(new MetadataStoreException("mock 
completion error"),
+            (key, value) -> 
key.equals(FaultInjectionMetadataStore.OperationType.PUT) &&
+                metadataPutCallCount.incrementAndGet() == 2);
+
+        // prepare the arguments for the offloadLoop method
+        CompletableFuture<PositionImpl> future = new CompletableFuture<>();
+        Queue<LedgerInfo> ledgersToOffload = new LinkedList<>();
+        LedgerInfo ledgerInfo = 
LedgerInfo.getDefaultInstance().toBuilder().setLedgerId(1).setEntries(10).build();
+        ledgersToOffload.add(ledgerInfo);
+        PositionImpl firstUnoffloaded = new PositionImpl(1, 0);
+        Optional<Throwable> firstError = Optional.empty();
+
+        // mock the read handle to make the offload successful
+        CompletableFuture<ReadHandle> readHandle = new CompletableFuture<>();
+        readHandle.complete(mock(ReadHandle.class));
+        
when(ml.getLedgerHandle(eq(ledgerInfo.getLedgerId()))).thenReturn(readHandle);
+        when(ledgerOffloader.offload(any(), any(), 
anyMap())).thenReturn(CompletableFuture.completedFuture(null));
+
+        ml.ledgers.put(ledgerInfo.getLedgerId(), ledgerInfo);
+
+        // do the offload
+        ml.offloadLoop(future, ledgersToOffload, firstUnoffloaded, firstError);
+
+        // wait for the offload to complete
+        try {
+            future.join();
+            fail("The offload should fail");
+        } catch (Exception e) {
+            // the offload should fail
+            assertTrue(e.getCause().getMessage().contains("mock completion 
error"));
+        }
+
+        // the ledger deletion shouldn't happen
+        verify(ledgerOffloader, times(0))
+            .deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap());
+    }
 }

Reply via email to