This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7daad7a3f5715dd9ef0281cfe1219da7cbd22bf5
Author: Kai Wang <[email protected]>
AuthorDate: Sun Sep 29 11:16:33 2024 +0800

    [fix][ml] Managed ledger should recover after open ledger failed (#23368)
    
    (cherry picked from commit 77cb67a8c05210b9af0deb719cd24e3c3f5521b1)
    (cherry picked from commit c80eb40c537e92ee8cd5623337f2369201bcbd88)
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  1 +
 .../mledger/impl/ManagedLedgerErrorsTest.java      | 31 ++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 3617f0d525a..ed6f63c3938 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -401,6 +401,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
 
                                 // Clean the map if initialization fails
                                 ledgers.remove(name, future);
+                                entryCacheManager.removeEntryCache(name);
 
                                 if (pendingInitializeLedgers.remove(name, 
pendingLedger)) {
                                     pendingLedger.ledger.asyncClose(new 
CloseCallback() {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 7b2f8228ad7..b13744bcea2 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -31,12 +31,14 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -509,6 +511,35 @@ public class ManagedLedgerErrorsTest extends 
MockedBookKeeperTestCase {
         entries.forEach(Entry::release);
     }
 
+    @Test
+    public void recoverAfterOpenManagedLedgerFail() throws Exception {
+        ManagedLedger ledger = 
factory.open("recoverAfterOpenManagedLedgerFail");
+        Position position = ledger.addEntry("entry".getBytes());
+        ledger.close();
+        bkc.failAfter(0, BKException.Code.BookieHandleNotAvailableException);
+        try {
+            factory.open("recoverAfterOpenManagedLedgerFail");
+        } catch (Exception e) {
+            // ok
+        }
+
+        ledger = factory.open("recoverAfterOpenManagedLedgerFail");
+        CompletableFuture<byte[]> future = new CompletableFuture<>();
+        ((ManagedLedgerImpl)ledger).asyncReadEntry((PositionImpl)position, new 
AsyncCallbacks.ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                future.complete(entry.getData());
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+                future.completeExceptionally(exception);
+            }
+        }, null);
+        byte[] bytes = future.get(30, TimeUnit.SECONDS);
+        assertEquals(new String(bytes), "entry");
+    }
+
     @Test
     public void recoverLongTimeAfterMultipleWriteErrors() throws Exception {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("recoverLongTimeAfterMultipleWriteErrors");

Reply via email to