This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 0fd5af03dca [fix] [ml] Fix uncompleted future when remove cursor
(#20050)
0fd5af03dca is described below
commit 0fd5af03dcab29f4fb50cecfda1c4ddec7b70d2b
Author: fengyubiao <[email protected]>
AuthorDate: Tue Apr 11 02:44:03 2023 +0800
[fix] [ml] Fix uncompleted future when remove cursor (#20050)
---
.../bookkeeper/mledger/impl/MetaStoreImpl.java | 20 +++++++++++---------
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 17 +++++++++++++++++
2 files changed, 28 insertions(+), 9 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 43d734b28a6..5a69af98e17 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -258,16 +258,18 @@ public class MetaStoreImpl implements MetaStore {
}
callback.operationComplete(null, null);
}, executor.chooseThread(ledgerName))
- .exceptionallyAsync(ex -> {
- Throwable actEx = FutureUtil.unwrapCompletionException(ex);
- if (actEx instanceof
MetadataStoreException.NotFoundException){
- log.info("[{}] [{}] cursor delete done because it did
not exist.", ledgerName, cursorName);
- callback.operationComplete(null, null);
- return null;
- }
- SafeRunnable.safeRun(() ->
callback.operationFailed(getException(ex)));
+ .exceptionally(ex -> {
+ executor.executeOrdered(ledgerName,
SafeRunnable.safeRun(() -> {
+ Throwable actEx =
FutureUtil.unwrapCompletionException(ex);
+ if (actEx instanceof
MetadataStoreException.NotFoundException){
+ log.info("[{}] [{}] cursor delete done because it
did not exist.", ledgerName, cursorName);
+ callback.operationComplete(null, null);
+ return;
+ }
+ callback.operationFailed(getException(ex));
+ }));
return null;
- }, executor.chooseThread(ledgerName));
+ });
}
@Override
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index cdc493b51ab..bace4fea6ca 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -95,6 +96,7 @@ import
org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.IntRange;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
@@ -1052,6 +1054,21 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
Awaitility.await().until(() -> ledger.getNumberOfEntries() <= 2);
}
+ @Test(timeOut = 10000)
+ void testRemoveCursorFail() throws Exception {
+ String mlName = UUID.randomUUID().toString().replaceAll("-", "");
+ String cursorName = "c1";
+ ManagedLedger ledger = factory.open(mlName);
+ ledger.openCursor(cursorName);
+ metadataStore.setAlwaysFail(new MetadataStoreException("123"));
+ try {
+ ledger.deleteCursor(cursorName);
+ fail("expected delete cursor failure.");
+ } catch (Exception ex) {
+
assertTrue(FutureUtil.unwrapCompletionException(ex).getMessage().contains("123"));
+ }
+ }
+
@Test(timeOut = 20000)
void cursorPersistence() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");