This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 796dd629e3cd3ec35a71fad2da3db71328798089 Author: Andrey Yegorov <[email protected]> AuthorDate: Tue Aug 2 21:37:46 2022 -0700 [fix][broker] PulsarLedgerManager to pass correct error code to BK client (#16857) (cherry picked from commit 2e8bd3d7b17190d6fb45d5b35eff598948975385) --- .../mledger/impl/ManagedLedgerFactoryImpl.java | 29 +++- .../broker/service/BrokerBkEnsemblesTests.java | 150 ++++++++++++++++++++- .../metadata/bookkeeper/PulsarLedgerManager.java | 65 +++++++-- 3 files changed, 231 insertions(+), 13 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index d829efad87f..e437bdaa47c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -883,7 +883,19 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { DeleteLedgerCallback callback, Object ctx) { Futures.waitForAll(info.ledgers.stream() .filter(li -> !li.isOffloaded) - .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()) + .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute() + .handle((result, ex) -> { + if (ex != null) { + int rc = BKException.getExceptionCode(ex); + if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException + || rc == BKException.Code.NoSuchLedgerExistsException) { + log.info("Ledger {} does not exist, ignoring", li.ledgerId); + return null; + } + throw new CompletionException(ex); + } + return result; + })) .collect(Collectors.toList())) .thenRun(() -> { // Delete the metadata @@ -911,7 +923,20 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { // Delete the cursor ledger if present if (cursor.cursorsLedgerId != -1) { - cursorLedgerDeleteFuture = bkc.newDeleteLedgerOp().withLedgerId(cursor.cursorsLedgerId).execute(); + cursorLedgerDeleteFuture = bkc.newDeleteLedgerOp().withLedgerId(cursor.cursorsLedgerId) + .execute() + .handle((result, ex) -> { + if (ex != null) { + int rc = BKException.getExceptionCode(ex); + if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException + || rc == BKException.Code.NoSuchLedgerExistsException) { + log.info("Ledger {} does not exist, ignoring", cursor.cursorsLedgerId); + return null; + } + throw new CompletionException(ex); + } + return result; + }); } else { cursorLedgerDeleteFuture = CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index aa63b224a9d..d9ea5355dd3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.fail; import java.lang.reflect.Field; @@ -31,9 +32,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Sets; import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; @@ -47,11 +52,13 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.zookeeper.ZooKeeper; import org.testng.Assert; import org.testng.annotations.Test; @Test(groups = "broker") +@Slf4j public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase { public BrokerBkEnsemblesTests() { @@ -235,7 +242,7 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase { try { bookKeeper.deleteLedger(entry.getKey()); } catch (Exception e) { - e.printStackTrace(); + log.warn("failed to delete ledger {}", entry.getKey(), e); } } }); @@ -276,6 +283,147 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase { consumer.close(); } + @Test + public void testTruncateCorruptDataLedger() throws Exception { + // Ensure intended state for autoSkipNonRecoverableData + admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false"); + + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getWebServiceAddress()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + + final int totalMessages = 100; + final int totalDataLedgers = 5; + final int entriesPerLedger = totalMessages / totalDataLedgers; + + final String tenant = "prop"; + try { + admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"), + Sets.newHashSet(config.getClusterName()))); + } catch (Exception e) { + + } + final String ns1 = tenant + "/crash-broker"; + try { + admin.namespaces().createNamespace(ns1, Sets.newHashSet(config.getClusterName())); + } catch (Exception e) { + + } + + final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis(); + + // Create subscription + Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name") + .receiverQueueSize(5).subscribe(); + + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); + Field configField = ManagedCursorImpl.class.getDeclaredField("config"); + configField.setAccessible(true); + // Create multiple data-ledger + ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor); + config.setMaxEntriesPerLedger(entriesPerLedger); + config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + // bookkeeper client + Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper"); + bookKeeperField.setAccessible(true); + // Create multiple data-ledger + BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml); + + // (1) publish messages in 10 data-ledgers each with 20 entries under managed-ledger + Producer<byte[]> producer = client.newProducer().topic(topic1).create(); + for (int i = 0; i < totalMessages; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + // validate: consumer is able to consume msg and close consumer after reading 1 entry + Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS)); + consumer.close(); + + NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo(); + Assert.assertEquals(ledgerInfo.size(), totalDataLedgers); + Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry(); + long firstLedgerToDelete = lastLedger.getKey(); + + // (2) delete first 4 data-ledgers + ledgerInfo.entrySet().forEach(entry -> { + if (!entry.equals(lastLedger)) { + assertEquals(entry.getValue().getEntries(), entriesPerLedger); + try { + bookKeeper.deleteLedger(entry.getKey()); + } catch (Exception e) { + log.warn("failed to delete ledger {}", entry.getKey(), e); + } + } + }); + + // create 5 more ledgers + for (int i = 0; i < totalMessages; i++) { + String message = "my-message2-" + i; + producer.send(message.getBytes()); + } + + ml.delete(); + + // Admin should be able to truncate the topic + admin.topics().truncate(topic1); + + ledgerInfo.entrySet().forEach(entry -> { + log.warn("found ledger: {}", entry.getKey()); + assertNotEquals(firstLedgerToDelete, entry.getKey()); + }); + + // Currently, ledger deletion is async and failed deletion + // does not actually fail truncation but logs an exception + // and creates scheduled task to retry + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + LedgerMetadata meta = bookKeeper + .getLedgerMetadata(firstLedgerToDelete) + .exceptionally(e -> null) + .get(); + assertEquals(null, meta, "ledger should be deleted " + firstLedgerToDelete); + }); + + // Should not throw, deleting absent ledger must be a noop + // unless PulsarManager returned a wrong error which + // got translated to BKUnexpectedConditionException + try { + bookKeeper.deleteLedger(firstLedgerToDelete); + } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException bke) { + // pass + } + + producer.close(); + consumer.close(); + } + + @Test + public void testDeleteLedgerFactoryCorruptLedger() throws Exception { + ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("test"); + + // bookkeeper client + Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper"); + bookKeeperField.setAccessible(true); + // Create multiple data-ledger + BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml); + + ml.addEntry("dummy-entry-1".getBytes()); + + NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo(); + long lastLedger = ledgerInfo.lastEntry().getKey(); + + ml.close(); + bookKeeper.deleteLedger(lastLedger); + + // BK ledger is deleted, factory should not throw on delete + factory.delete("test"); + } + @Test(timeOut = 20000) public void testTopicWithWildCardChar() throws Exception { @Cleanup diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java index 5d5854743b2..1d05f5726a5 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java @@ -24,8 +24,10 @@ import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -89,6 +91,32 @@ public class PulsarLedgerManager implements LedgerManager { store.registerListener(this::handleDataNotification); } + private static Throwable mapToBkException(Throwable ex) { + if (ex instanceof CompletionException || ex instanceof ExecutionException) { + return mapToBkException(ex.getCause()); + } + + if (ex instanceof MetadataStoreException.NotFoundException) { + BKException bke = BKException.create(BKException.Code.NoSuchLedgerExistsOnMetadataServerException); + bke.initCause(ex); + return bke; + } else if (ex instanceof MetadataStoreException.AlreadyExistsException) { + BKException bke = BKException.create(BKException.Code.LedgerExistException); + bke.initCause(ex); + return bke; + } else if (ex instanceof MetadataStoreException.BadVersionException) { + BKException bke = BKException.create(BKException.Code.MetadataVersionException); + bke.initCause(ex); + return bke; + } else if (ex instanceof MetadataStoreException.AlreadyClosedException) { + BKException bke = BKException.create(BKException.Code.LedgerClosedException); + bke.initCause(ex); + return bke; + } + + return ex; + } + @Override public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId, LedgerMetadata inputMetadata) { @@ -106,14 +134,21 @@ public class PulsarLedgerManager implements LedgerManager { return FutureUtil.failedFuture(new BKException.BKMetadataSerializationException(ioe)); } - CompletableFuture<Versioned<LedgerMetadata>> future = store.put(getLedgerPath(ledgerId), data, Optional.of(-1L)) - .thenApply(stat -> new Versioned(metadata, new LongVersion(stat.getVersion()))); - future.exceptionally(ex -> { - log.error("Failed to create ledger {}: {}", ledgerId, ex.getMessage()); - return null; - }); + CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>(); - return future; + store.put(getLedgerPath(ledgerId), data, Optional.of(-1L)) + .whenComplete((stat, ex) -> { + if (ex != null) { + log.error("Failed to create ledger {}: {}", ledgerId, ex.getMessage()); + promise.completeExceptionally(mapToBkException(ex)); + return; + } + + Versioned<LedgerMetadata> result = new Versioned(metadata, new LongVersion(stat.getVersion())); + promise.complete(result); + }); + + return promise; } @Override @@ -131,9 +166,17 @@ public class PulsarLedgerManager implements LedgerManager { } } - return store.delete(getLedgerPath(ledgerId), existingVersion) - .thenRun(() -> { - // removed listener on ledgerId + CompletableFuture<Void> promise = new CompletableFuture<>(); + store.delete(getLedgerPath(ledgerId), existingVersion) + .whenComplete((result, ex) -> { + if (ex != null) { + log.error("Failed to remove ledger metadata {}: {}", ledgerId, ex.getMessage()); + promise.completeExceptionally(mapToBkException(ex)); + return; + } + + promise.complete(result); + // remove listener on ledgerId Set<BookkeeperInternalCallbacks.LedgerMetadataListener> listenerSet = listeners.remove(ledgerId); if (null != listenerSet) { if (log.isDebugEnabled()) { @@ -148,6 +191,8 @@ public class PulsarLedgerManager implements LedgerManager { } } }); + + return promise; } @Override
