This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 1b4f35db22e [fix] [ml] make the result of delete cursor is success if 
cursor is deleted (#19825)
1b4f35db22e is described below

commit 1b4f35db22e9bf03550db4bc110007eee02f8418
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Wed Mar 22 15:30:00 2023 +0800

    [fix] [ml] make the result of delete cursor is success if cursor is deleted 
(#19825)
    
    When deleting the zk node of the cursor, if the exception 
`MetadataStoreException.NotFoundException`  occurs, the deletion is considered 
successful.
---
 .../bookkeeper/mledger/impl/MetaStoreImpl.java     | 16 +++++---
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 44 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 5 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 2a47cfdc537..43d734b28a6 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -43,6 +43,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Stat;
@@ -248,7 +249,7 @@ public class MetaStoreImpl implements MetaStore {
     @Override
     public void asyncRemoveCursor(String ledgerName, String cursorName, 
MetaStoreCallback<Void> callback) {
         String path = PREFIX + ledgerName + "/" + cursorName;
-        log.info("[{}] Remove consumer={}", ledgerName, cursorName);
+        log.info("[{}] Remove cursor={}", ledgerName, cursorName);
 
         store.delete(path, Optional.empty())
                 .thenAcceptAsync(v -> {
@@ -257,11 +258,16 @@ public class MetaStoreImpl implements MetaStore {
                     }
                     callback.operationComplete(null, null);
                 }, executor.chooseThread(ledgerName))
-                .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, 
SafeRunnable.safeRun(() -> callback
-                            .operationFailed(getException(ex))));
+                .exceptionallyAsync(ex -> {
+                    Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+                    if (actEx instanceof 
MetadataStoreException.NotFoundException){
+                        log.info("[{}] [{}] cursor delete done because it did 
not exist.", ledgerName, cursorName);
+                        callback.operationComplete(null, null);
+                        return null;
+                    }
+                    SafeRunnable.safeRun(() -> 
callback.operationFailed(getException(ex)));
                     return null;
-                });
+                }, executor.chooseThread(ledgerName));
     }
 
     @Override
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 7ad7e0f8d68..45fd275a412 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -128,6 +128,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
@@ -3958,4 +3959,47 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         managedLedger.getEnsemblesAsync(lastLedger).join();
         Assert.assertFalse(managedLedger.ledgerCache.containsKey(lastLedger));
     }
+
+    @Test
+    public void testGetEstimatedBacklogSize() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setRetentionTime(-1, TimeUnit.SECONDS);
+        config.setRetentionSizeInMB(-1);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("testGetEstimatedBacklogSize", config);
+        List<Position> positions = new ArrayList<>(10);
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(new byte[1]));
+        }
+
+        Assert.assertEquals(ledger.getEstimatedBacklogSize(new 
PositionImpl(-1, -1)), 10);
+        Assert.assertEquals(ledger.getEstimatedBacklogSize(((PositionImpl) 
positions.get(1))), 8);
+        Assert.assertEquals(ledger.getEstimatedBacklogSize(((PositionImpl) 
positions.get(9)).getNext()), 0);
+        ledger.close();
+    }
+
+    @Test
+    public void testDeleteCursorTwice() throws Exception {
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("ml");
+        String cursorName = "cursor_1";
+        ml.openCursor(cursorName);
+        syncRemoveCursor(ml, cursorName);
+        syncRemoveCursor(ml, cursorName);
+    }
+
+    private void syncRemoveCursor(ManagedLedgerImpl ml, String cursorName){
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ml.getStore().asyncRemoveCursor(ml.name, cursorName, new 
MetaStoreCallback<Void>() {
+            @Override
+            public void operationComplete(Void result, Stat stat) {
+                future.complete(null);
+            }
+
+            @Override
+            public void operationFailed(MetaStoreException e) {
+                
future.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+            }
+        });
+        future.join();
+    }
 }

Reply via email to