This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 16e6c462211 [fix][offload] Don't cleanup data when offload met MetaStore exception (#21686) 16e6c462211 is described below commit 16e6c462211bd17bc2f67e2b657babedf534feab Author: Yong Zhang <zhangyong1025...@gmail.com> AuthorDate: Fri Dec 8 04:20:55 2023 +0800 [fix][offload] Don't cleanup data when offload met MetaStore exception (#21686) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 +- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 55 ++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 9cec660b917..fdcf18f0e4a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3162,7 +3162,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } - private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload, + void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload, PositionImpl firstUnoffloaded, Optional<Throwable> firstError) { State currentState = getState(); if (currentState == State.Closed) { @@ -3210,6 +3210,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.error("[{}] Failed to update offloaded metadata for the ledgerId {}, " + "the offloaded data will not be cleaned up", name, ledgerId, exception); + return; } else { log.error("[{}] Failed to offload data for the ledgerId {}, " + "clean up the offloaded data", diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index ca4e1d10a6c..311e6a23699 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -21,7 +21,9 @@ package org.apache.bookkeeper.mledger.impl; import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -55,10 +57,12 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; @@ -4151,4 +4155,55 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { } return taskCounter; } + + @Test + public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exception { + ManagedLedgerConfig config = spy(new ManagedLedgerConfig()); + ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testNoCleanupOffloadLedger", config)); + + // mock the ledger offloader + LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class); + when(config.getLedgerOffloader()).thenReturn(ledgerOffloader); + when(ledgerOffloader.getOffloadDriverName()).thenReturn("mock"); + + // There will have two put call to the metadata store, the first time is prepare the offload. + // And the second is the complete the offload. This case is testing when completing the offload, + // the metadata store meets an exception. + AtomicInteger metadataPutCallCount = new AtomicInteger(0); + metadataStore.failConditional(new MetadataStoreException("mock completion error"), + (key, value) -> key.equals(FaultInjectionMetadataStore.OperationType.PUT) && + metadataPutCallCount.incrementAndGet() == 2); + + // prepare the arguments for the offloadLoop method + CompletableFuture<PositionImpl> future = new CompletableFuture<>(); + Queue<LedgerInfo> ledgersToOffload = new LinkedList<>(); + LedgerInfo ledgerInfo = LedgerInfo.getDefaultInstance().toBuilder().setLedgerId(1).setEntries(10).build(); + ledgersToOffload.add(ledgerInfo); + PositionImpl firstUnoffloaded = new PositionImpl(1, 0); + Optional<Throwable> firstError = Optional.empty(); + + // mock the read handle to make the offload successful + CompletableFuture<ReadHandle> readHandle = new CompletableFuture<>(); + readHandle.complete(mock(ReadHandle.class)); + when(ml.getLedgerHandle(eq(ledgerInfo.getLedgerId()))).thenReturn(readHandle); + when(ledgerOffloader.offload(any(), any(), anyMap())).thenReturn(CompletableFuture.completedFuture(null)); + + ml.ledgers.put(ledgerInfo.getLedgerId(), ledgerInfo); + + // do the offload + ml.offloadLoop(future, ledgersToOffload, firstUnoffloaded, firstError); + + // waiting for the offload complete + try { + future.join(); + fail("The offload should fail"); + } catch (Exception e) { + // the offload should fail + assertTrue(e.getCause().getMessage().contains("mock completion error")); + } + + // the ledger deletion shouldn't happen + verify(ledgerOffloader, times(0)) + .deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap()); + } }