This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch xiangying/cherry-pick-2.10/tired_storage in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6706c4b633eaf10e94dc9bfd34c2549fd53cdebc Author: Andrey Yegorov <[email protected]> AuthorDate: Thu Sep 29 02:06:31 2022 -0700 [Fix][Tiered Storage] Eagerly Delete Offloaded Segments On Topic Deletion (#15914) * Truncate topic before deletion to avoid orphaned offloaded ledgers * CR feedback (cherry picked from commit 9026d1954d180cfb4b3a38f52217b14a3b5e3dc0) --- .../bookkeeper/mledger/ManagedLedgerFactory.java | 20 ++ .../mledger/impl/ManagedLedgerFactoryImpl.java | 111 +++++++++-- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 56 ++++-- .../bookkeeper/mledger/offload/OffloadUtils.java | 29 +++ .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 8 +- .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 60 ++++++ .../pulsar/broker/service/BrokerService.java | 32 ++-- .../broker/service/persistent/PersistentTopic.java | 98 ++++++---- .../pulsar/broker/service/PersistentTopicTest.java | 2 + .../tests/integration/offload/TestBaseOffload.java | 211 +++++++++++++++++++-- .../integration/offload/TestFileSystemOffload.java | 4 +- .../integration/offload/TestOffloadDeletionFS.java | 144 ++++++++++++++ .../tests/integration/offload/TestS3Offload.java | 2 +- .../offload/TestUniversalConfigurations.java | 2 +- .../suites/PulsarTieredStorageTestSuite.java | 4 +- 15 files changed, 660 insertions(+), 123 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java index e42c2581ba1..21841544f81 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java @@ -145,6 +145,16 @@ public interface ManagedLedgerFactory { */ void delete(String name) throws InterruptedException, ManagedLedgerException; + /** + * Delete a managed ledger. If it's not open, it's metadata will get regardless deleted. 
+ * + * @param name + * @throws InterruptedException + * @throws ManagedLedgerException + */ + void delete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture) + throws InterruptedException, ManagedLedgerException; + /** * Delete a managed ledger. If it's not open, it's metadata will get regardless deleted. * @@ -154,6 +164,16 @@ public interface ManagedLedgerFactory { */ void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx); + /** + * Delete a managed ledger. If it's not open, its metadata will be deleted regardless. + * + * @param name the name of the managed ledger to delete + * @param mlConfigFuture future of the config whose offloader removes offloaded ledgers; a null config skips that + * @param callback invoked when the deletion completes or fails; {@code ctx} is passed back to it + */ + void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture, + DeleteLedgerCallback callback, Object ctx); + /** * Releases all the resources maintained by the ManagedLedgerFactory. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index e6b7d9917ee..bdc6f1bf332 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -26,8 +26,10 @@ import com.google.common.collect.Maps; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; @@ -69,6 +71,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; import 
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl; +import org.apache.bookkeeper.mledger.offload.OffloadUtils; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; @@ -76,6 +79,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; @@ -795,12 +799,18 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { @Override public void delete(String name) throws InterruptedException, ManagedLedgerException { + delete(name, CompletableFuture.completedFuture(null)); + } + + @Override + public void delete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture) + throws InterruptedException, ManagedLedgerException { class Result { ManagedLedgerException e = null; } final Result r = new Result(); final CountDownLatch latch = new CountDownLatch(1); - asyncDelete(name, new DeleteLedgerCallback() { + asyncDelete(name, mlConfigFuture, new DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { latch.countDown(); @@ -822,10 +832,16 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { @Override public void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx) { + asyncDelete(name, CompletableFuture.completedFuture(null), callback, ctx); + } + + @Override + public void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture, + DeleteLedgerCallback callback, Object ctx) { CompletableFuture<ManagedLedgerImpl> future = 
ledgers.get(name); if (future == null) { // Managed ledger does not exist and we're not currently trying to open it - deleteManagedLedger(name, callback, ctx); + deleteManagedLedger(name, mlConfigFuture, callback, ctx); } else { future.thenAccept(ml -> { // If it's open, delete in the normal way @@ -840,7 +856,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { /** * Delete all managed ledger resources and metadata. */ - void deleteManagedLedger(String managedLedgerName, DeleteLedgerCallback callback, Object ctx) { + void deleteManagedLedger(String managedLedgerName, CompletableFuture<ManagedLedgerConfig> mlConfigFuture, + DeleteLedgerCallback callback, Object ctx) { // Read the managed ledger metadata from store asyncGetManagedLedgerInfo(managedLedgerName, new ManagedLedgerInfoCallback() { @Override @@ -852,7 +869,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { .map(e -> deleteCursor(bkc, managedLedgerName, e.getKey(), e.getValue())) .collect(Collectors.toList()); Futures.waitForAll(futures).thenRun(() -> { - deleteManagedLedgerData(bkc, managedLedgerName, info, callback, ctx); + deleteManagedLedgerData(bkc, managedLedgerName, info, mlConfigFuture, callback, ctx); }).exceptionally(ex -> { callback.deleteLedgerFailed(new ManagedLedgerException(ex), ctx); return null; @@ -867,22 +884,80 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { } private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, ManagedLedgerInfo info, - DeleteLedgerCallback callback, Object ctx) { + CompletableFuture<ManagedLedgerConfig> mlConfigFuture, + DeleteLedgerCallback callback, Object ctx) { + final CompletableFuture<Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>> + ledgerInfosFuture = new CompletableFuture<>(); + store.getManagedLedgerInfo(managedLedgerName, false, null, + new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() { + @Override + public void 
operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { + Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> infos = new HashMap<>(); + for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) { + infos.put(ls.getLedgerId(), ls); + } + ledgerInfosFuture.complete(infos); + } + + @Override + public void operationFailed(MetaStoreException e) { + log.error("Failed to get managed ledger info for {}", managedLedgerName, e); + ledgerInfosFuture.completeExceptionally(e); + } + }); + Futures.waitForAll(info.ledgers.stream() - .filter(li -> !li.isOffloaded) - .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute() - .handle((result, ex) -> { - if (ex != null) { - int rc = BKException.getExceptionCode(ex); - if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException - || rc == BKException.Code.NoSuchLedgerExistsException) { - log.info("Ledger {} does not exist, ignoring", li.ledgerId); - return null; - } - throw new CompletionException(ex); + .map(li -> { + final CompletableFuture<Void> res; + if (li.isOffloaded) { + res = mlConfigFuture + .thenCombine(ledgerInfosFuture, Pair::of) + .thenCompose(pair -> { + ManagedLedgerConfig mlConfig = pair.getLeft(); + Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfos = pair.getRight(); + + if (mlConfig == null || ledgerInfos == null) { + return CompletableFuture.completedFuture(null); } - return result; - })) + + MLDataFormats.ManagedLedgerInfo.LedgerInfo ls = ledgerInfos.get(li.ledgerId); + + if (ls.getOffloadContext().hasUidMsb()) { + MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder newInfoBuilder = ls.toBuilder(); + newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true); + String driverName = OffloadUtils.getOffloadDriverName(ls, + mlConfig.getLedgerOffloader().getOffloadDriverName()); + Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls, + mlConfig.getLedgerOffloader().getOffloadDriverMetadata()); + 
OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata); + + UUID uuid = new UUID(ls.getOffloadContext().getUidMsb(), + ls.getOffloadContext().getUidLsb()); + return OffloadUtils.cleanupOffloaded(li.ledgerId, uuid, mlConfig, + OffloadUtils.getOffloadDriverMetadata(ls, + mlConfig.getLedgerOffloader().getOffloadDriverMetadata()), + "Deletion", managedLedgerName, scheduledExecutor); + } + + return CompletableFuture.completedFuture(null); + }); + } else { + res = CompletableFuture.completedFuture(null); + } + return res.thenCompose(__ -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute() + .handle((result, ex) -> { + if (ex != null) { + int rc = BKException.getExceptionCode(ex); + if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException + || rc == BKException.Code.NoSuchLedgerExistsException) { + log.info("Ledger {} does not exist, ignoring", li.ledgerId); + return null; + } + throw new CompletionException(ex); + } + return result; + })); + }) .collect(Collectors.toList())) .thenRun(() -> { // Delete the metadata diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 012ef388010..0f165e3105c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2446,7 +2446,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) { if (!factory.isMetadataServiceAvailable()) { // Defer trimming of ledger if we cannot connect to metadata service - promise.complete(null); + promise.completeExceptionally(new MetaStoreException("Metadata service is not available")); return; } @@ -2735,11 +2735,30 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback 
{ @Override public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) { + // Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and // ledgers setFenced(); cancelScheduledTasks(); + // Truncate to ensure the offloaded data is not orphaned. + // Also ensures the BK ledgers are deleted and not just scheduled for deletion + CompletableFuture<Void> truncateFuture = asyncTruncate(); + truncateFuture.whenComplete((ignore, exc) -> { + if (exc != null) { + log.error("[{}] Error truncating ledger for deletion", name, exc); + callback.deleteLedgerFailed(exc instanceof ManagedLedgerException + ? (ManagedLedgerException) exc : new ManagedLedgerException(exc), + ctx); + } else { + asyncDeleteInternal(callback, ctx); + } + }); + + } + + private void asyncDeleteInternal(final DeleteLedgerCallback callback, final Object ctx) { + List<ManagedCursor> cursors = Lists.newArrayList(this.cursors); if (cursors.isEmpty()) { // No cursors to delete, proceed with next step @@ -2797,10 +2816,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (info.getOffloadContext().hasUidMsb()) { UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); - cleanupOffloaded(ledgerId, uuid, - OffloadUtils.getOffloadDriverName(info, config.getLedgerOffloader().getOffloadDriverName()), + OffloadUtils.cleanupOffloaded(ledgerId, uuid, config, OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()), - "Trimming"); + "Trimming", name, scheduledExecutor); } } @@ -2855,7 +2873,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { default: // Handle error log.warn("[{}] Failed to delete ledger {} -- {}", name, ls.getLedgerId(), - BKException.getMessage(rc)); + BKException.getMessage(rc) + " code " + rc); int toDelete = ledgersToDelete.get(); if (toDelete != -1 && ledgersToDelete.compareAndSet(toDelete, -1)) { 
// Trigger callback only once @@ -3044,18 +3062,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // it is possible to get a BadVersion or other exception after retrying. // So we don't clean up the data if it has metadata operation exception. log.error("[{}] Failed to update offloaded metadata for the ledgerId {}, " - + "the offloaded data will not be cleaned up", - name, ledgerId, exception); - return; + + "the offloaded data will not be cleaned up", + name, ledgerId, exception); } else { log.error("[{}] Failed to offload data for the ledgerId {}, " - + "clean up the offloaded data", - name, ledgerId, exception); + + "clean up the offloaded data", + name, ledgerId, exception); } - cleanupOffloaded( - ledgerId, uuid, - driverName, driverMetadata, - "Metastore failure"); + OffloadUtils.cleanupOffloaded( + ledgerId, uuid, config, + driverMetadata, + "Metastore failure", name, scheduledExecutor); } }); }) @@ -3174,14 +3191,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { oldInfo.getOffloadContext().getUidLsb()); log.info("[{}] Found previous offload attempt for ledger {}, uuid {}" + ", cleaning up", name, ledgerId, uuid); - cleanupOffloaded( + OffloadUtils.cleanupOffloaded( ledgerId, oldUuid, - OffloadUtils.getOffloadDriverName(oldInfo, - config.getLedgerOffloader().getOffloadDriverName()), + config, OffloadUtils.getOffloadDriverMetadata(oldInfo, config.getLedgerOffloader().getOffloadDriverMetadata()), - "Previous failed offload"); + "Previous failed offload", + name, + scheduledExecutor); } LedgerInfo.Builder builder = oldInfo.toBuilder(); builder.getOffloadContextBuilder() @@ -3781,7 +3799,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return new NonRecoverableLedgerException(BKException.getMessage(bkErrorCode)); } } else { - return new ManagedLedgerException(BKException.getMessage(bkErrorCode)); + return new ManagedLedgerException(BKException.getMessage(bkErrorCode) + " 
error code: " + bkErrorCode); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java index 4e019bd1642..d114c82a486 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java @@ -22,14 +22,21 @@ import com.google.common.collect.Maps; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerMetadataBuilder; import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.common.util.Backoff; +import org.apache.bookkeeper.common.util.Retries; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext; @@ -181,4 +188,26 @@ public final class OffloadUtils { return builder.build(); } + + public static CompletableFuture<Void> cleanupOffloaded(long ledgerId, UUID uuid, ManagedLedgerConfig mlConfig, + Map<String, String> offloadDriverMetadata, String cleanupReason, + String name, org.apache.bookkeeper.common.util.OrderedScheduler executor) { + log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the reason {}.", + name, ledgerId, uuid.toString(), cleanupReason); + Map<String, String> metadataMap = new HashMap(); + 
metadataMap.putAll(offloadDriverMetadata); + metadataMap.put("ManagedLedgerName", name); + + return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), + TimeUnit.HOURS.toMillis(1)).limit(10), + Retries.NonFatalPredicate, + () -> mlConfig.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, metadataMap), + executor, name).whenComplete((ignored, exception) -> { + if (exception != null) { + log.warn("[{}] Error cleaning up offload for {}, (cleanup reason: {})", + name, ledgerId, cleanupReason, exception); + } + }); + } + } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 68e72786be7..cdc62b90618 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3011,7 +3011,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {}, Collections.emptyMap()); retryStrategically((test) -> responseException1.get() != null, 5, 1000); assertNotNull(responseException1.get()); - assertEquals(responseException1.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException)); + assertTrue(responseException1.get().getMessage() + .startsWith(BKException.getMessage(BKException.Code.TimeoutException))); // (2) test read-timeout for: ManagedLedger.asyncReadEntry(..) 
AtomicReference<ManagedLedgerException> responseException2 = new AtomicReference<>(); @@ -3036,13 +3037,14 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { return responseException2.get() != null; }, 5, 1000); assertNotNull(responseException2.get()); - assertEquals(responseException2.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException)); + assertTrue(responseException2.get().getMessage() + .startsWith(BKException.getMessage(BKException.Code.TimeoutException))); ledger.close(); } /** - * It verifies that if bk-client doesn't complete the add-entry in given time out then broker is resilient enought + * It verifies that if bk-client doesn't complete the add-entry in given time out then broker is resilient enough * to create new ledger and add entry successfully. * * diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index b9747aa772a..dcdc6a9bbcb 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -35,10 +35,13 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BooleanSupplier; import java.util.stream.Collectors; +import com.google.common.collect.Sets; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -680,6 +683,63 @@ public class OffloadPrefixTest extends 
MockedBookKeeperTestCase { assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger)); } + @Test + public void testOffloadDeleteClosedLedger() throws Exception { + MockLedgerOffloader offloader = new MockLedgerOffloader(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(0, TimeUnit.MINUTES); + offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(100L); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L); + config.setLedgerOffloader(offloader); + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + ManagedCursor cursor = ledger.openCursor("foobar"); + + for (int i = 0; i < 15; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + + assertEquals(ledger.getLedgersInfoAsList().size(), 2); + ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + assertEquals(ledger.getLedgersInfoAsList().size(), 2); + + assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()).count(), 1); + assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete()); + + Set<Long> offloadedledgers = Sets.newHashSet(offloader.offloadedLedgers()); + assertTrue(offloadedledgers.size() > 0); + + Set<Long> bkLedgersInMLedger = Sets.newHashSet(ledger.getLedgersInfo().keySet()); + assertTrue(bkLedgersInMLedger.size() > 0); + + factory.close(ledger); + ledger.close(); + + AtomicInteger success = new AtomicInteger(0); + factory.asyncDelete("my_test_ledger", CompletableFuture.completedFuture(config), + new AsyncCallbacks.DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + success.set(1); + } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + success.set(-1); + } + }, null); + assertEventuallyTrue(() -> 
success.get() == 1); + Set<Long> deletedledgers = offloader.deletedOffloads(); + assertEquals(offloadedledgers, deletedledgers); + + for (long ledgerId: bkLedgersInMLedger) { + assertFalse(bkc.getLedgers().contains(ledgerId)); + } + } + @Test public void testOffloadDeleteIncomplete() throws Exception { Set<Pair<Long, UUID>> deleted = ConcurrentHashMap.newKeySet(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b8ae31267f6..edb8dfcdbb0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1022,7 +1022,9 @@ public class BrokerService implements Closeable { } public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete, boolean deleteSchema) { + TopicName topicName = TopicName.get(topic); Optional<Topic> optTopic = getTopicReference(topic); + if (optTopic.isPresent()) { Topic t = optTopic.get(); if (forceDelete) { @@ -1055,9 +1057,8 @@ public class BrokerService implements Closeable { } } - if (log.isDebugEnabled()) { - log.debug("Topic {} is not loaded, try to delete from metadata", topic); - } + log.info("Topic {} is not loaded, try to delete from metadata", topic); + // Topic is not loaded, though we still might be able to delete from metadata TopicName tn = TopicName.get(topic); if (!tn.isPersistent()) { @@ -1066,28 +1067,29 @@ public class BrokerService implements Closeable { } CompletableFuture<Void> future = new CompletableFuture<>(); - CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>(); deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); + deleteTopicAuthenticationFuture.whenComplete((v, ex) -> { if (ex != null) { future.completeExceptionally(ex); return; } - managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), 
new DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { - future.complete(null); - } + CompletableFuture<ManagedLedgerConfig> mlConfigFuture = getManagedLedgerConfig(topicName); + managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), + mlConfigFuture, new DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + future.complete(null); + } - @Override - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - }, null); + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); }); - return future; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 020b9616980..bad19776910 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1009,10 +1009,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal public CompletableFuture<Void> unsubscribe(String subscriptionName) { CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>(); + TopicName tn = TopicName.get(MLPendingAckStore + .getTransactionPendingAckStoreSuffix(topic, + Codec.encode(subscriptionName))); if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) { - getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore - .getTransactionPendingAckStoreSuffix(topic, - Codec.encode(subscriptionName))).getPersistenceNamingEncoding(), + getBrokerService().getManagedLedgerFactory().asyncDelete(tn.getPersistenceNamingEncoding(), + getBrokerService().getManagedLedgerConfig(tn), new 
AsyncCallbacks.DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { @@ -1203,47 +1205,61 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>(); subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); - FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { - if (e != null) { - log.error("[{}] Error deleting topic", topic, e); - unfenceTopicToResume(); - deleteFuture.completeExceptionally(e); - } else { - ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(PersistentTopic.this); - - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - - subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - - unregisterTopicPolicyListener(); - - log.info("[{}] Topic deleted", topic); - deleteFuture.complete(null); - } - - @Override - public void deleteLedgerFailed(ManagedLedgerException exception, - Object ctx) { - if (exception.getCause() - instanceof MetadataStoreException.NotFoundException) { - log.info("[{}] Topic is already deleted {}", - topic, exception.getMessage()); - deleteLedgerComplete(ctx); - } else { - unfenceTopicToResume(); - log.error("[{}] Error deleting topic", topic, exception); - deleteFuture.completeExceptionally( - new PersistenceException(exception)); - } + FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { + if (e != null) { + log.error("[{}] Error deleting topic", topic, e); + unfenceTopicToResume(); + deleteFuture.completeExceptionally(e); + } else { + // Truncate to ensure the offloaded data is not orphaned. 
+ // Also ensures the BK ledgers are deleted and not just + // scheduled for deletion + CompletableFuture<Void> truncateFuture = ledger.asyncTruncate(); + truncateFuture.whenComplete((ignore, exc) -> { + if (exc != null) { + log.error("[{}] Error truncating topic", topic, exc); + unfenceTopicToResume(); + deleteFuture.completeExceptionally(exc); + } else { + ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + brokerService.removeTopicFromCache(PersistentTopic.this); + + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + + subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); + + unregisterTopicPolicyListener(); + + log.info("[{}] Topic deleted", topic); + deleteFuture.complete(null); + } + + @Override + public void + deleteLedgerFailed(ManagedLedgerException exception, + Object ctx) { + if (exception.getCause() + instanceof MetadataStoreException.NotFoundException) { + log.info("[{}] Topic is already deleted {}", + topic, exception.getMessage()); + deleteLedgerComplete(ctx); + } else { + unfenceTopicToResume(); + log.error("[{}] Error deleting topic", + topic, exception); + deleteFuture.completeExceptionally( + new PersistenceException(exception)); + } + } + }, null); + } + }); } - }, null); + }); } }); - } - }); }).exceptionally(ex->{ unfenceTopicToResume(); deleteFuture.completeExceptionally( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 24b056d3a56..b6d78f8b8c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1249,6 +1249,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { @Test public void testDeleteTopic() throws Exception { + 
doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate(); + // create topic PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java index f5a320e5820..819f905a92d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java @@ -20,6 +20,7 @@ package org.apache.pulsar.tests.integration.offload; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -32,14 +33,17 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite; +import org.awaitility.Awaitility; import org.testng.Assert; @Slf4j public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite { - private static final int ENTRY_SIZE = 1024; + protected int getEntrySize() { + return 1024; + }; - private static byte[] buildEntry(String pattern) { - byte[] entry = new byte[ENTRY_SIZE]; + private byte[] buildEntry(String pattern) { + byte[] entry = new byte[getEntrySize()]; byte[] patternBytes = pattern.getBytes(); for (int i = 0; i < entry.length; i++) { @@ -64,15 +68,24 @@ public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite { long firstLedger = -1; try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); Producer<byte[]> producer = 
client.newProducer().topic(topic) + .maxPendingMessages(getNumEntriesPerLedger() / 2).sendTimeout(60, TimeUnit.SECONDS) .blockIfQueueFull(true).enableBatching(false).create();) { client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close(); // write enough to topic to make it roll int i = 0; - for (; i < ENTRIES_PER_LEDGER * 1.5; i++) { - producer.sendAsync(buildEntry("offload-message" + i)); + AtomicBoolean success = new AtomicBoolean(true); + + for (; i < getNumEntriesPerLedger() * 1.5; i++) { + producer.sendAsync(buildEntry("offload-message" + i)) + .exceptionally(e -> { + log.error("failed to send a message", e); + success.set(false); + return null; + });; } producer.flush(); + Assert.assertTrue(success.get()); } try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) { @@ -113,7 +126,7 @@ public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite { try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) { // read back from topic - for (int i = 0; i < ENTRIES_PER_LEDGER * 1.5; i++) { + for (int i = 0; i < getNumEntriesPerLedger() * 1.5; i++) { Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES); Assert.assertEquals(buildEntry("offload-message" + i), m.getData()); } @@ -138,25 +151,32 @@ public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite { long firstLedger = 0; try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); Producer<byte[]> producer = client.newProducer().topic(topic) - .blockIfQueueFull(true).enableBatching(false).create(); - ) { + .maxPendingMessages(getNumEntriesPerLedger() / 2).sendTimeout(60, TimeUnit.SECONDS) + .blockIfQueueFull(true).enableBatching(false).create()) { client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close(); + AtomicBoolean success = new AtomicBoolean(true); // write 
enough to topic to make it roll twice - for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) { - producer.sendAsync(buildEntry("offload-message" + i)); + for (int i = 0; i < getNumEntriesPerLedger() * 2.5; i++) { + producer.sendAsync(buildEntry("offload-message" + i)) + .exceptionally(e -> { + log.error("failed to send a message", e); + success.set(false); + return null; + });; } producer.flush(); + Assert.assertTrue(success.get()); } try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) { firstLedger = admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId; // wait up to 30 seconds for offload to occur - for (int i = 0; i < 300 && !admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) { - Thread.sleep(100); + for (int i = 0; i < 100 && !admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) { + Thread.sleep(300); } Assert.assertTrue(admin.topics().getInternalStats(topic).ledgers.get(0).offloaded); @@ -175,8 +195,9 @@ public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite { try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) { // read back from topic - for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) { + for (int i = 0; i < getNumEntriesPerLedger() * 2.5; i++) { Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES); + Assert.assertNotNull(m); Assert.assertEquals(buildEntry("offload-message" + i), m.getData()); } } @@ -197,30 +218,52 @@ public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite { .map(l -> l.offloaded).findFirst().get(); } - private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic) throws Exception { + private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic) + throws Exception { + return writeAndWaitForOffload(serviceUrl, adminUrl, topic, -1); + } + + private 
long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic, int partitionNum) + throws Exception { try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build(); - Producer producer = client.newProducer().topic(topic) + Producer<byte[]> producer = client.newProducer().topic(topic) + .maxPendingMessages(getNumEntriesPerLedger() / 2).sendTimeout(60, TimeUnit.SECONDS) .blockIfQueueFull(true).enableBatching(false).create(); PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) { - List<PersistentTopicInternalStats.LedgerInfo> ledgers = admin.topics().getInternalStats(topic).ledgers; + String topicToCheck = partitionNum >= 0 + ? topic + "-partition-" + partitionNum + : topic; + + List<PersistentTopicInternalStats.LedgerInfo> ledgers = admin.topics() + .getInternalStats(topicToCheck).ledgers; long currentLedger = ledgers.get(ledgers.size() - 1).ledgerId; client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close(); + AtomicBoolean success = new AtomicBoolean(true); // write enough to topic to make it roll twice - for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) { - producer.sendAsync(buildEntry("offload-message" + i)); + for (int i = 0; + i < getNumEntriesPerLedger() * 2.5 * (partitionNum > 0 ? 
partitionNum + 1 : 1); + i++) { + producer.sendAsync(buildEntry("offload-message" + i)) + .exceptionally(e -> { + log.error("failed to send a message", e); + success.set(false); + return null; + }); } + producer.flush(); producer.send(buildEntry("final-offload-message")); + Assert.assertTrue(success.get()); // wait up to 30 seconds for offload to occur for (int i = 0; - i < 300 && !ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers, currentLedger); + i < 100 && !ledgerOffloaded(admin.topics().getInternalStats(topicToCheck).ledgers, currentLedger); i++) { - Thread.sleep(100); + Thread.sleep(300); } - Assert.assertTrue(ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers, currentLedger)); + Assert.assertTrue(ledgerOffloaded(admin.topics().getInternalStats(topicToCheck).ledgers, currentLedger)); return currentLedger; } @@ -295,4 +338,130 @@ public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite { Thread.sleep(5000); Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger)); } + + protected void testDeleteOffloadedTopic(String serviceUrl, String adminUrl, + boolean unloadBeforeDelete, int numPartitions) throws Exception { + final String tenant = "offload-test-cli-" + randomName(4); + final String namespace = tenant + "/ns1"; + final String topic = "persistent://" + namespace + "/topic1"; + + pulsarCluster.runAdminCommandOnAnyBroker("tenants", + "create", "--allowed-clusters", pulsarCluster.getClusterName(), + "--admin-roles", "offload-admin", tenant); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "create", "--clusters", pulsarCluster.getClusterName(), namespace); + + // set threshold to offload runs immediately after role + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-offload-threshold", "--size", "0", namespace); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-retention", "--size", "100M", "--time", "100m", namespace); + + String output = 
pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-deletion-lag", namespace).getStdout(); + Assert.assertTrue(output.contains("Unset for namespace")); + + if (numPartitions > 0) { + pulsarCluster.runAdminCommandOnAnyBroker("topics", + "create-partitioned-topic", topic, + "--partitions", Integer.toString(numPartitions)); + } else { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "create", topic); + } + + long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic, numPartitions - 1); + // give it up to 5 seconds to delete, it shouldn't + // so we wait this every time + Thread.sleep(5000); + Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger)); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace, + "--lag", "0m"); + output = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-deletion-lag", namespace).getStdout(); + Assert.assertTrue(output.contains("0 minute(s)")); + + offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic, numPartitions - 1); + // wait up to 10 seconds for ledger to be deleted + for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); i++) { + writeAndWaitForOffload(serviceUrl, adminUrl, topic, numPartitions - 1); + Thread.sleep(1000); + } + + Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger)); + Assert.assertTrue(offloadedLedgerExists(topic, numPartitions - 1, offloadedLedger)); + + if (unloadBeforeDelete) { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "unload", topic); + } + if (numPartitions > 0) { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete-partitioned-topic", topic); + } else { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete", topic); + } + final long ledgerId = offloadedLedger; + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + Assert.assertFalse(offloadedLedgerExists(topic, numPartitions - 1, ledgerId)); + }); + } + + protected void 
testDeleteOffloadedTopicExistsInBk(String serviceUrl, String adminUrl, + boolean unloadBeforeDelete, int numPartitions) throws Exception { + final String tenant = "offload-test-cli-" + randomName(4); + final String namespace = tenant + "/ns1"; + final String topic = "persistent://" + namespace + "/topic1"; + + pulsarCluster.runAdminCommandOnAnyBroker("tenants", + "create", "--allowed-clusters", pulsarCluster.getClusterName(), + "--admin-roles", "offload-admin", tenant); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "create", "--clusters", pulsarCluster.getClusterName(), namespace); + + // set threshold to offload runs immediately after role + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-offload-threshold", "--size", "0", namespace); + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-retention", "--size", "100M", "--time", "100m", namespace); + + if (numPartitions > 0) { + pulsarCluster.runAdminCommandOnAnyBroker("topics", + "create-partitioned-topic", topic, + "--partitions", Integer.toString(numPartitions)); + } else { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "create", topic); + } + + String output = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-deletion-lag", namespace).getStdout(); + Assert.assertTrue(output.contains("Unset for namespace")); + + long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic, numPartitions - 1); + // give it up to 5 seconds to delete, it shouldn't + // so we wait this every time + Thread.sleep(5000); + Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger)); + + Assert.assertTrue(offloadedLedgerExists(topic, numPartitions - 1, offloadedLedger)); + + if (unloadBeforeDelete) { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "unload", topic); + } + if (numPartitions > 0) { + pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete-partitioned-topic", topic); + } else { + pulsarCluster.runAdminCommandOnAnyBroker("topics", 
"delete", topic); + } + final long ledgerId = offloadedLedger; + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + Assert.assertFalse(offloadedLedgerExists(topic, numPartitions - 1, ledgerId)); + }); + Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger)); + } + + protected boolean offloadedLedgerExists(String topic, int partitionNum, long firstLedger) { + throw new RuntimeException("not implemented"); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java index 808aae62e74..46b5b2bacda 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java @@ -41,14 +41,12 @@ public class TestFileSystemOffload extends TestBaseOffload { @Test(dataProvider = "ServiceAndAdminUrls") public void testPublishOffloadAndConsumeDeletionLag(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception { super.testPublishOffloadAndConsumeDeletionLag(serviceUrl.get(), adminUrl.get()); - } - @Override protected Map<String, String> getEnv() { Map<String, String> result = new HashMap<>(); - result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER)); + result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger())); result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); result.put("managedLedgerOffloadDriver", "filesystem"); result.put("fileSystemURI", "file:///"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java new file mode 100644 index 00000000000..4b1739a0cd1 --- /dev/null +++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.offload; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.tests.integration.docker.ContainerExecException; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +@Slf4j +public class TestOffloadDeletionFS extends TestBaseOffload { + + @Override + protected int getEntrySize() { + return 512; + } + + @Override + protected int getNumEntriesPerLedger() { + return 200; + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteOffloadedTopic(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception { + super.testDeleteOffloadedTopic(serviceUrl.get(), adminUrl.get(), false, 0); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void 
testDeleteUnloadedOffloadedTopic(Supplier<String> serviceUrl, Supplier<String> adminUrl) + throws Exception { + super.testDeleteOffloadedTopic(serviceUrl.get(), adminUrl.get(), true, 0); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteOffloadedTopicExistsInBk(Supplier<String> serviceUrl, Supplier<String> adminUrl) + throws Exception { + super.testDeleteOffloadedTopicExistsInBk(serviceUrl.get(), adminUrl.get(), false, 0); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteUnloadedOffloadedTopicExistsInBk(Supplier<String> serviceUrl, Supplier<String> adminUrl) + throws Exception { + super.testDeleteOffloadedTopicExistsInBk(serviceUrl.get(), adminUrl.get(), true, 0); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteOffloadedPartitionedTopic(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception { + super.testDeleteOffloadedTopic(serviceUrl.get(), adminUrl.get(), false, 3); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteUnloadedOffloadedPartitionedTopic(Supplier<String> serviceUrl, Supplier<String> adminUrl) + throws Exception { + super.testDeleteOffloadedTopic(serviceUrl.get(), adminUrl.get(), true, 3); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteOffloadedPartitionedTopicExistsInBk(Supplier<String> serviceUrl, Supplier<String> adminUrl) + throws Exception { + super.testDeleteOffloadedTopicExistsInBk(serviceUrl.get(), adminUrl.get(), false, 3); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testDeleteUnloadedOffloadedPartitionedTopicExistsInBk(Supplier<String> serviceUrl, + Supplier<String> adminUrl) throws Exception { + super.testDeleteOffloadedTopicExistsInBk(serviceUrl.get(), adminUrl.get(), true, 3); + } + + @Override + protected Map<String, String> getEnv() { + Map<String, String> result = new HashMap<>(); + result.put("managedLedgerMaxEntriesPerLedger", 
String.valueOf(getNumEntriesPerLedger())); + result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); + result.put("managedLedgerOffloadDriver", "filesystem"); + result.put("fileSystemURI", "file:///"); + + return result; + } + + @Override + protected boolean offloadedLedgerExists(String topic, int partitionNum, long ledger) { + log.info("offloadedLedgerExists(topic = {}, partitionNum={},ledger={})", + topic, partitionNum, ledger); + if (partitionNum > -1) { + topic = topic + "-partition-" + partitionNum; + } + String managedLedgerName = TopicName.get(topic).getPersistenceNamingEncoding(); + String rootPath = "pulsar/"; + String dirPath = rootPath + managedLedgerName + "/"; + + List<String> result = new LinkedList<>(); + String[] cmds = { + "ls", + "-1", + dirPath + }; + pulsarCluster.getBrokers().forEach(broker -> { + try { + ContainerExecResult res = broker.execCmd(cmds); + log.info("offloadedLedgerExists broker {} 'ls -1 {}' got {}", + broker.getContainerName(), dirPath, res.getStdout()); + Arrays.stream(res.getStdout().split("\n")) + .filter(x -> x.startsWith(ledger + "-")) + .forEach(x -> result.add(x)); + } catch (ContainerExecException ce) { + log.info("offloadedLedgerExists broker {} 'ls -1 {}' got error code {}", + broker.getContainerName(), dirPath, ce.getResult().getExitCode()); + // ignore 2 (No such file or directory) + if (ce.getResult().getExitCode() != 2) { + throw new RuntimeException(ce); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + return !result.isEmpty(); + } + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java index edbbcfeba5e..a230b13e215 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java @@ 
-73,7 +73,7 @@ public class TestS3Offload extends TestBaseOffload { @Override protected Map<String, String> getEnv() { Map<String, String> result = new HashMap<>(); - result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER)); + result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger())); result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); result.put("managedLedgerOffloadDriver", "aws-s3"); result.put("s3ManagedLedgerOffloadBucket", "pulsar-integtest"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java index 9c53d801ea1..ef7406113f6 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java @@ -72,7 +72,7 @@ public class TestUniversalConfigurations extends TestBaseOffload { @Override protected Map<String, String> getEnv() { Map<String, String> result = new HashMap<>(); - result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER)); + result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger())); result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); result.put("managedLedgerOffloadDriver", "aws-s3"); result.put("managedLedgerOffloadBucket", "pulsar-integtest"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java index 7811b38e0fd..1c6bb9dc3f3 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java +++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java @@ -31,7 +31,9 @@ import org.testng.annotations.BeforeClass; @Slf4j public abstract class PulsarTieredStorageTestSuite extends PulsarClusterTestBase { - protected static final int ENTRIES_PER_LEDGER = 1024; + protected int getNumEntriesPerLedger() { + return 1024; + } @BeforeClass(alwaysRun = true) @Override
