This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push: new 1b4f35db22e [fix] [ml] make the result of delete cursor is success if cursor is deleted (#19825) 1b4f35db22e is described below commit 1b4f35db22e9bf03550db4bc110007eee02f8418 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Wed Mar 22 15:30:00 2023 +0800 [fix] [ml] make the result of delete cursor is success if cursor is deleted (#19825) When deleting the zk node of the cursor, if the exception `MetadataStoreException.NotFoundException` occurs, the deletion is considered successful. --- .../bookkeeper/mledger/impl/MetaStoreImpl.java | 16 +++++--- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 44 ++++++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index 2a47cfdc537..43d734b28a6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -43,6 +43,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Stat; @@ -248,7 +249,7 @@ public class MetaStoreImpl implements MetaStore { @Override public void asyncRemoveCursor(String ledgerName, String cursorName, MetaStoreCallback<Void> callback) { String path = PREFIX + ledgerName + "/" + cursorName; - log.info("[{}] Remove consumer={}", ledgerName, cursorName); + log.info("[{}] Remove cursor={}", ledgerName, cursorName); store.delete(path, Optional.empty()) .thenAcceptAsync(v -> { @@ -257,11 +258,16 @@ public class MetaStoreImpl implements MetaStore { } callback.operationComplete(null, null); }, executor.chooseThread(ledgerName)) - .exceptionally(ex -> { - executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback - .operationFailed(getException(ex)))); + .exceptionallyAsync(ex -> { + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + if (actEx instanceof MetadataStoreException.NotFoundException){ + log.info("[{}] [{}] cursor delete done because it did not exist.", ledgerName, cursorName); + callback.operationComplete(null, null); + return null; + } + SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))); return null; - }); + }, executor.chooseThread(ledgerName)); } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 7ad7e0f8d68..45fd275a412 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -128,6 +128,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.SessionEvent; @@ -3958,4 +3959,47 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { managedLedger.getEnsemblesAsync(lastLedger).join(); Assert.assertFalse(managedLedger.ledgerCache.containsKey(lastLedger)); } + + @Test + public void testGetEstimatedBacklogSize() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(2); + config.setRetentionTime(-1, TimeUnit.SECONDS); + config.setRetentionSizeInMB(-1); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testGetEstimatedBacklogSize", config); + List<Position> positions = new ArrayList<>(10); + for (int i = 0; i < 10; i++) { + positions.add(ledger.addEntry(new byte[1])); + } + + Assert.assertEquals(ledger.getEstimatedBacklogSize(new PositionImpl(-1, -1)), 10); + Assert.assertEquals(ledger.getEstimatedBacklogSize(((PositionImpl) positions.get(1))), 8); + Assert.assertEquals(ledger.getEstimatedBacklogSize(((PositionImpl) positions.get(9)).getNext()), 0); + ledger.close(); + } + + @Test + public void testDeleteCursorTwice() throws Exception { + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("ml"); + String cursorName = "cursor_1"; + ml.openCursor(cursorName); + syncRemoveCursor(ml, cursorName); + syncRemoveCursor(ml, cursorName); + } + + private void syncRemoveCursor(ManagedLedgerImpl ml, String cursorName){ + CompletableFuture<Void> future = new CompletableFuture<>(); + ml.getStore().asyncRemoveCursor(ml.name, cursorName, new MetaStoreCallback<Void>() { + @Override + public void operationComplete(Void result, Stat stat) { + future.complete(null); + } + + @Override + public void operationFailed(MetaStoreException e) { + future.completeExceptionally(FutureUtil.unwrapCompletionException(e)); + } + }); + future.join(); + } }