This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7299f1af75ff39a9f046bd8f2c6e77dfff5ceb57
Author: 道君 <[email protected]>
AuthorDate: Thu Oct 10 22:03:51 2024 +0800

    [fix][broker] Avoid orphan ledgers in BucketDelayedDeliveryTracker (#22802)
    
    (cherry picked from commit 8b6b3370f921435a61132e4f26f6428907dc69d8)
---
 .../BucketDelayedDeliveryTrackerFactory.java       |  7 ++---
 .../bucket/BookkeeperBucketSnapshotStorage.java    | 18 +++++++++---
 .../bucket/BucketDelayedDeliveryTracker.java       | 30 ++++++++++++++++++--
 .../delayed/bucket/BucketNotExistException.java    | 32 ++++++++++++++++++++++
 .../broker/delayed/bucket/ImmutableBucket.java     | 10 ++++---
 5 files changed, 83 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index 69a08bd2be4..2ee09a0f04c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -119,10 +119,9 @@ public class BucketDelayedDeliveryTrackerFactory 
implements DelayedDeliveryTrack
         FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create();
         cursorProperties.forEach((k, v) -> {
             if (k != null && v != null && 
k.startsWith(BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX)) {
-                CompletableFuture<Void> future = sequencer.sequential(() -> {
-                    return cursor.removeCursorProperty(k)
-                            .thenCompose(__ -> 
bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v)));
-                });
+                CompletableFuture<Void> future = sequencer.sequential(() ->
+                        
bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v))
+                                .thenCompose(__ -> 
cursor.removeCursorProperty(k)));
                 futures.add(future);
             }
         });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 8dcfe8d39a8..0d90e5e1d98 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -205,7 +205,10 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
                 
BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
                 LedgerPassword,
                 (rc, handle, ctx) -> {
-                    if (rc != BKException.Code.OK) {
+                    if (rc == BKException.Code.NoSuchLedgerExistsException) {
+                        // If the ledger does not exist, throw 
BucketNotExistException
+                        
future.completeExceptionally(noSuchLedgerException("Open ledger", ledgerId));
+                    } else if (rc != BKException.Code.OK) {
                         future.completeExceptionally(bkException("Open 
ledger", rc, ledgerId));
                     } else {
                         future.complete(handle);
@@ -265,10 +268,11 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
     private CompletableFuture<Void> deleteLedger(long ledgerId) {
         CompletableFuture<Void> future = new CompletableFuture<>();
         bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> {
-            if (rc != BKException.Code.OK) {
-                future.completeExceptionally(bkException("Delete ledger", rc, 
ledgerId));
-            } else {
+            if (rc == BKException.Code.NoSuchLedgerExistsException || rc == 
BKException.Code.OK) {
+                // If the ledger does not exist or has been deleted, we can 
treat it as success
                 future.complete(null);
+            } else  {
+                future.completeExceptionally(bkException("Delete ledger", rc, 
ledgerId));
             }
         }, null);
         return future;
@@ -279,4 +283,10 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
                 + " -  ledger=" + ledgerId + " - operation=" + operation;
         return new BucketSnapshotPersistenceException(message);
     }
+
+    private static BucketNotExistException noSuchLedgerException(String 
operation, long ledgerId) {
+        String message = 
BKException.getMessage(BKException.Code.NoSuchLedgerExistsException)
+                + " - ledger=" + ledgerId + " - operation=" + operation;
+        return new BucketNotExistException(message);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 1cbae674bd5..c23bc17c303 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -179,8 +179,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 futures = new HashMap<>(immutableBucketMap.size());
         for (Map.Entry<Range<Long>, ImmutableBucket> entry : 
immutableBucketMap.entrySet()) {
             Range<Long> key = entry.getKey();
-            ImmutableBucket immutableBucket = entry.getValue();
-            futures.put(key, 
immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime));
+            futures.put(key, 
handleRecoverBucketSnapshotEntry(entry.getValue()));
         }
 
         try {
@@ -231,6 +230,33 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         return numberDelayedMessages.getValue();
     }
 
+    /**
+     * Handle the BucketNotExistException when recover bucket snapshot entry.
+     * The non exist bucket will be added to `toBeDeletedBucketMap` and 
deleted from `immutableBuckets`
+     * in the next step.
+     *
+     * @param bucket
+     * @return
+     */
+    private CompletableFuture<List<DelayedIndex>> 
handleRecoverBucketSnapshotEntry(ImmutableBucket bucket) {
+        CompletableFuture<List<DelayedIndex>> f = new CompletableFuture<>();
+        bucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime)
+                .whenComplete((v, e) -> {
+                    if (e == null) {
+                        f.complete(v);
+                    } else {
+                        if (e instanceof BucketNotExistException) {
+                            // If the bucket does not exist, return an empty 
list,
+                            // the bucket will be deleted from 
`immutableBuckets` in the next step.
+                            f.complete(Collections.emptyList());
+                        } else {
+                            f.completeExceptionally(e);
+                        }
+                    }
+                });
+        return f;
+    }
+
     private synchronized void putAndCleanOverlapRange(Range<Long> range, 
ImmutableBucket immutableBucket,
                                                       Map<Range<Long>, 
ImmutableBucket> toBeDeletedBucketMap) {
         RangeMap<Long, ImmutableBucket> subRangeMap = 
immutableBuckets.subRangeMap(range);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java
new file mode 100644
index 00000000000..f6c16a1595f
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import org.apache.pulsar.broker.service.BrokerServiceException;
+
+public class BucketNotExistException extends 
BrokerServiceException.PersistenceException {
+
+        public BucketNotExistException(Throwable t) {
+            super(t);
+        }
+
+        public BucketNotExistException(String msg) {
+            super(msg);
+        }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 0932f51f350..a1944a21ea7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -193,9 +193,10 @@ class ImmutableBucket extends Bucket {
         stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.delete);
         String bucketKey = bucketKey();
         long bucketId = getAndUpdateBucketId();
-        return removeBucketCursorProperty(bucketKey).thenCompose(__ ->
-                executeWithRetry(() -> 
bucketSnapshotStorage.deleteBucketSnapshot(bucketId),
-                        BucketSnapshotPersistenceException.class, 
MaxRetryTimes)).whenComplete((__, ex) -> {
+
+        return executeWithRetry(() -> 
bucketSnapshotStorage.deleteBucketSnapshot(bucketId),
+                BucketSnapshotPersistenceException.class, MaxRetryTimes)
+                .whenComplete((__, ex) -> {
                     if (ex != null) {
                         log.error("[{}] Failed to delete bucket snapshot, 
bucketId: {}, bucketKey: {}",
                                 dispatcherName, bucketId, bucketKey, ex);
@@ -208,7 +209,8 @@ class ImmutableBucket extends Bucket {
                         
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete,
                                 System.currentTimeMillis() - deleteStartTime);
                     }
-        });
+                })
+                .thenCompose(__ -> removeBucketCursorProperty(bucketKey));
     }
 
     CompletableFuture<Void> clear(BucketDelayedMessageIndexStats stats) {

Reply via email to