This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 49539c20bea1d8b10b8465be5ee632c1c4bdc7ce Author: Christophe Bornet <cbor...@hotmail.com> AuthorDate: Mon Apr 24 11:46:27 2023 +0200 Revert "[improve][broker] Cache LedgerHandle in BookkeeperBucketSnapshotStorage (#20117)" This reverts commit e1d63990644700bf61b3d7af1ef6d4d62145c2bb. --- .../bucket/BookkeeperBucketSnapshotStorage.java | 52 +++++++++++----------- .../BookkeeperBucketSnapshotStorageTest.java | 43 ------------------ 2 files changed, 26 insertions(+), 69 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index 18a4c322f7b..9c30ccf1c0b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import javax.validation.constraints.NotNull; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; @@ -49,8 +48,6 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage { private final ServiceConfiguration config; private BookKeeper bookKeeper; - private final Map<Long, CompletableFuture<LedgerHandle>> ledgerHandleFutureCache = new ConcurrentHashMap<>(); - public BookkeeperBucketSnapshotStorage(PulsarService pulsar) { this.pulsar = pulsar; this.config = pulsar.getConfig(); @@ -69,30 +66,45 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage { @Override public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) { - return getLedgerHandle(bucketId).thenCompose(ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0) - 
.thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()))); + return openLedger(bucketId).thenCompose(ledgerHandle -> { + CompletableFuture<SnapshotMetadata> snapshotFuture = + getLedgerEntry(ledgerHandle, 0, 0) + .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())); + + snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); + + return snapshotFuture; + }); } @Override public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId, long lastSegmentEntryId) { - return getLedgerHandle(bucketId).thenCompose( - ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId) - .thenApply(this::parseSnapshotSegmentEntries)); + return openLedger(bucketId).thenCompose(ledgerHandle -> { + CompletableFuture<List<SnapshotSegment>> parseFuture = + getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId) + .thenApply(this::parseSnapshotSegmentEntries); + + parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); + + return parseFuture; + }); } @Override public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) { - return getLedgerHandle(bucketId).thenCompose( - ledgerHandle -> CompletableFuture.completedFuture(ledgerHandle.getLength())); + return openLedger(bucketId).thenCompose(ledgerHandle -> { + CompletableFuture<Long> lengthFuture = + CompletableFuture.completedFuture(ledgerHandle.getLength()); + + lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); + + return lengthFuture; + }); } @Override public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) { - CompletableFuture<LedgerHandle> ledgerHandleFuture = ledgerHandleFutureCache.remove(bucketId); - if (ledgerHandleFuture != null) { - ledgerHandleFuture.whenComplete((lh, ex) -> closeLedger(lh)); - } return deleteLedger(bucketId); } @@ -166,18 +178,6 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage { return future; } - private CompletableFuture<LedgerHandle> getLedgerHandle(Long ledgerId) { - CompletableFuture<LedgerHandle> ledgerHandleCompletableFuture = - ledgerHandleFutureCache.computeIfAbsent(ledgerId, k -> openLedger(ledgerId)); - // remove future of completed exceptionally - ledgerHandleCompletableFuture.whenComplete((__, ex) -> { - if (ex != null) { - ledgerHandleFutureCache.remove(ledgerId, ledgerHandleCompletableFuture); - } - }); - return ledgerHandleCompletableFuture; - } - private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) { final CompletableFuture<LedgerHandle> future = new CompletableFuture<>(); bookKeeper.asyncOpenLedger( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java index 7cb6b8d5865..a628b58e10d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java @@ -30,7 +30,6 @@ import java.util.concurrent.ExecutionException; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; -import org.apache.pulsar.common.util.FutureUtil; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -205,46 +204,4 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase Assert.assertTrue(bucketSnapshotLength > 0L); } - @Test - public void testConcurrencyGet() throws ExecutionException, InterruptedException { - DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata = - 
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder() - .setMinScheduleTimestamp(System.currentTimeMillis()) - .setMaxScheduleTimestamp(System.currentTimeMillis()) - .putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build(); - - DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata = - DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder() - .addMetadataList(segmentMetadata) - .build(); - List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>(); - - long timeMillis = System.currentTimeMillis(); - DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex = - DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L) - .setTimestamp(timeMillis).build(); - DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment = - DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build(); - bucketSnapshotSegments.add(snapshotSegment); - bucketSnapshotSegments.add(snapshotSegment); - - CompletableFuture<Long> future = - bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, - bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME); - Long bucketId = future.get(); - Assert.assertNotNull(bucketId); - - List<CompletableFuture<Void>> futures = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - CompletableFuture<Void> future0 = CompletableFuture.runAsync(() -> { - List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> list = - bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3).join(); - Assert.assertTrue(list.size() > 0); - }); - futures.add(future0); - } - - FutureUtil.waitForAll(futures).join(); - } - }