This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch xiangying/cherry-pick-2.10/tired_storage
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6706c4b633eaf10e94dc9bfd34c2549fd53cdebc
Author: Andrey Yegorov <[email protected]>
AuthorDate: Thu Sep 29 02:06:31 2022 -0700

    [Fix][Tiered Storage] Eagerly Delete Offloaded Segments On Topic Deletion 
(#15914)
    
    * Truncate topic before deletion to avoid orphaned offloaded ledgers
    
    * CR feedback
    
    (cherry picked from commit 9026d1954d180cfb4b3a38f52217b14a3b5e3dc0)
---
 .../bookkeeper/mledger/ManagedLedgerFactory.java   |  20 ++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 111 +++++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  56 ++++--
 .../bookkeeper/mledger/offload/OffloadUtils.java   |  29 +++
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |   8 +-
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java |  60 ++++++
 .../pulsar/broker/service/BrokerService.java       |  32 ++--
 .../broker/service/persistent/PersistentTopic.java |  98 ++++++----
 .../pulsar/broker/service/PersistentTopicTest.java |   2 +
 .../tests/integration/offload/TestBaseOffload.java | 211 +++++++++++++++++++--
 .../integration/offload/TestFileSystemOffload.java |   4 +-
 .../integration/offload/TestOffloadDeletionFS.java | 144 ++++++++++++++
 .../tests/integration/offload/TestS3Offload.java   |   2 +-
 .../offload/TestUniversalConfigurations.java       |   2 +-
 .../suites/PulsarTieredStorageTestSuite.java       |   4 +-
 15 files changed, 660 insertions(+), 123 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index e42c2581ba1..21841544f81 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -145,6 +145,16 @@ public interface ManagedLedgerFactory {
      */
     void delete(String name) throws InterruptedException, 
ManagedLedgerException;
 
+    /**
+     * Delete a managed ledger. If it's not open, it's metadata will get 
regardless deleted.
+     *
+     * @param name
+     * @throws InterruptedException
+     * @throws ManagedLedgerException
+     */
+    void delete(String name, CompletableFuture<ManagedLedgerConfig> 
mlConfigFuture)
+            throws InterruptedException, ManagedLedgerException;
+
     /**
      * Delete a managed ledger. If it's not open, it's metadata will get 
regardless deleted.
      *
@@ -154,6 +164,16 @@ public interface ManagedLedgerFactory {
      */
     void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx);
 
+    /**
+     * Delete a managed ledger. If it's not open, it's metadata will get 
regardless deleted.
+     *
+     * @param name
+     * @throws InterruptedException
+     * @throws ManagedLedgerException
+     */
+    void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> 
mlConfigFuture,
+                             DeleteLedgerCallback callback, Object ctx);
+
     /**
      * Releases all the resources maintained by the ManagedLedgerFactory.
      *
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index e6b7d9917ee..bdc6f1bf332 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -26,8 +26,10 @@ import com.google.common.collect.Maps;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -69,6 +71,7 @@ import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
 import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
+import org.apache.bookkeeper.mledger.offload.OffloadUtils;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
@@ -76,6 +79,7 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -795,12 +799,18 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
 
     @Override
     public void delete(String name) throws InterruptedException, 
ManagedLedgerException {
+        delete(name, CompletableFuture.completedFuture(null));
+    }
+
+    @Override
+    public void delete(String name, CompletableFuture<ManagedLedgerConfig> 
mlConfigFuture)
+            throws InterruptedException, ManagedLedgerException {
         class Result {
             ManagedLedgerException e = null;
         }
         final Result r = new Result();
         final CountDownLatch latch = new CountDownLatch(1);
-        asyncDelete(name, new DeleteLedgerCallback() {
+        asyncDelete(name, mlConfigFuture, new DeleteLedgerCallback() {
             @Override
             public void deleteLedgerComplete(Object ctx) {
                 latch.countDown();
@@ -822,10 +832,16 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
 
     @Override
     public void asyncDelete(String name, DeleteLedgerCallback callback, Object 
ctx) {
+        asyncDelete(name, CompletableFuture.completedFuture(null), callback, 
ctx);
+    }
+
+    @Override
+    public void asyncDelete(String name, 
CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
+                            DeleteLedgerCallback callback, Object ctx) {
         CompletableFuture<ManagedLedgerImpl> future = ledgers.get(name);
         if (future == null) {
             // Managed ledger does not exist and we're not currently trying to 
open it
-            deleteManagedLedger(name, callback, ctx);
+            deleteManagedLedger(name, mlConfigFuture, callback, ctx);
         } else {
             future.thenAccept(ml -> {
                 // If it's open, delete in the normal way
@@ -840,7 +856,8 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     /**
      * Delete all managed ledger resources and metadata.
      */
-    void deleteManagedLedger(String managedLedgerName, DeleteLedgerCallback 
callback, Object ctx) {
+    void deleteManagedLedger(String managedLedgerName, 
CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
+                             DeleteLedgerCallback callback, Object ctx) {
         // Read the managed ledger metadata from store
         asyncGetManagedLedgerInfo(managedLedgerName, new 
ManagedLedgerInfoCallback() {
             @Override
@@ -852,7 +869,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                         .map(e -> deleteCursor(bkc, managedLedgerName, 
e.getKey(), e.getValue()))
                         .collect(Collectors.toList());
                 Futures.waitForAll(futures).thenRun(() -> {
-                    deleteManagedLedgerData(bkc, managedLedgerName, info, 
callback, ctx);
+                    deleteManagedLedgerData(bkc, managedLedgerName, info, 
mlConfigFuture, callback, ctx);
                 }).exceptionally(ex -> {
                     callback.deleteLedgerFailed(new 
ManagedLedgerException(ex), ctx);
                     return null;
@@ -867,22 +884,80 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     }
 
     private void deleteManagedLedgerData(BookKeeper bkc, String 
managedLedgerName, ManagedLedgerInfo info,
-            DeleteLedgerCallback callback, Object ctx) {
+                                         
CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
+                                         DeleteLedgerCallback callback, Object 
ctx) {
+        final CompletableFuture<Map<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo>>
+                ledgerInfosFuture = new CompletableFuture<>();
+        store.getManagedLedgerInfo(managedLedgerName, false, null,
+                new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
+                    @Override
+                    public void 
operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
+                        Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
infos = new HashMap<>();
+                        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : 
mlInfo.getLedgerInfoList()) {
+                            infos.put(ls.getLedgerId(), ls);
+                        }
+                        ledgerInfosFuture.complete(infos);
+                    }
+
+                    @Override
+                    public void operationFailed(MetaStoreException e) {
+                        log.error("Failed to get managed ledger info for {}", 
managedLedgerName, e);
+                        ledgerInfosFuture.completeExceptionally(e);
+                    }
+                });
+
         Futures.waitForAll(info.ledgers.stream()
-                .filter(li -> !li.isOffloaded)
-                .map(li -> 
bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
-                        .handle((result, ex) -> {
-                            if (ex != null) {
-                                int rc = BKException.getExceptionCode(ex);
-                                if (rc == 
BKException.Code.NoSuchLedgerExistsOnMetadataServerException
-                                    || rc == 
BKException.Code.NoSuchLedgerExistsException) {
-                                    log.info("Ledger {} does not exist, 
ignoring", li.ledgerId);
-                                    return null;
-                                }
-                                throw new CompletionException(ex);
+                .map(li -> {
+                    final CompletableFuture<Void> res;
+                    if (li.isOffloaded) {
+                        res = mlConfigFuture
+                                .thenCombine(ledgerInfosFuture, Pair::of)
+                                .thenCompose(pair -> {
+                            ManagedLedgerConfig mlConfig =  pair.getLeft();
+                            Map<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfos = pair.getRight();
+
+                            if (mlConfig == null || ledgerInfos == null) {
+                                return CompletableFuture.completedFuture(null);
                             }
-                            return result;
-                        }))
+
+                            MLDataFormats.ManagedLedgerInfo.LedgerInfo ls = 
ledgerInfos.get(li.ledgerId);
+
+                            if (ls.getOffloadContext().hasUidMsb()) {
+                                
MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder newInfoBuilder = 
ls.toBuilder();
+                                
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
+                                String driverName = 
OffloadUtils.getOffloadDriverName(ls,
+                                        
mlConfig.getLedgerOffloader().getOffloadDriverName());
+                                Map<String, String> driverMetadata = 
OffloadUtils.getOffloadDriverMetadata(ls,
+                                        
mlConfig.getLedgerOffloader().getOffloadDriverMetadata());
+                                
OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, 
driverMetadata);
+
+                                UUID uuid = new 
UUID(ls.getOffloadContext().getUidMsb(),
+                                        ls.getOffloadContext().getUidLsb());
+                                return 
OffloadUtils.cleanupOffloaded(li.ledgerId, uuid, mlConfig,
+                                        
OffloadUtils.getOffloadDriverMetadata(ls,
+                                                
mlConfig.getLedgerOffloader().getOffloadDriverMetadata()),
+                                        "Deletion", managedLedgerName, 
scheduledExecutor);
+                            }
+
+                            return CompletableFuture.completedFuture(null);
+                        });
+                    } else {
+                        res = CompletableFuture.completedFuture(null);
+                    }
+                    return res.thenCompose(__ -> 
bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
+                            .handle((result, ex) -> {
+                                if (ex != null) {
+                                    int rc = BKException.getExceptionCode(ex);
+                                    if (rc == 
BKException.Code.NoSuchLedgerExistsOnMetadataServerException
+                                        || rc == 
BKException.Code.NoSuchLedgerExistsException) {
+                                        log.info("Ledger {} does not exist, 
ignoring", li.ledgerId);
+                                        return null;
+                                    }
+                                    throw new CompletionException(ex);
+                                }
+                                return result;
+                        }));
+                })
                 .collect(Collectors.toList()))
                 .thenRun(() -> {
                     // Delete the metadata
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 012ef388010..0f165e3105c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2446,7 +2446,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) 
{
         if (!factory.isMetadataServiceAvailable()) {
             // Defer trimming of ledger if we cannot connect to metadata 
service
-            promise.complete(null);
+            promise.completeExceptionally(new MetaStoreException("Metadata 
service is not available"));
             return;
         }
 
@@ -2735,11 +2735,30 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
 
     @Override
     public void asyncDelete(final DeleteLedgerCallback callback, final Object 
ctx) {
+
         // Delete the managed ledger without closing, since we are not 
interested in gracefully closing cursors and
         // ledgers
         setFenced();
         cancelScheduledTasks();
 
+        // Truncate to ensure the offloaded data is not orphaned.
+        // Also ensures the BK ledgers are deleted and not just scheduled for 
deletion
+        CompletableFuture<Void> truncateFuture = asyncTruncate();
+        truncateFuture.whenComplete((ignore, exc) -> {
+            if (exc != null) {
+                log.error("[{}] Error truncating ledger for deletion", name, 
exc);
+                callback.deleteLedgerFailed(exc instanceof 
ManagedLedgerException
+                        ? (ManagedLedgerException) exc : new 
ManagedLedgerException(exc),
+                        ctx);
+            } else {
+                asyncDeleteInternal(callback, ctx);
+            }
+        });
+
+    }
+
+    private void asyncDeleteInternal(final DeleteLedgerCallback callback, 
final Object ctx) {
+
         List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
         if (cursors.isEmpty()) {
             // No cursors to delete, proceed with next step
@@ -2797,10 +2816,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
         if (info.getOffloadContext().hasUidMsb()) {
             UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), 
info.getOffloadContext().getUidLsb());
-            cleanupOffloaded(ledgerId, uuid,
-                    OffloadUtils.getOffloadDriverName(info, 
config.getLedgerOffloader().getOffloadDriverName()),
+            OffloadUtils.cleanupOffloaded(ledgerId, uuid, config,
                     OffloadUtils.getOffloadDriverMetadata(info, 
config.getLedgerOffloader().getOffloadDriverMetadata()),
-                    "Trimming");
+                    "Trimming", name, scheduledExecutor);
         }
     }
 
@@ -2855,7 +2873,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 default:
                     // Handle error
                     log.warn("[{}] Failed to delete ledger {} -- {}", name, 
ls.getLedgerId(),
-                            BKException.getMessage(rc));
+                            BKException.getMessage(rc) + " code " + rc);
                     int toDelete = ledgersToDelete.get();
                     if (toDelete != -1 && 
ledgersToDelete.compareAndSet(toDelete, -1)) {
                         // Trigger callback only once
@@ -3044,18 +3062,17 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                                             // it is possible to get a 
BadVersion or other exception after retrying.
                                             // So we don't clean up the data 
if it has metadata operation exception.
                                             log.error("[{}] Failed to update 
offloaded metadata for the ledgerId {}, "
-                                                    + "the offloaded data will 
not be cleaned up",
-                                                name, ledgerId, exception);
-                                            return;
+                                                            + "the offloaded 
data will not be cleaned up",
+                                                    name, ledgerId, exception);
                                         } else {
                                             log.error("[{}] Failed to offload 
data for the ledgerId {}, "
-                                                    + "clean up the offloaded 
data",
-                                                name, ledgerId, exception);
+                                                            + "clean up the 
offloaded data",
+                                                    name, ledgerId, exception);
                                         }
-                                        cleanupOffloaded(
-                                            ledgerId, uuid,
-                                            driverName, driverMetadata,
-                                            "Metastore failure");
+                                        OffloadUtils.cleanupOffloaded(
+                                            ledgerId, uuid, config,
+                                            driverMetadata,
+                                            "Metastore failure", name, 
scheduledExecutor);
                                     }
                                 });
                     })
@@ -3174,14 +3191,15 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                                                                    
oldInfo.getOffloadContext().getUidLsb());
                                            log.info("[{}] Found previous 
offload attempt for ledger {}, uuid {}"
                                                     + ", cleaning up", name, 
ledgerId, uuid);
-                                           cleanupOffloaded(
+                                           OffloadUtils.cleanupOffloaded(
                                                ledgerId,
                                                oldUuid,
-                                               
OffloadUtils.getOffloadDriverName(oldInfo,
-                                                   
config.getLedgerOffloader().getOffloadDriverName()),
+                                               config,
                                                
OffloadUtils.getOffloadDriverMetadata(oldInfo,
                                                    
config.getLedgerOffloader().getOffloadDriverMetadata()),
-                                               "Previous failed offload");
+                                               "Previous failed offload",
+                                               name,
+                                               scheduledExecutor);
                                        }
                                        LedgerInfo.Builder builder = 
oldInfo.toBuilder();
                                        builder.getOffloadContextBuilder()
@@ -3781,7 +3799,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 return new 
NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
             }
         } else {
-            return new 
ManagedLedgerException(BKException.getMessage(bkErrorCode));
+            return new 
ManagedLedgerException(BKException.getMessage(bkErrorCode) + " error code: " + 
bkErrorCode);
         }
     }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
index 4e019bd1642..d114c82a486 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
@@ -22,14 +22,21 @@ import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerMetadataBuilder;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.Retries;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
@@ -181,4 +188,26 @@ public final class OffloadUtils {
 
         return builder.build();
     }
+
+    public static CompletableFuture<Void> cleanupOffloaded(long ledgerId, UUID 
uuid, ManagedLedgerConfig mlConfig,
+                                     Map<String, String> 
offloadDriverMetadata, String cleanupReason,
+                                     String name, 
org.apache.bookkeeper.common.util.OrderedScheduler executor) {
+        log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the 
reason {}.",
+                name, ledgerId, uuid.toString(), cleanupReason);
+        Map<String, String> metadataMap = new HashMap();
+        metadataMap.putAll(offloadDriverMetadata);
+        metadataMap.put("ManagedLedgerName", name);
+
+        return 
Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1),
+                        TimeUnit.SECONDS.toHours(1)).limit(10),
+                Retries.NonFatalPredicate,
+                () -> mlConfig.getLedgerOffloader().deleteOffloaded(ledgerId, 
uuid, metadataMap),
+                executor, name).whenComplete((ignored, exception) -> {
+            if (exception != null) {
+                log.warn("[{}] Error cleaning up offload for {}, (cleanup 
reason: {})",
+                        name, ledgerId, cleanupReason, exception);
+            }
+        });
+    }
+
 }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 68e72786be7..cdc62b90618 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3011,7 +3011,8 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {}, 
Collections.emptyMap());
         retryStrategically((test) -> responseException1.get() != null, 5, 
1000);
         assertNotNull(responseException1.get());
-        assertEquals(responseException1.get().getMessage(), 
BKException.getMessage(BKException.Code.TimeoutException));
+        assertTrue(responseException1.get().getMessage()
+                
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
 
         // (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
         AtomicReference<ManagedLedgerException> responseException2 = new 
AtomicReference<>();
@@ -3036,13 +3037,14 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
             return responseException2.get() != null;
         }, 5, 1000);
         assertNotNull(responseException2.get());
-        assertEquals(responseException2.get().getMessage(), 
BKException.getMessage(BKException.Code.TimeoutException));
+        assertTrue(responseException2.get().getMessage()
+                
.startsWith(BKException.getMessage(BKException.Code.TimeoutException)));
 
         ledger.close();
     }
 
     /**
-     * It verifies that if bk-client doesn't complete the add-entry in given 
time out then broker is resilient enought
+     * It verifies that if bk-client doesn't complete the add-entry in given 
time out then broker is resilient enough
      * to create new ledger and add entry successfully.
      *
      *
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index b9747aa772a..dcdc6a9bbcb 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -35,10 +35,13 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -680,6 +683,63 @@ public class OffloadPrefixTest extends 
MockedBookKeeperTestCase {
         assertEventuallyTrue(() -> 
offloader.deletedOffloads().contains(firstLedger));
     }
 
+    @Test
+    public void testOffloadDeleteClosedLedger() throws Exception {
+        MockLedgerOffloader offloader = new MockLedgerOffloader();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(0, TimeUnit.MINUTES);
+        
offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(100L);
+        
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L);
+        config.setLedgerOffloader(offloader);
+        ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+        ManagedCursor cursor = ledger.openCursor("foobar");
+
+        for (int i = 0; i < 15; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+
+        assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+        assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+
+        assertEquals(ledger.getLedgersInfoAsList().stream()
+                .filter(e -> e.getOffloadContext().getComplete()).count(), 1);
+        
assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
+
+        Set<Long> offloadedledgers = 
Sets.newHashSet(offloader.offloadedLedgers());
+        assertTrue(offloadedledgers.size() > 0);
+
+        Set<Long> bkLedgersInMLedger = 
Sets.newHashSet(ledger.getLedgersInfo().keySet());
+        assertTrue(bkLedgersInMLedger.size() > 0);
+
+        factory.close(ledger);
+        ledger.close();
+
+        AtomicInteger success = new AtomicInteger(0);
+        factory.asyncDelete("my_test_ledger", 
CompletableFuture.completedFuture(config),
+                new AsyncCallbacks.DeleteLedgerCallback() {
+            @Override
+            public void deleteLedgerComplete(Object ctx) {
+                success.set(1);
+            }
+
+            @Override
+            public void deleteLedgerFailed(ManagedLedgerException exception, 
Object ctx) {
+                success.set(-1);
+            }
+        }, null);
+        assertEventuallyTrue(() -> success.get() == 1);
+        Set<Long> deletedledgers = offloader.deletedOffloads();
+        assertEquals(offloadedledgers, deletedledgers);
+
+        for (long ledgerId: bkLedgersInMLedger) {
+            assertFalse(bkc.getLedgers().contains(ledgerId));
+        }
+    }
+
     @Test
     public void testOffloadDeleteIncomplete() throws Exception {
         Set<Pair<Long, UUID>> deleted = ConcurrentHashMap.newKeySet();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b8ae31267f6..edb8dfcdbb0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1022,7 +1022,9 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<Void> deleteTopic(String topic, boolean 
forceDelete, boolean deleteSchema) {
+        TopicName topicName = TopicName.get(topic);
         Optional<Topic> optTopic = getTopicReference(topic);
+
         if (optTopic.isPresent()) {
             Topic t = optTopic.get();
             if (forceDelete) {
@@ -1055,9 +1057,8 @@ public class BrokerService implements Closeable {
             }
         }
 
-        if (log.isDebugEnabled()) {
-            log.debug("Topic {} is not loaded, try to delete from metadata", 
topic);
-        }
+        log.info("Topic {} is not loaded, try to delete from metadata", topic);
+
         // Topic is not loaded, though we still might be able to delete from 
metadata
         TopicName tn = TopicName.get(topic);
         if (!tn.isPersistent()) {
@@ -1066,28 +1067,29 @@ public class BrokerService implements Closeable {
         }
 
         CompletableFuture<Void> future = new CompletableFuture<>();
-
         CompletableFuture<Void> deleteTopicAuthenticationFuture = new 
CompletableFuture<>();
         deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
+
         deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
             if (ex != null) {
                 future.completeExceptionally(ex);
                 return;
             }
-            
managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), new 
DeleteLedgerCallback() {
-                @Override
-                public void deleteLedgerComplete(Object ctx) {
-                    future.complete(null);
-                }
+            CompletableFuture<ManagedLedgerConfig> mlConfigFuture = 
getManagedLedgerConfig(topicName);
+            managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
+                    mlConfigFuture, new DeleteLedgerCallback() {
+                        @Override
+                        public void deleteLedgerComplete(Object ctx) {
+                            future.complete(null);
+                        }
 
-                @Override
-                public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
-                    future.completeExceptionally(exception);
-                }
-            }, null);
+                        @Override
+                        public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
+                            future.completeExceptionally(exception);
+                        }
+                    }, null);
         });
 
-
         return future;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 020b9616980..bad19776910 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1009,10 +1009,12 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     public CompletableFuture<Void> unsubscribe(String subscriptionName) {
         CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
 
+        TopicName tn = TopicName.get(MLPendingAckStore
+                .getTransactionPendingAckStoreSuffix(topic,
+                        Codec.encode(subscriptionName)));
         if 
(brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
-            
getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
-                            .getTransactionPendingAckStoreSuffix(topic,
-                                    
Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
+            
getBrokerService().getManagedLedgerFactory().asyncDelete(tn.getPersistenceNamingEncoding(),
+                    getBrokerService().getManagedLedgerConfig(tn),
                     new AsyncCallbacks.DeleteLedgerCallback() {
                         @Override
                         public void deleteLedgerComplete(Object ctx) {
@@ -1203,47 +1205,61 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                     List<CompletableFuture<Void>> 
subsDeleteFutures = new ArrayList<>();
                                     subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
 
-                                
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
-                                    if (e != null) {
-                                        log.error("[{}] Error deleting topic", 
topic, e);
-                                        unfenceTopicToResume();
-                                        deleteFuture.completeExceptionally(e);
-                                    } else {
-                                        ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
-                                            @Override
-                                            public void 
deleteLedgerComplete(Object ctx) {
-                                                
brokerService.removeTopicFromCache(PersistentTopic.this);
-
-                                            
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
-                                            
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
-                                            unregisterTopicPolicyListener();
-
-                                            log.info("[{}] Topic deleted", 
topic);
-                                            deleteFuture.complete(null);
-                                        }
-
-                                        @Override
-                                        public void 
deleteLedgerFailed(ManagedLedgerException exception,
-                                                                       Object 
ctx) {
-                                            if (exception.getCause()
-                                                    instanceof 
MetadataStoreException.NotFoundException) {
-                                                log.info("[{}] Topic is 
already deleted {}",
-                                                        topic, 
exception.getMessage());
-                                                deleteLedgerComplete(ctx);
-                                            } else {
-                                                unfenceTopicToResume();
-                                                log.error("[{}] Error deleting 
topic", topic, exception);
-                                                
deleteFuture.completeExceptionally(
-                                                        new 
PersistenceException(exception));
-                                            }
+                                    
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+                                        if (e != null) {
+                                            log.error("[{}] Error deleting 
topic", topic, e);
+                                            unfenceTopicToResume();
+                                            
deleteFuture.completeExceptionally(e);
+                                        } else {
+                                            // Truncate to ensure the 
offloaded data is not orphaned.
+                                            // Also ensures the BK ledgers are 
deleted and not just
+                                            // scheduled for deletion
+                                            CompletableFuture<Void> 
truncateFuture = ledger.asyncTruncate();
+                                            
truncateFuture.whenComplete((ignore, exc) -> {
+                                                if (e != null) {
+                                                    log.error("[{}] Error 
truncating topic", topic, e);
+                                                    unfenceTopicToResume();
+                                                    
deleteFuture.completeExceptionally(e);
+                                                } else {
+                                                    ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
+                                                        @Override
+                                                        public void 
deleteLedgerComplete(Object ctx) {
+                                                            
brokerService.removeTopicFromCache(PersistentTopic.this);
+
+                                                            
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+                                                            
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+                                                            
unregisterTopicPolicyListener();
+
+                                                            log.info("[{}] 
Topic deleted", topic);
+                                                            
deleteFuture.complete(null);
+                                                        }
+
+                                                        @Override
+                                                        public void
+                                                        
deleteLedgerFailed(ManagedLedgerException exception,
+                                                                           
Object ctx) {
+                                                            if 
(exception.getCause()
+                                                                    instanceof 
MetadataStoreException.NotFoundException) {
+                                                                log.info("[{}] 
Topic is already deleted {}",
+                                                                        topic, 
exception.getMessage());
+                                                                
deleteLedgerComplete(ctx);
+                                                            } else {
+                                                                
unfenceTopicToResume();
+                                                                
log.error("[{}] Error deleting topic",
+                                                                        topic, 
exception);
+                                                                
deleteFuture.completeExceptionally(
+                                                                        new 
PersistenceException(exception));
+                                                            }
+                                                        }
+                                                    }, null);
+                                                }
+                                            });
                                         }
-                                    }, null);
+                                    });
                                 }
                             });
-                        }
-                    });
             }).exceptionally(ex->{
                 unfenceTopicToResume();
                 deleteFuture.completeExceptionally(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 24b056d3a56..b6d78f8b8c8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1249,6 +1249,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
     @Test
     public void testDeleteTopic() throws Exception {
+        
doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate();
+
         // create topic
         PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
index f5a320e5820..819f905a92d 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.tests.integration.offload;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -32,14 +33,17 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 
 @Slf4j
 public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite {
-    private static final int ENTRY_SIZE = 1024;
+    protected int getEntrySize() {
+        return 1024;
+    };
 
-    private static byte[] buildEntry(String pattern) {
-        byte[] entry = new byte[ENTRY_SIZE];
+    private byte[] buildEntry(String pattern) {
+        byte[] entry = new byte[getEntrySize()];
         byte[] patternBytes = pattern.getBytes();
 
         for (int i = 0; i < entry.length; i++) {
@@ -64,15 +68,24 @@ public abstract class TestBaseOffload extends 
PulsarTieredStorageTestSuite {
         long firstLedger = -1;
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
             Producer<byte[]> producer = client.newProducer().topic(topic)
+                    .maxPendingMessages(getNumEntriesPerLedger() / 
2).sendTimeout(60, TimeUnit.SECONDS)
                     .blockIfQueueFull(true).enableBatching(false).create();) {
             
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
 
             // write enough to topic to make it roll
             int i = 0;
-            for (; i < ENTRIES_PER_LEDGER * 1.5; i++) {
-                producer.sendAsync(buildEntry("offload-message" + i));
+            AtomicBoolean success = new AtomicBoolean(true);
+
+            for (; i < getNumEntriesPerLedger() * 1.5; i++) {
+                producer.sendAsync(buildEntry("offload-message" + i))
+                        .exceptionally(e -> {
+                            log.error("failed to send a message", e);
+                            success.set(false);
+                            return null;
+                        });;
             }
             producer.flush();
+            Assert.assertTrue(success.get());
         }
 
         try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
@@ -113,7 +126,7 @@ public abstract class TestBaseOffload extends 
PulsarTieredStorageTestSuite {
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
             Consumer<byte[]> consumer = 
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
             // read back from topic
-            for (int i = 0; i < ENTRIES_PER_LEDGER * 1.5; i++) {
+            for (int i = 0; i < getNumEntriesPerLedger() * 1.5; i++) {
                 Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
                 Assert.assertEquals(buildEntry("offload-message" + i), 
m.getData());
             }
@@ -138,25 +151,32 @@ public abstract class TestBaseOffload extends 
PulsarTieredStorageTestSuite {
         long firstLedger = 0;
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
             Producer<byte[]> producer = client.newProducer().topic(topic)
-                    .blockIfQueueFull(true).enableBatching(false).create();
-        ) {
+                    .maxPendingMessages(getNumEntriesPerLedger() / 
2).sendTimeout(60, TimeUnit.SECONDS)
+                    .blockIfQueueFull(true).enableBatching(false).create()) {
 
             
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
 
+            AtomicBoolean success = new AtomicBoolean(true);
             // write enough to topic to make it roll twice
-            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
-                producer.sendAsync(buildEntry("offload-message" + i));
+            for (int i = 0; i < getNumEntriesPerLedger() * 2.5; i++) {
+                producer.sendAsync(buildEntry("offload-message" + i))
+                        .exceptionally(e -> {
+                            log.error("failed to send a message", e);
+                            success.set(false);
+                            return null;
+                        });;
             }
 
             producer.flush();
+            Assert.assertTrue(success.get());
         }
 
         try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
             firstLedger = 
admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
 
             // wait up to 30 seconds for offload to occur
-            for (int i = 0; i < 300 && 
!admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) {
-                Thread.sleep(100);
+            for (int i = 0; i < 100 && 
!admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) {
+                Thread.sleep(300);
             }
             
Assert.assertTrue(admin.topics().getInternalStats(topic).ledgers.get(0).offloaded);
 
@@ -175,8 +195,9 @@ public abstract class TestBaseOffload extends 
PulsarTieredStorageTestSuite {
         try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
              Consumer<byte[]> consumer = 
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
             // read back from topic
-            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+            for (int i = 0; i < getNumEntriesPerLedger() * 2.5; i++) {
                 Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
+                Assert.assertNotNull(m);
                 Assert.assertEquals(buildEntry("offload-message" + i), 
m.getData());
             }
         }
@@ -197,30 +218,52 @@ public abstract class TestBaseOffload extends 
PulsarTieredStorageTestSuite {
                 .map(l -> l.offloaded).findFirst().get();
     }
 
-    private long writeAndWaitForOffload(String serviceUrl, String adminUrl, 
String topic) throws Exception {
+    private long writeAndWaitForOffload(String serviceUrl, String adminUrl, 
String topic)
+            throws Exception {
+        return writeAndWaitForOffload(serviceUrl, adminUrl, topic, -1);
+    }
+
+    private long writeAndWaitForOffload(String serviceUrl, String adminUrl, 
String topic, int partitionNum)
+            throws Exception {
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Producer producer = client.newProducer().topic(topic)
+            Producer<byte[]> producer = client.newProducer().topic(topic)
+                    .maxPendingMessages(getNumEntriesPerLedger() / 
2).sendTimeout(60, TimeUnit.SECONDS)
                     .blockIfQueueFull(true).enableBatching(false).create();
             PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
 
-            List<PersistentTopicInternalStats.LedgerInfo> ledgers = 
admin.topics().getInternalStats(topic).ledgers;
+            String topicToCheck = partitionNum >= 0
+                    ? topic + "-partition-" + partitionNum
+                    : topic;
+
+            List<PersistentTopicInternalStats.LedgerInfo> ledgers = 
admin.topics()
+                    .getInternalStats(topicToCheck).ledgers;
             long currentLedger = ledgers.get(ledgers.size() - 1).ledgerId;
 
             
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
 
+            AtomicBoolean success = new AtomicBoolean(true);
             // write enough to topic to make it roll twice
-            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
-                producer.sendAsync(buildEntry("offload-message" + i));
+            for (int i = 0;
+                 i < getNumEntriesPerLedger() * 2.5 * (partitionNum > 0 ? 
partitionNum + 1 : 1);
+                 i++) {
+                producer.sendAsync(buildEntry("offload-message" + i))
+                        .exceptionally(e -> {
+                            log.error("failed to send a message", e);
+                            success.set(false);
+                            return null;
+                        });
             }
+            producer.flush();
             producer.send(buildEntry("final-offload-message"));
+            Assert.assertTrue(success.get());
 
             // wait up to 30 seconds for offload to occur
             for (int i = 0;
-                 i < 300 && 
!ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers, currentLedger);
+                 i < 100 && 
!ledgerOffloaded(admin.topics().getInternalStats(topicToCheck).ledgers, 
currentLedger);
                  i++) {
-                Thread.sleep(100);
+                Thread.sleep(300);
             }
-            
Assert.assertTrue(ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers,
 currentLedger));
+            
Assert.assertTrue(ledgerOffloaded(admin.topics().getInternalStats(topicToCheck).ledgers,
 currentLedger));
 
             return currentLedger;
         }
@@ -295,4 +338,130 @@ public abstract class TestBaseOffload extends 
PulsarTieredStorageTestSuite {
         Thread.sleep(5000);
         Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
     }
+
+    protected void testDeleteOffloadedTopic(String serviceUrl, String adminUrl,
+                                            boolean unloadBeforeDelete, int 
numPartitions) throws Exception {
+        final String tenant = "offload-test-cli-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        pulsarCluster.runAdminCommandOnAnyBroker("tenants",
+                "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+                "--admin-roles", "offload-admin", tenant);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "create", "--clusters", pulsarCluster.getClusterName(), 
namespace);
+
+        // set threshold to offload runs immediately after role
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-offload-threshold", "--size", "0", namespace);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-retention", "--size", "100M", "--time", "100m", 
namespace);
+
+        String output = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-offload-deletion-lag", 
namespace).getStdout();
+        Assert.assertTrue(output.contains("Unset for namespace"));
+
+        if (numPartitions > 0) {
+            pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                    "create-partitioned-topic", topic,
+                    "--partitions", Integer.toString(numPartitions));
+        } else {
+            pulsarCluster.runAdminCommandOnAnyBroker("topics", "create", 
topic);
+        }
+
+        long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, 
topic, numPartitions - 1);
+        // give it up to 5 seconds to delete, it shouldn't
+        // so we wait this every time
+        Thread.sleep(5000);
+        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", 
"set-offload-deletion-lag", namespace,
+                "--lag", "0m");
+        output = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-offload-deletion-lag", 
namespace).getStdout();
+        Assert.assertTrue(output.contains("0 minute(s)"));
+
+        offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic, 
numPartitions - 1);
+        // wait up to 10 seconds for ledger to be deleted
+        for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); 
i++) {
+            writeAndWaitForOffload(serviceUrl, adminUrl, topic, numPartitions 
- 1);
+            Thread.sleep(1000);
+        }
+
+        Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));
+        Assert.assertTrue(offloadedLedgerExists(topic, numPartitions - 1, 
offloadedLedger));
+
+        if (unloadBeforeDelete) {
+            pulsarCluster.runAdminCommandOnAnyBroker("topics", "unload", 
topic);
+        }
+        if (numPartitions > 0) {
+            pulsarCluster.runAdminCommandOnAnyBroker("topics", 
"delete-partitioned-topic", topic);
+        } else {
+            pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete", 
topic);
+        }
+        final long ledgerId = offloadedLedger;
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertFalse(offloadedLedgerExists(topic, numPartitions - 1, 
ledgerId));
+        });
+    }
+
+    protected void testDeleteOffloadedTopicExistsInBk(String serviceUrl, 
String adminUrl,
+                                            boolean unloadBeforeDelete, int 
numPartitions) throws Exception {
+        final String tenant = "offload-test-cli-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        pulsarCluster.runAdminCommandOnAnyBroker("tenants",
+                "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+                "--admin-roles", "offload-admin", tenant);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "create", "--clusters", pulsarCluster.getClusterName(), 
namespace);
+
+        // set threshold to offload runs immediately after role
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-offload-threshold", "--size", "0", namespace);
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-retention", "--size", "100M", "--time", "100m", 
namespace);
+
+        if (numPartitions > 0) {
+            pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                    "create-partitioned-topic", topic,
+                    "--partitions", Integer.toString(numPartitions));
+        } else {
+            pulsarCluster.runAdminCommandOnAnyBroker("topics", "create", 
topic);
+        }
+
+        String output = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-offload-deletion-lag", 
namespace).getStdout();
+        Assert.assertTrue(output.contains("Unset for namespace"));
+
+        long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, 
topic, numPartitions - 1);
+        // give it up to 5 seconds to delete, it shouldn't
+        // so we wait this every time
+        Thread.sleep(5000);
+        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
+
+        Assert.assertTrue(offloadedLedgerExists(topic, numPartitions - 1, 
offloadedLedger));
+
+        if (unloadBeforeDelete) {
+            pulsarCluster.runAdminCommandOnAnyBroker("topics", "unload", 
topic);
+        }
+        if (numPartitions > 0) {
+            pulsarCluster.runAdminCommandOnAnyBroker("topics", 
"delete-partitioned-topic", topic);
+        } else {
+            pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete", 
topic);
+        }
+        final long ledgerId = offloadedLedger;
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertFalse(offloadedLedgerExists(topic, numPartitions - 1, 
ledgerId));
+        });
+        Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));
+    }
+
+    protected boolean offloadedLedgerExists(String topic, int partitionNum, 
long firstLedger) {
+        throw new RuntimeException("not implemented");
+    }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java
index 808aae62e74..46b5b2bacda 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java
@@ -41,14 +41,12 @@ public class TestFileSystemOffload extends TestBaseOffload {
     @Test(dataProvider =  "ServiceAndAdminUrls")
     public void testPublishOffloadAndConsumeDeletionLag(Supplier<String> 
serviceUrl, Supplier<String> adminUrl) throws Exception {
         super.testPublishOffloadAndConsumeDeletionLag(serviceUrl.get(), 
adminUrl.get());
-
     }
 
-
     @Override
     protected Map<String, String> getEnv() {
         Map<String, String> result = new HashMap<>();
-        result.put("managedLedgerMaxEntriesPerLedger", 
String.valueOf(ENTRIES_PER_LEDGER));
+        result.put("managedLedgerMaxEntriesPerLedger", 
String.valueOf(getNumEntriesPerLedger()));
         result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
         result.put("managedLedgerOffloadDriver", "filesystem");
         result.put("fileSystemURI", "file:///");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java
new file mode 100644
index 00000000000..4b1739a0cd1
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.offload;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+@Slf4j
+public class TestOffloadDeletionFS extends TestBaseOffload {
+
+    @Override
+    protected int getEntrySize() {
+        return 512;
+    }
+
+    @Override
+    protected int getNumEntriesPerLedger() {
+        return 200;
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testDeleteOffloadedTopic(Supplier<String> serviceUrl, 
Supplier<String> adminUrl) throws Exception {
+        super.testDeleteOffloadedTopic(serviceUrl.get(), adminUrl.get(), 
false, 0);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testDeleteUnloadedOffloadedTopic(Supplier<String> serviceUrl, 
Supplier<String> adminUrl)
+            throws Exception {
+        super.testDeleteOffloadedTopic(serviceUrl.get(), adminUrl.get(), true, 
0);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testDeleteOffloadedTopicExistsInBk(Supplier<String> 
serviceUrl, Supplier<String> adminUrl)
+            throws Exception {
+        super.testDeleteOffloadedTopicExistsInBk(serviceUrl.get(), 
adminUrl.get(), false, 0);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testDeleteUnloadedOffloadedTopicExistsInBk(Supplier<String> 
serviceUrl, Supplier<String> adminUrl)
+            throws Exception {
+        super.testDeleteOffloadedTopicExistsInBk(serviceUrl.get(), 
adminUrl.get(), true, 0);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testDeleteOffloadedPartitionedTopic(Supplier<String> 
serviceUrl, Supplier<String> adminUrl) throws Exception {
+        super.testDeleteOffloadedTopic(serviceUrl.get(), adminUrl.get(), 
false, 3);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testDeleteUnloadedOffloadedPartitionedTopic(Supplier<String> 
serviceUrl, Supplier<String> adminUrl)
+            throws Exception {
+        super.testDeleteOffloadedTopic(serviceUrl.get(), adminUrl.get(), true, 
3);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testDeleteOffloadedPartitionedTopicExistsInBk(Supplier<String> 
serviceUrl, Supplier<String> adminUrl)
+            throws Exception {
+        super.testDeleteOffloadedTopicExistsInBk(serviceUrl.get(), 
adminUrl.get(), false, 3);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void 
testDeleteUnloadedOffloadedPartitionedTopicExistsInBk(Supplier<String> 
serviceUrl,
+                                                                      
Supplier<String> adminUrl) throws Exception {
+        super.testDeleteOffloadedTopicExistsInBk(serviceUrl.get(), 
adminUrl.get(), true, 3);
+    }
+
+    @Override
+    protected Map<String, String> getEnv() {
+        Map<String, String> result = new HashMap<>();
+        result.put("managedLedgerMaxEntriesPerLedger", 
String.valueOf(getNumEntriesPerLedger()));
+        result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+        result.put("managedLedgerOffloadDriver", "filesystem");
+        result.put("fileSystemURI", "file:///");
+
+        return result;
+    }
+
+    @Override
+    protected boolean offloadedLedgerExists(String topic, int partitionNum, 
long ledger) {
+        log.info("offloadedLedgerExists(topic = {}, 
partitionNum={},ledger={})",
+                topic, partitionNum, ledger);
+        if (partitionNum > -1) {
+            topic = topic + "-partition-" + partitionNum;
+        }
+        String managedLedgerName = 
TopicName.get(topic).getPersistenceNamingEncoding();
+        String rootPath = "pulsar/";
+        String dirPath = rootPath + managedLedgerName + "/";
+
+        List<String> result = new LinkedList<>();
+        String[] cmds = {
+                "ls",
+                "-1",
+                dirPath
+                };
+        pulsarCluster.getBrokers().forEach(broker -> {
+            try {
+                ContainerExecResult res = broker.execCmd(cmds);
+                log.info("offloadedLedgerExists broker {} 'ls -1 {}' got {}",
+                        broker.getContainerName(), dirPath, res.getStdout());
+                Arrays.stream(res.getStdout().split("\n"))
+                        .filter(x -> x.startsWith(ledger + "-"))
+                        .forEach(x -> result.add(x));
+            } catch (ContainerExecException ce) {
+                log.info("offloadedLedgerExists broker {} 'ls -1 {}' got error 
code {}",
+                        broker.getContainerName(), dirPath, 
ce.getResult().getExitCode());
+                // ignore 2 (No such file or directory)
+                if (ce.getResult().getExitCode() != 2) {
+                    throw new RuntimeException(ce);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        return !result.isEmpty();
+    }
+
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
index edbbcfeba5e..a230b13e215 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
@@ -73,7 +73,7 @@ public class TestS3Offload extends TestBaseOffload {
     @Override
     protected Map<String, String> getEnv() {
         Map<String, String> result = new HashMap<>();
-        result.put("managedLedgerMaxEntriesPerLedger", 
String.valueOf(ENTRIES_PER_LEDGER));
+        result.put("managedLedgerMaxEntriesPerLedger", 
String.valueOf(getNumEntriesPerLedger()));
         result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
         result.put("managedLedgerOffloadDriver", "aws-s3");
         result.put("s3ManagedLedgerOffloadBucket", "pulsar-integtest");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java
index 9c53d801ea1..ef7406113f6 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java
@@ -72,7 +72,7 @@ public class TestUniversalConfigurations extends 
TestBaseOffload {
     @Override
     protected Map<String, String> getEnv() {
         Map<String, String> result = new HashMap<>();
-        result.put("managedLedgerMaxEntriesPerLedger", 
String.valueOf(ENTRIES_PER_LEDGER));
+        result.put("managedLedgerMaxEntriesPerLedger", 
String.valueOf(getNumEntriesPerLedger()));
         result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
         result.put("managedLedgerOffloadDriver", "aws-s3");
         result.put("managedLedgerOffloadBucket", "pulsar-integtest");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
index 7811b38e0fd..1c6bb9dc3f3 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
@@ -31,7 +31,9 @@ import org.testng.annotations.BeforeClass;
 @Slf4j
 public abstract class PulsarTieredStorageTestSuite extends 
PulsarClusterTestBase {
 
-    protected static final int ENTRIES_PER_LEDGER = 1024;
+    protected int getNumEntriesPerLedger() {
+        return 1024;
+    }
 
     @BeforeClass(alwaysRun = true)
     @Override

Reply via email to