This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 38322a689b2 [improve][broker] PIP-327: Support force topic loading for 
unrecoverable errors (#21759)
38322a689b2 is described below

commit 38322a689b205fa8e4233146a2f9136081f92f26
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Oct 3 21:58:43 2024 -0700

    [improve][broker] PIP-327: Support force topic loading for unrecoverable 
errors (#21759)
---
 .../bookkeeper/mledger/ManagedLedgerConfig.java    | 12 +++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  6 ++-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 58 +++++++++++++++++++++-
 .../apache/pulsar/broker/ServiceConfiguration.java | 12 +++++
 .../pulsar/broker/service/BrokerService.java       |  1 +
 .../service/schema/BookkeeperSchemaStorage.java    | 30 +++++++----
 .../schema/BookkeeperSchemaStorageTest.java        | 15 ++++--
 7 files changed, 117 insertions(+), 17 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index a24251450b4..7b28990f355 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -64,6 +64,7 @@ public class ManagedLedgerConfig {
     private long retentionTimeMs = 0;
     private long retentionSizeInMB = 0;
     private boolean autoSkipNonRecoverableData;
+    private boolean ledgerForceRecovery;
     private boolean lazyCursorRecovery = false;
     private long metadataOperationsTimeoutSeconds = 60;
     private long readEntryTimeoutSeconds = 120;
@@ -465,6 +466,17 @@ public class ManagedLedgerConfig {
         this.autoSkipNonRecoverableData = skipNonRecoverableData;
     }
 
+    /**
+     * Whether managed-ledger recovery is forced, i.e. ledger failures hit during recovery are skipped.
+     */
+    public boolean isLedgerForceRecovery() {
+        return ledgerForceRecovery;
+    }
+
+    public void setLedgerForceRecovery(boolean ledgerForceRecovery) {
+        this.ledgerForceRecovery = ledgerForceRecovery;
+    }
+
     /**
      * @return max unacked message ranges that will be persisted and recovered.
      *
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b39fd231cdc..f469b88cae8 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -182,6 +182,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     // Wether the current cursorLedger is read-only or writable
     private boolean isCursorLedgerReadOnly = true;
+    private boolean ledgerForceRecovery;
 
     // Stat of the cursor z-node
     // NOTE: Don't update cursorLedgerStat alone,
@@ -332,6 +333,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             markDeleteLimiter = null;
         }
         this.mbean = new ManagedCursorMXBeanImpl(this);
+        this.ledgerForceRecovery = getConfig().isLedgerForceRecovery();
     }
 
     private void updateCursorLedgerStat(ManagedCursorInfo cursorInfo, Stat 
stat) {
@@ -547,7 +549,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             if (log.isInfoEnabled()) {
                 log.info("[{}] Opened ledger {} for cursor {}. rc={}", 
ledger.getName(), ledgerId, name, rc);
             }
-            if (isBkErrorNotRecoverable(rc)) {
+            if (isBkErrorNotRecoverable(rc) || ledgerForceRecovery) {
                 log.error("[{}] Error opening metadata ledger {} for cursor 
{}: {}", ledger.getName(), ledgerId, name,
                         BKException.getMessage(rc));
                 // Rewind to oldest entry available
@@ -575,7 +577,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}} readComplete rc={} entryId={}", 
ledger.getName(), rc1, lh1.getLastAddConfirmed());
                 }
-                if (isBkErrorNotRecoverable(rc1)) {
+                if (isBkErrorNotRecoverable(rc1) || ledgerForceRecovery) {
                     log.error("[{}] Error reading from metadata ledger {} for 
cursor {}: {}", ledger.getName(),
                             ledgerId, name, BKException.getMessage(rc1));
                     // Rewind to oldest entry available
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 1067cda441f..8ae5a04a507 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -70,11 +70,14 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -98,6 +101,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.IntRange;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -4538,7 +4542,6 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ledger.close();
     }
 
-
     @Test
     public void testReadEntriesWithSkipDeletedEntries() throws Exception {
         @Cleanup
@@ -4795,5 +4798,58 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
     }
 
+    @Test
+    void testForceCursorRecovery() throws Exception {
+        ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new 
ManagedLedgerFactoryConfig();
+        TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
+        factory = new ManagedLedgerFactoryImpl(metadataStore, bk);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setLedgerForceRecovery(true);
+        ManagedLedger ledger = factory.open("my_test_ledger", config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+        ledger.addEntry("entry-1".getBytes(Encoding));
+        long invalidLedger = -1L;
+        bk.setErrorCodeMap(invalidLedger, 
BKException.Code.BookieHandleNotAvailableException);
+        ManagedCursorInfo info = 
ManagedCursorInfo.newBuilder().setCursorsLedgerId(invalidLedger).build();
+        CountDownLatch latch = new CountDownLatch(1);
+        MutableBoolean recovered = new MutableBoolean(false);
+        VoidCallback callback = new VoidCallback() {
+            @Override
+            public void operationComplete() {
+                recovered.setValue(true);
+                latch.countDown();
+            }
+
+            @Override
+            public void operationFailed(ManagedLedgerException exception) {
+                recovered.setValue(false);
+                latch.countDown();
+            }
+        };
+        c1.recoverFromLedger(info, callback);
+        latch.await();
+        assertTrue(recovered.booleanValue());
+    }
+
+    /**
+     * Mock BookKeeper that can be told to fail {@code asyncOpenLedger} for specific ledger ids.
+     */
+    class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
+        // ledger id -> BookKeeper return code handed to the open callback for that ledger
+        Map<Long, Integer> ledgerErrors = new HashMap<>();
+
+        public TestPulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception {
+            super(orderedExecutor);
+        }
+
+        /**
+         * Registers {@code rc} as the result reported for every later open of {@code ledgerId}.
+         */
+        public void setErrorCodeMap(long ledgerId, int rc) {
+            ledgerErrors.put(ledgerId, rc);
+        }
+
+        /**
+         * Completes the callback with the injected return code when one is registered for {@code lId};
+         * otherwise delegates to the parent implementation.
+         */
+        @Override
+        public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
+                final OpenCallback cb, final Object ctx) {
+            Integer injectedRc = ledgerErrors.get(lId);
+            if (injectedRc != null) {
+                // Stop here: also delegating to super would invoke the same callback a second time.
+                cb.openComplete(injectedRc, null, ctx);
+                return;
+            }
+            super.asyncOpenLedger(lId, digestType, passwd, cb, ctx);
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 33b4fbff5f5..58d6444e719 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2249,6 +2249,18 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             + " It helps when data-ledgers gets corrupted at bookkeeper and 
managed-cursor is stuck at that ledger."
     )
     private boolean autoSkipNonRecoverableData = false;
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_STORAGE_ML,
+        doc = "Skip managed ledger failure to forcefully recover managed 
ledger."
+    )
+    private boolean managedLedgerForceRecovery = false;
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_STORAGE_ML,
+        doc = "Skip schema ledger failure to forcefully recover topic 
successfully."
+    )
+    private boolean schemaLedgerForceRecovery = false;
     @FieldContext(
         category = CATEGORY_STORAGE_ML,
         doc = "operation timeout while updating managed-ledger metadata."
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bfa99eedcad..dd722dffcfb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1970,6 +1970,7 @@ public class BrokerService implements Closeable {
                     
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), 
TimeUnit.MINUTES);
             
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
             
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+            
managedLedgerConfig.setLedgerForceRecovery(serviceConfig.isManagedLedgerForceRecovery());
             
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
             managedLedgerConfig.setInactiveLedgerRollOverTime(
                     
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), 
TimeUnit.SECONDS);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 99f0249b304..85c8aa06458 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -528,7 +528,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
 
         return openLedger(position.getLedgerId())
             .thenCompose((ledger) ->
-                Functions.getLedgerEntry(ledger, position.getEntryId())
+                Functions.getLedgerEntry(ledger, position.getEntryId(), 
config.isSchemaLedgerForceRecovery())
                     .thenCompose(entry -> closeLedger(ledger)
                         .thenApply(ignore -> entry)
                     )
@@ -560,7 +560,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         ledgerHandle.asyncAddEntry(entry.toByteArray(),
             (rc, handle, entryId, ctx) -> {
                 if (rc != BKException.Code.OK) {
-                    future.completeExceptionally(bkException("Failed to add 
entry", rc, ledgerHandle.getId(), -1));
+                    future.completeExceptionally(bkException("Failed to add 
entry", rc, ledgerHandle.getId(), -1,
+                            config.isSchemaLedgerForceRecovery()));
                 } else {
                     future.complete(entryId);
                 }
@@ -582,7 +583,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                     LedgerPassword,
                     (rc, handle, ctx) -> {
                         if (rc != BKException.Code.OK) {
-                            future.completeExceptionally(bkException("Failed 
to create ledger", rc, -1, -1));
+                            future.completeExceptionally(bkException("Failed 
to create ledger", rc, -1, -1,
+                                    config.isSchemaLedgerForceRecovery()));
                         } else {
                             future.complete(handle);
                         }
@@ -603,7 +605,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
             LedgerPassword,
             (rc, handle, ctx) -> {
                 if (rc != BKException.Code.OK) {
-                    future.completeExceptionally(bkException("Failed to open 
ledger", rc, ledgerId, -1));
+                    future.completeExceptionally(bkException("Failed to open 
ledger", rc, ledgerId, -1,
+                            config.isSchemaLedgerForceRecovery()));
                 } else {
                     future.complete(handle);
                 }
@@ -617,7 +620,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         CompletableFuture<Void> future = new CompletableFuture<>();
         ledgerHandle.asyncClose((rc, handle, ctx) -> {
             if (rc != BKException.Code.OK) {
-                future.completeExceptionally(bkException("Failed to close 
ledger", rc, ledgerHandle.getId(), -1));
+                future.completeExceptionally(bkException("Failed to close 
ledger", rc, ledgerHandle.getId(), -1,
+                        config.isSchemaLedgerForceRecovery()));
             } else {
                 future.complete(null);
             }
@@ -648,12 +652,14 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     }
 
     interface Functions {
-        static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle 
ledger, long entry) {
+        static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle 
ledger, long entry,
+                boolean forceRecovery) {
             final CompletableFuture<LedgerEntry> future = new 
CompletableFuture<>();
             ledger.asyncReadEntries(entry, entry,
                 (rc, handle, entries, ctx) -> {
                     if (rc != BKException.Code.OK) {
-                        future.completeExceptionally(bkException("Failed to 
read entry", rc, ledger.getId(), entry));
+                        future.completeExceptionally(bkException("Failed to 
read entry", rc, ledger.getId(), entry,
+                                forceRecovery));
                     } else {
                         future.complete(entries.nextElement());
                     }
@@ -700,7 +706,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         }
     }
 
-    public static Exception bkException(String operation, int rc, long 
ledgerId, long entryId) {
+    public static Exception bkException(String operation, int rc, long 
ledgerId, long entryId,
+            boolean forceRecovery) {
         String message = 
org.apache.bookkeeper.client.api.BKException.getMessage(rc)
                 + " -  ledger=" + ledgerId + " - operation=" + operation;
 
@@ -709,7 +716,10 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         }
         boolean recoverable = rc != 
BKException.Code.NoSuchLedgerExistsException
                 && rc != BKException.Code.NoSuchEntryException
-                && rc != 
BKException.Code.NoSuchLedgerExistsOnMetadataServerException;
+                && rc != 
BKException.Code.NoSuchLedgerExistsOnMetadataServerException
+                // if force-recovery is enabled, mark the exception as non-recoverable
+                // so that the schema skips this failure and recovers immediately
+                && !forceRecovery;
         return new SchemaException(recoverable, message);
     }
 
@@ -732,4 +742,4 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
             throw t instanceof CompletionException ? (CompletionException) t : 
new CompletionException(t);
         });
     }
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
index d0c2e149bf4..3653c01daec 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.schema;
 import java.nio.ByteBuffer;
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.testng.annotations.Test;
@@ -29,23 +30,29 @@ import static 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.bk
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 @Test(groups = "broker")
 public class BookkeeperSchemaStorageTest {
 
     @Test
     public void testBkException() {
-        Exception ex = bkException("test", BKException.Code.ReadException, 1, 
-1);
+        Exception ex = bkException("test", BKException.Code.ReadException, 1, 
-1, false);
         assertEquals("Error while reading ledger -  ledger=1 - 
operation=test", ex.getMessage());
-        ex = bkException("test", BKException.Code.ReadException, 1, 0);
+        ex = bkException("test", BKException.Code.ReadException, 1, 0, false);
         assertEquals("Error while reading ledger -  ledger=1 - operation=test 
- entry=0",
                 ex.getMessage());
-        ex = bkException("test", BKException.Code.QuorumException, 1, -1);
+        ex = bkException("test", BKException.Code.QuorumException, 1, -1, 
false);
         assertEquals("Invalid quorum size on ensemble size -  ledger=1 - 
operation=test",
                 ex.getMessage());
-        ex = bkException("test", BKException.Code.QuorumException, 1, 0);
+        ex = bkException("test", BKException.Code.QuorumException, 1, 0, 
false);
         assertEquals("Invalid quorum size on ensemble size -  ledger=1 - 
operation=test - entry=0",
                 ex.getMessage());
+        SchemaException sc = (SchemaException) bkException("test", 
BKException.Code.BookieHandleNotAvailableException, 1, 0, false);
+        assertTrue(sc.isRecoverable());
+        sc = (SchemaException) bkException("test", 
BKException.Code.BookieHandleNotAvailableException, 1, 0, true);
+        assertFalse(sc.isRecoverable());
     }
 
     @Test

Reply via email to