This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new d2eab2a6095 [fix] [ml] topic load fail by ledger lost (#19444)
d2eab2a6095 is described below
commit d2eab2a6095f84e651dc0daef40a72aef0205ba5
Author: fengyubiao <[email protected]>
AuthorDate: Tue Feb 21 21:47:28 2023 +0800
[fix] [ml] topic load fail by ledger lost (#19444)
Ensure that a ledger is deleted from BookKeeper only after it has been removed
from the managed ledger's ledger-info metadata.
(cherry picked from commit 3314d70231cbed89b6eefa2073ef8c048d84ec16)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 43 +++++---
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 116 ++++++++++++++++++++-
2 files changed, 141 insertions(+), 18 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 5fa8f04e6a2..a00a69b5b81 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -66,6 +66,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
@@ -148,7 +149,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected final BookKeeper bookKeeper;
protected final String name;
private final Map<String, byte[]> ledgerMetadata;
- private final BookKeeper.DigestType digestType;
+ protected final BookKeeper.DigestType digestType;
protected ManagedLedgerConfig config;
protected Map<String, String> propertiesMap;
@@ -432,6 +433,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
// Calculate total entries and size
+ final List<Long> emptyLedgersToBeDeleted =
Collections.synchronizedList(new ArrayList<>());
Iterator<LedgerInfo> iterator = ledgers.values().iterator();
while (iterator.hasNext()) {
LedgerInfo li = iterator.next();
@@ -440,9 +442,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
} else {
iterator.remove();
- bookKeeper.asyncDeleteLedger(li.getLedgerId(), (rc, ctx) -> {
- log.info("[{}] Deleted empty ledger ledgerId={} rc={}",
name, li.getLedgerId(), rc);
- }, null);
+ emptyLedgersToBeDeleted.add(li.getLedgerId());
}
}
@@ -458,6 +458,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void operationComplete(Void v, Stat stat) {
ledgersStat = stat;
+ emptyLedgersToBeDeleted.forEach(ledgerId -> {
+ bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
+ log.info("[{}] Deleted empty ledger ledgerId={}
rc={}", name, ledgerId, rc);
+ }, null);
+ });
initializeCursors(callback);
}
@@ -1443,16 +1448,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.debug("[{}] Updating of ledgers list after create
complete. version={}", name, stat);
}
ledgersStat = stat;
- ledgers.put(lh.getId(), newLedger);
- currentLedger = lh;
- currentLedgerEntries = 0;
- currentLedgerSize = 0;
- metadataMutex.unlock();
- updateLedgersIdsComplete(stat);
synchronized (ManagedLedgerImpl.this) {
-
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() -
lastLedgerCreationInitiationTimestamp,
- TimeUnit.MILLISECONDS);
+ LedgerHandle originalCurrentLedger = currentLedger;
+ ledgers.put(lh.getId(), newLedger);
+ currentLedger = lh;
+ currentLedgerEntries = 0;
+ currentLedgerSize = 0;
+ updateLedgersIdsComplete(originalCurrentLedger);
+
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
+ - lastLedgerCreationInitiationTimestamp,
TimeUnit.MILLISECONDS);
}
+ metadataMutex.unlock();
// May need to update the cursor position
maybeUpdateCursorBeforeTrimmingConsumedLedger();
@@ -1518,8 +1524,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback);
}
- public synchronized void updateLedgersIdsComplete(Stat stat) {
+ public synchronized void updateLedgersIdsComplete(@Nullable LedgerHandle
originalCurrentLedger) {
STATE_UPDATER.set(this, State.LedgerOpened);
+ // Delete original "currentLedger" if it has been removed from
"ledgers".
+ if (originalCurrentLedger != null &&
!ledgers.containsKey(originalCurrentLedger.getId())){
+ bookKeeper.asyncDeleteLedger(originalCurrentLedger.getId(), (rc,
ctx) -> {
+ mbean.endDataLedgerDeleteOp();
+ log.info("[{}] Delete complete for empty ledger {}. rc={}",
name, originalCurrentLedger.getId(), rc);
+ }, null);
+ }
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
if (log.isDebugEnabled()) {
@@ -1596,10 +1609,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// The last ledger was empty, so we can discard it
ledgers.remove(lh.getId());
mbean.startDataLedgerDeleteOp();
- bookKeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
- mbean.endDataLedgerDeleteOp();
- log.info("[{}] Delete complete for empty ledger {}. rc={}",
name, lh.getId(), rc);
- }, null);
}
trimConsumedLedgersInBackground();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index c6ceecf8bc3..f484eb5670e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -48,6 +48,7 @@ import java.lang.reflect.Field;
import java.nio.ReadOnlyBufferException;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -74,7 +75,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import lombok.Cleanup;
+import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -144,6 +147,117 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}
+ private void makeAddEntryTimeout(ManagedLedgerImpl ml, AtomicBoolean
addEntryFinished) throws Exception {
+ LedgerHandle currentLedger = ml.currentLedger;
+ final LedgerHandle spyLedgerHandle = spy(currentLedger);
+ doAnswer(invocation -> {
+ ByteBuf bs = (ByteBuf) invocation.getArguments()[0];
+ AddCallback addCallback = (AddCallback)
invocation.getArguments()[1];
+ Object originalContext = invocation.getArguments()[2];
+ currentLedger.asyncAddEntry(bs, (rc, lh, entryId, ctx) -> {
+ addEntryFinished.set(true);
+ addCallback.addComplete(BKException.Code.TimeoutException,
spyLedgerHandle, -1, ctx);
+ }, originalContext);
+ return null;
+ }).when(spyLedgerHandle).asyncAddEntry(any(ByteBuf.class),
any(AddCallback.class), any());
+ ml.currentLedger = spyLedgerHandle;
+ }
+
+ @Data
+ private static class DeleteLedgerInfo{
+ volatile boolean hasCalled;
+ volatile CompletableFuture<Void> future = new CompletableFuture<>();
+ }
+
+ private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger,
final AtomicBoolean signal,
+ BookKeeper
spyBookKeeper) {
+ DeleteLedgerInfo deleteLedgerInfo = new DeleteLedgerInfo();
+ doAnswer(invocation -> {
+ long ledgerId = (long) invocation.getArguments()[0];
+ AsyncCallback.DeleteCallback originalCb =
(AsyncCallback.DeleteCallback) invocation.getArguments()[1];
+ AsyncCallback.DeleteCallback cb = (rc, ctx) -> {
+ if (deleteLedgerInfo.hasCalled) {
+ deleteLedgerInfo.future.complete(null);
+ }
+ originalCb.deleteComplete(rc, ctx);
+ };
+ Object ctx = invocation.getArguments()[2];
+ if (ledgerId != ledger.getId()){
+ bkc.asyncDeleteLedger(ledgerId, originalCb, ctx);
+ } else {
+ deleteLedgerInfo.hasCalled = true;
+ new Thread(() -> {
+
Awaitility.await().atMost(Duration.ofSeconds(60)).until(signal::get);
+ bkc.asyncDeleteLedger(ledgerId, cb, ctx);
+ }).start();
+ }
+ return null;
+ }).when(spyBookKeeper).asyncDeleteLedger(any(long.class),
any(AsyncCallback.DeleteCallback.class), any());
+ return deleteLedgerInfo;
+ }
+
+ /***
+ * This test simulates the following problems that can occur when ZK
connections are unstable:
+ * - add entry timeout
+ * - write ZK fail when update ledger info of ML
+ * and verifies that ledger info of ML is still correct when the above
problems occur.
+ */
+ @Test
+ public void testLedgerInfoMetaCorrectIfAddEntryTimeOut() throws Exception {
+ String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut";
+ BookKeeper spyBookKeeper = spy(bkc);
+ ManagedLedgerFactoryImpl factory = new
ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper);
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
+
+ // Make add entry timeout(The data write was actually successful).
+ AtomicBoolean addEntryFinished = new AtomicBoolean(false);
+ makeAddEntryTimeout(ml, addEntryFinished);
+
+ // Make the update operation of ledger info failure when switch ledger.
+ metadataStore.failConditional(new
MetadataStoreException.BadVersionException(""), (opType, path) -> {
+ if (opType == FaultInjectionMetadataStore.OperationType.PUT &&
addEntryFinished.get()
+ &&
"/managed-ledgers/testLedgerInfoMetaCorrectIfAddEntryTimeOut".equals(path)) {
+ return true;
+ }
+ return false;
+ });
+
+ // Make delete ledger is delayed if delete is called.
+ AtomicBoolean deleteLedgerDelaySignal = new AtomicBoolean(false);
+ DeleteLedgerInfo deleteLedgerInfo =
+ makeDelayIfDoLedgerDelete(ml.currentLedger,
deleteLedgerDelaySignal, spyBookKeeper);
+
+ // Add one entry.
+ // - it will fail and trigger ledger switch(we mocked the error).
+ // - ledger switch will also fail(we mocked the error).
+ try {
+ ml.addEntry("1".getBytes(Charset.defaultCharset()));
+ fail("Expected the operation of add entry will fail by timeout or
ledger fenced.");
+ } catch (Exception e){
+ // expected ex.
+ }
+
+ // Reopen ML.
+ try {
+ ml.close();
+ fail("Expected the operation of ml close will fail by fenced
state.");
+ } catch (Exception e){
+ // expected ex.
+ }
+ ManagedLedgerImpl mlReopened = (ManagedLedgerImpl)
factory.open(mlName);
+ deleteLedgerDelaySignal.set(true);
+ if (deleteLedgerInfo.hasCalled){
+ deleteLedgerInfo.future.join();
+ }
+ mlReopened.close();
+
+ // verify: all ledgers in ledger info is worked.
+ for (long ledgerId : mlReopened.getLedgersInfo().keySet()){
+ LedgerHandle lh = bkc.openLedger(ledgerId, ml.digestType,
ml.getConfig().getPassword());
+ lh.close();
+ }
+ }
+
@Test
public void managedLedgerApi() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
@@ -3091,7 +3205,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ledger.pendingAddEntries.add(op);
}
- ledger.updateLedgersIdsComplete(mock(Stat.class));
+ ledger.updateLedgersIdsComplete(null);
for (int i = 0; i < 10; i++) {
OpAddEntry oldOp = oldOps.get(i);
if (i > 4) {