This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8b6b3370f92 [fix][broker] Avoid orphan ledgers in
BucketDelayedDeliveryTracker (#22802)
8b6b3370f92 is described below
commit 8b6b3370f921435a61132e4f26f6428907dc69d8
Author: 道君 <[email protected]>
AuthorDate: Thu Oct 10 22:03:51 2024 +0800
[fix][broker] Avoid orphan ledgers in BucketDelayedDeliveryTracker (#22802)
---
.../BucketDelayedDeliveryTrackerFactory.java | 7 ++---
.../bucket/BookkeeperBucketSnapshotStorage.java | 18 +++++++++---
.../bucket/BucketDelayedDeliveryTracker.java | 30 ++++++++++++++++++--
.../delayed/bucket/BucketNotExistException.java | 32 ++++++++++++++++++++++
.../broker/delayed/bucket/ImmutableBucket.java | 10 ++++---
5 files changed, 83 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index c2d002ad19c..93eb3ebbc77 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -119,10 +119,9 @@ public class BucketDelayedDeliveryTrackerFactory
implements DelayedDeliveryTrack
FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create();
cursorProperties.forEach((k, v) -> {
if (k != null && v != null &&
k.startsWith(BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX)) {
- CompletableFuture<Void> future = sequencer.sequential(() -> {
- return cursor.removeCursorProperty(k)
- .thenCompose(__ ->
bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v)));
- });
+ CompletableFuture<Void> future = sequencer.sequential(() ->
+
bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v))
+ .thenCompose(__ ->
cursor.removeCursorProperty(k)));
futures.add(future);
}
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 8dcfe8d39a8..0d90e5e1d98 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -205,7 +205,10 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
LedgerPassword,
(rc, handle, ctx) -> {
- if (rc != BKException.Code.OK) {
+ if (rc == BKException.Code.NoSuchLedgerExistsException) {
+ // If the ledger does not exist, throw
BucketNotExistException
+
future.completeExceptionally(noSuchLedgerException("Open ledger", ledgerId));
+ } else if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Open
ledger", rc, ledgerId));
} else {
future.complete(handle);
@@ -265,10 +268,11 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
private CompletableFuture<Void> deleteLedger(long ledgerId) {
CompletableFuture<Void> future = new CompletableFuture<>();
bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> {
- if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Delete ledger", rc,
ledgerId));
- } else {
+ if (rc == BKException.Code.NoSuchLedgerExistsException || rc ==
BKException.Code.OK) {
+ // If the ledger does not exist or has been deleted, we can
treat it as success
future.complete(null);
+ } else {
+ future.completeExceptionally(bkException("Delete ledger", rc,
ledgerId));
}
}, null);
return future;
@@ -279,4 +283,10 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
+ " - ledger=" + ledgerId + " - operation=" + operation;
return new BucketSnapshotPersistenceException(message);
}
+
+ private static BucketNotExistException noSuchLedgerException(String
operation, long ledgerId) {
+ String message =
BKException.getMessage(BKException.Code.NoSuchLedgerExistsException)
+ + " - ledger=" + ledgerId + " - operation=" + operation;
+ return new BucketNotExistException(message);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 0091bf5b9bd..08f3ae1fa6e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -180,8 +180,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
futures = new HashMap<>(immutableBucketMap.size());
for (Map.Entry<Range<Long>, ImmutableBucket> entry :
immutableBucketMap.entrySet()) {
Range<Long> key = entry.getKey();
- ImmutableBucket immutableBucket = entry.getValue();
- futures.put(key,
immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime));
+ futures.put(key,
handleRecoverBucketSnapshotEntry(entry.getValue()));
}
try {
@@ -232,6 +231,33 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
return numberDelayedMessages.getValue();
}
+ /**
+ * Handle the BucketNotExistException when recover bucket snapshot entry.
+ * The non exist bucket will be added to `toBeDeletedBucketMap` and
deleted from `immutableBuckets`
+ * in the next step.
+ *
+ * @param bucket
+ * @return
+ */
+ private CompletableFuture<List<DelayedIndex>>
handleRecoverBucketSnapshotEntry(ImmutableBucket bucket) {
+ CompletableFuture<List<DelayedIndex>> f = new CompletableFuture<>();
+ bucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime)
+ .whenComplete((v, e) -> {
+ if (e == null) {
+ f.complete(v);
+ } else {
+ if (e instanceof BucketNotExistException) {
+ // If the bucket does not exist, return an empty
list,
+ // the bucket will be deleted from
`immutableBuckets` in the next step.
+ f.complete(Collections.emptyList());
+ } else {
+ f.completeExceptionally(e);
+ }
+ }
+ });
+ return f;
+ }
+
private synchronized void putAndCleanOverlapRange(Range<Long> range,
ImmutableBucket immutableBucket,
Map<Range<Long>,
ImmutableBucket> toBeDeletedBucketMap) {
RangeMap<Long, ImmutableBucket> subRangeMap =
immutableBuckets.subRangeMap(range);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java
new file mode 100644
index 00000000000..f6c16a1595f
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketNotExistException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import org.apache.pulsar.broker.service.BrokerServiceException;
+
+public class BucketNotExistException extends
BrokerServiceException.PersistenceException {
+
+ public BucketNotExistException(Throwable t) {
+ super(t);
+ }
+
+ public BucketNotExistException(String msg) {
+ super(msg);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 0932f51f350..a1944a21ea7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -193,9 +193,10 @@ class ImmutableBucket extends Bucket {
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.delete);
String bucketKey = bucketKey();
long bucketId = getAndUpdateBucketId();
- return removeBucketCursorProperty(bucketKey).thenCompose(__ ->
- executeWithRetry(() ->
bucketSnapshotStorage.deleteBucketSnapshot(bucketId),
- BucketSnapshotPersistenceException.class,
MaxRetryTimes)).whenComplete((__, ex) -> {
+
+ return executeWithRetry(() ->
bucketSnapshotStorage.deleteBucketSnapshot(bucketId),
+ BucketSnapshotPersistenceException.class, MaxRetryTimes)
+ .whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to delete bucket snapshot,
bucketId: {}, bucketKey: {}",
dispatcherName, bucketId, bucketKey, ex);
@@ -208,7 +209,8 @@ class ImmutableBucket extends Bucket {
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete,
System.currentTimeMillis() - deleteStartTime);
}
- });
+ })
+ .thenCompose(__ -> removeBucketCursorProperty(bucketKey));
}
CompletableFuture<Void> clear(BucketDelayedMessageIndexStats stats) {