This is an automated email from the ASF dual-hosted git repository. cbornet pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 254f09b70d80640095f2fe6d12ed79f046fb3241
Author: Christophe Bornet <cbor...@hotmail.com>
AuthorDate: Mon Apr 24 11:50:50 2023 +0200

    Revert "[improve] [broker] Close temporary open ledger in BookkeeperBucketSnapshotStorage (#20111)"

    This reverts commit 1d1a3ef864a65c995ceda4b7875ed934c2574298.
---
 .../bucket/BookkeeperBucketSnapshotStorage.java | 33 +++++----------------
 1 file changed, 7 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 9c30ccf1c0b..e7d4f9301dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -66,41 +66,22 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
 
     @Override
     public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
-        return openLedger(bucketId).thenCompose(ledgerHandle -> {
-            CompletableFuture<SnapshotMetadata> snapshotFuture =
-                    getLedgerEntry(ledgerHandle, 0, 0)
-                            .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()));
-
-            snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
-
-            return snapshotFuture;
-        });
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
+                        thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
     }
 
     @Override
     public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
                                                                              long lastSegmentEntryId) {
-        return openLedger(bucketId).thenCompose(ledgerHandle -> {
-            CompletableFuture<List<SnapshotSegment>> parseFuture =
-                    getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId)
-                            .thenApply(this::parseSnapshotSegmentEntries);
-
-            parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
-
-            return parseFuture;
-        });
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId,
+                        lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
     }
 
     @Override
     public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
-        return openLedger(bucketId).thenCompose(ledgerHandle -> {
-            CompletableFuture<Long> lengthFuture =
-                    CompletableFuture.completedFuture(ledgerHandle.getLength());
-
-            lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
-
-            return lengthFuture;
-        });
+        return openLedger(bucketId).thenApply(LedgerHandle::getLength);
     }
 
     @Override