This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 0fd5af03dca [fix] [ml] Fix uncompleted future when remove cursor (#20050)
0fd5af03dca is described below

commit 0fd5af03dcab29f4fb50cecfda1c4ddec7b70d2b
Author: fengyubiao <[email protected]>
AuthorDate: Tue Apr 11 02:44:03 2023 +0800

    [fix] [ml] Fix uncompleted future when remove cursor (#20050)
---
 .../bookkeeper/mledger/impl/MetaStoreImpl.java       | 20 +++++++++++---------
 .../bookkeeper/mledger/impl/ManagedCursorTest.java   | 17 +++++++++++++++++
 2 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 43d734b28a6..5a69af98e17 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -258,16 +258,18 @@ public class MetaStoreImpl implements MetaStore {
                     }
                     callback.operationComplete(null, null);
                 }, executor.chooseThread(ledgerName))
-                .exceptionallyAsync(ex -> {
-                    Throwable actEx = FutureUtil.unwrapCompletionException(ex);
-                    if (actEx instanceof MetadataStoreException.NotFoundException){
-                        log.info("[{}] [{}] cursor delete done because it did not exist.", ledgerName, cursorName);
-                        callback.operationComplete(null, null);
-                        return null;
-                    }
-                    SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex)));
+                .exceptionally(ex -> {
+                    executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> {
+                        Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+                        if (actEx instanceof MetadataStoreException.NotFoundException){
+                            log.info("[{}] [{}] cursor delete done because it did not exist.", ledgerName, cursorName);
+                            callback.operationComplete(null, null);
+                            return;
+                        }
+                        callback.operationFailed(getException(ex));
+                    }));
                     return null;
-                }, executor.chooseThread(ledgerName));
+                });
     }
 
     @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index cdc493b51ab..bace4fea6ca 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -95,6 +96,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.IntRange;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
@@ -1052,6 +1054,21 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         Awaitility.await().until(() -> ledger.getNumberOfEntries() <= 2);
     }
 
+    @Test(timeOut = 10000)
+    void testRemoveCursorFail() throws Exception {
+        String mlName = UUID.randomUUID().toString().replaceAll("-", "");
+        String cursorName = "c1";
+        ManagedLedger ledger = factory.open(mlName);
+        ledger.openCursor(cursorName);
+        metadataStore.setAlwaysFail(new MetadataStoreException("123"));
+        try {
+            ledger.deleteCursor(cursorName);
+            fail("expected delete cursor failure.");
+        } catch (Exception ex) {
+            assertTrue(FutureUtil.unwrapCompletionException(ex).getMessage().contains("123"));
+        }
+    }
+
     @Test(timeOut = 20000)
     void cursorPersistence() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");

Reply via email to