This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 38322a689b2 [improve][broker] PIP-327: Support force topic loading for
unrecoverable errors (#21759)
38322a689b2 is described below
commit 38322a689b205fa8e4233146a2f9136081f92f26
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Oct 3 21:58:43 2024 -0700
[improve][broker] PIP-327: Support force topic loading for unrecoverable
errors (#21759)
---
.../bookkeeper/mledger/ManagedLedgerConfig.java | 12 +++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 ++-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 58 +++++++++++++++++++++-
.../apache/pulsar/broker/ServiceConfiguration.java | 12 +++++
.../pulsar/broker/service/BrokerService.java | 1 +
.../service/schema/BookkeeperSchemaStorage.java | 30 +++++++----
.../schema/BookkeeperSchemaStorageTest.java | 15 ++++--
7 files changed, 117 insertions(+), 17 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index a24251450b4..7b28990f355 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -64,6 +64,7 @@ public class ManagedLedgerConfig {
private long retentionTimeMs = 0;
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;
+ private boolean ledgerForceRecovery;
private boolean lazyCursorRecovery = false;
private long metadataOperationsTimeoutSeconds = 60;
private long readEntryTimeoutSeconds = 120;
@@ -465,6 +466,17 @@ public class ManagedLedgerConfig {
this.autoSkipNonRecoverableData = skipNonRecoverableData;
}
+ /**
+ * Whether to skip managed-ledger failures and forcefully recover the managed ledger.
+ */
+ public boolean isLedgerForceRecovery() {
+ return ledgerForceRecovery;
+ }
+
+ public void setLedgerForceRecovery(boolean ledgerForceRecovery) {
+ this.ledgerForceRecovery = ledgerForceRecovery;
+ }
+
/**
* @return max unacked message ranges that will be persisted and recovered.
*
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b39fd231cdc..f469b88cae8 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -182,6 +182,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// Wether the current cursorLedger is read-only or writable
private boolean isCursorLedgerReadOnly = true;
+ private boolean ledgerForceRecovery;
// Stat of the cursor z-node
// NOTE: Don't update cursorLedgerStat alone,
@@ -332,6 +333,7 @@ public class ManagedCursorImpl implements ManagedCursor {
markDeleteLimiter = null;
}
this.mbean = new ManagedCursorMXBeanImpl(this);
+ this.ledgerForceRecovery = getConfig().isLedgerForceRecovery();
}
private void updateCursorLedgerStat(ManagedCursorInfo cursorInfo, Stat
stat) {
@@ -547,7 +549,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (log.isInfoEnabled()) {
log.info("[{}] Opened ledger {} for cursor {}. rc={}",
ledger.getName(), ledgerId, name, rc);
}
- if (isBkErrorNotRecoverable(rc)) {
+ if (isBkErrorNotRecoverable(rc) || ledgerForceRecovery) {
log.error("[{}] Error opening metadata ledger {} for cursor
{}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
// Rewind to oldest entry available
@@ -575,7 +577,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (log.isDebugEnabled()) {
log.debug("[{}} readComplete rc={} entryId={}",
ledger.getName(), rc1, lh1.getLastAddConfirmed());
}
- if (isBkErrorNotRecoverable(rc1)) {
+ if (isBkErrorNotRecoverable(rc1) || ledgerForceRecovery) {
log.error("[{}] Error reading from metadata ledger {} for
cursor {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 1067cda441f..8ae5a04a507 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -70,11 +70,14 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Cleanup;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -98,6 +101,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.util.FutureUtil;
@@ -4538,7 +4542,6 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ledger.close();
}
-
@Test
public void testReadEntriesWithSkipDeletedEntries() throws Exception {
@Cleanup
@@ -4795,5 +4798,58 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
}
+ @Test
+ void testForceCursorRecovery() throws Exception {
+ ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new
ManagedLedgerFactoryConfig();
+ TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
+ factory = new ManagedLedgerFactoryImpl(metadataStore, bk);
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setLedgerForceRecovery(true);
+ ManagedLedger ledger = factory.open("my_test_ledger", config);
+ ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+ ledger.addEntry("entry-1".getBytes(Encoding));
+ long invalidLedger = -1L;
+ bk.setErrorCodeMap(invalidLedger,
BKException.Code.BookieHandleNotAvailableException);
+ ManagedCursorInfo info =
ManagedCursorInfo.newBuilder().setCursorsLedgerId(invalidLedger).build();
+ CountDownLatch latch = new CountDownLatch(1);
+ MutableBoolean recovered = new MutableBoolean(false);
+ VoidCallback callback = new VoidCallback() {
+ @Override
+ public void operationComplete() {
+ recovered.setValue(true);
+ latch.countDown();
+ }
+
+ @Override
+ public void operationFailed(ManagedLedgerException exception) {
+ recovered.setValue(false);
+ latch.countDown();
+ }
+ };
+ c1.recoverFromLedger(info, callback);
+ latch.await();
+ assertTrue(recovered.booleanValue());
+ }
+
+ class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
+ Map<Long, Integer> ledgerErrors = new HashMap<>();
+
+ public TestPulsarMockBookKeeper(OrderedExecutor orderedExecutor)
throws Exception {
+ super(orderedExecutor);
+ }
+
+ public void setErrorCodeMap(long ledgerId, int rc) {
+ ledgerErrors.put(ledgerId, rc);
+ }
+
+ public void asyncOpenLedger(final long lId, final DigestType
digestType, final byte[] passwd,
+ final OpenCallback cb, final Object ctx) {
+ if (ledgerErrors.containsKey(lId)) {
+ cb.openComplete(ledgerErrors.get(lId), null, ctx);
+ }
+ super.asyncOpenLedger(lId, digestType, passwd, cb, ctx);
+ }
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ManagedCursorTest.class);
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 33b4fbff5f5..58d6444e719 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2249,6 +2249,18 @@ public class ServiceConfiguration implements
PulsarConfiguration {
+ " It helps when data-ledgers gets corrupted at bookkeeper and
managed-cursor is stuck at that ledger."
)
private boolean autoSkipNonRecoverableData = false;
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Skip managed ledger failure to forcefully recover managed
ledger."
+ )
+ private boolean managedLedgerForceRecovery = false;
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Skip schema ledger failure to forcefully recover topic
successfully."
+ )
+ private boolean schemaLedgerForceRecovery = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "operation timeout while updating managed-ledger metadata."
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bfa99eedcad..dd722dffcfb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1970,6 +1970,7 @@ public class BrokerService implements Closeable {
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(),
TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+
managedLedgerConfig.setLedgerForceRecovery(serviceConfig.isManagedLedgerForceRecovery());
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
managedLedgerConfig.setInactiveLedgerRollOverTime(
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(),
TimeUnit.SECONDS);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 99f0249b304..85c8aa06458 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -528,7 +528,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
return openLedger(position.getLedgerId())
.thenCompose((ledger) ->
- Functions.getLedgerEntry(ledger, position.getEntryId())
+ Functions.getLedgerEntry(ledger, position.getEntryId(),
config.isSchemaLedgerForceRecovery())
.thenCompose(entry -> closeLedger(ledger)
.thenApply(ignore -> entry)
)
@@ -560,7 +560,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
ledgerHandle.asyncAddEntry(entry.toByteArray(),
(rc, handle, entryId, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Failed to add
entry", rc, ledgerHandle.getId(), -1));
+ future.completeExceptionally(bkException("Failed to add
entry", rc, ledgerHandle.getId(), -1,
+ config.isSchemaLedgerForceRecovery()));
} else {
future.complete(entryId);
}
@@ -582,7 +583,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Failed
to create ledger", rc, -1, -1));
+ future.completeExceptionally(bkException("Failed
to create ledger", rc, -1, -1,
+ config.isSchemaLedgerForceRecovery()));
} else {
future.complete(handle);
}
@@ -603,7 +605,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Failed to open
ledger", rc, ledgerId, -1));
+ future.completeExceptionally(bkException("Failed to open
ledger", rc, ledgerId, -1,
+ config.isSchemaLedgerForceRecovery()));
} else {
future.complete(handle);
}
@@ -617,7 +620,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
CompletableFuture<Void> future = new CompletableFuture<>();
ledgerHandle.asyncClose((rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Failed to close
ledger", rc, ledgerHandle.getId(), -1));
+ future.completeExceptionally(bkException("Failed to close
ledger", rc, ledgerHandle.getId(), -1,
+ config.isSchemaLedgerForceRecovery()));
} else {
future.complete(null);
}
@@ -648,12 +652,14 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
}
interface Functions {
- static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle
ledger, long entry) {
+ static CompletableFuture<LedgerEntry> getLedgerEntry(LedgerHandle
ledger, long entry,
+ boolean forceRecovery) {
final CompletableFuture<LedgerEntry> future = new
CompletableFuture<>();
ledger.asyncReadEntries(entry, entry,
(rc, handle, entries, ctx) -> {
if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Failed to
read entry", rc, ledger.getId(), entry));
+ future.completeExceptionally(bkException("Failed to
read entry", rc, ledger.getId(), entry,
+ forceRecovery));
} else {
future.complete(entries.nextElement());
}
@@ -700,7 +706,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
}
}
- public static Exception bkException(String operation, int rc, long
ledgerId, long entryId) {
+ public static Exception bkException(String operation, int rc, long
ledgerId, long entryId,
+ boolean forceRecovery) {
String message =
org.apache.bookkeeper.client.api.BKException.getMessage(rc)
+ " - ledger=" + ledgerId + " - operation=" + operation;
@@ -709,7 +716,10 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
}
boolean recoverable = rc !=
BKException.Code.NoSuchLedgerExistsException
&& rc != BKException.Code.NoSuchEntryException
- && rc !=
BKException.Code.NoSuchLedgerExistsOnMetadataServerException;
+ && rc !=
BKException.Code.NoSuchLedgerExistsOnMetadataServerException
+ // if force-recovery is enabled then treat it as a non-recoverable
exception
+ // so that the schema skips this exception and recovers
immediately
+ && !forceRecovery;
return new SchemaException(recoverable, message);
}
@@ -732,4 +742,4 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
throw t instanceof CompletionException ? (CompletionException) t :
new CompletionException(t);
});
}
-}
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
index d0c2e149bf4..3653c01daec 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.schema;
import java.nio.ByteBuffer;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.Test;
@@ -29,23 +30,29 @@ import static
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.bk
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
@Test(groups = "broker")
public class BookkeeperSchemaStorageTest {
@Test
public void testBkException() {
- Exception ex = bkException("test", BKException.Code.ReadException, 1,
-1);
+ Exception ex = bkException("test", BKException.Code.ReadException, 1,
-1, false);
assertEquals("Error while reading ledger - ledger=1 -
operation=test", ex.getMessage());
- ex = bkException("test", BKException.Code.ReadException, 1, 0);
+ ex = bkException("test", BKException.Code.ReadException, 1, 0, false);
assertEquals("Error while reading ledger - ledger=1 - operation=test
- entry=0",
ex.getMessage());
- ex = bkException("test", BKException.Code.QuorumException, 1, -1);
+ ex = bkException("test", BKException.Code.QuorumException, 1, -1,
false);
assertEquals("Invalid quorum size on ensemble size - ledger=1 -
operation=test",
ex.getMessage());
- ex = bkException("test", BKException.Code.QuorumException, 1, 0);
+ ex = bkException("test", BKException.Code.QuorumException, 1, 0,
false);
assertEquals("Invalid quorum size on ensemble size - ledger=1 -
operation=test - entry=0",
ex.getMessage());
+ SchemaException sc = (SchemaException) bkException("test",
BKException.Code.BookieHandleNotAvailableException, 1, 0, false);
+ assertTrue(sc.isRecoverable());
+ sc = (SchemaException) bkException("test",
BKException.Code.BookieHandleNotAvailableException, 1, 0, true);
+ assertFalse(sc.isRecoverable());
}
@Test