This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 47ef48e074 [fix] Fix data loss when writing to a ledger and deleting
the ledger execute concurrently (#4462)
47ef48e074 is described below
commit 47ef48e074df42705bc0bf14bac5b3cc05c5f0c3
Author: fengyubiao <[email protected]>
AuthorDate: Fri Sep 13 09:01:55 2024 +0800
[fix] Fix data loss when writing to a ledger and deleting the ledger execute
concurrently (#4462)
### Motivation
| step | `BK client 1` | `BK client 2` |
| --- | --- | --- |
| 1 | create ledger `1` |
| 2 | | open ledger `1` |
| 3 | | delete ledger `1` |
| 4 | write data to ledger `1` |
At step `4`, the write should fail, but it succeeds. This leads users to
assume the data has been written, even though it cannot be read.
You can reproduce the issue with `testWriteAfterDeleted`.
There is a scenario that leads to Pulsar losing messages:
- `broker-2` created a ledger
- `broker-2`'s ZK session expires, which causes the topic it owned to
be reassigned to other brokers
- `broker-0` owned the topic again
- it will delete the last empty ledger
- consumers connected to `broker-0`
- producers connected to `broker-2`
- send messages to the topic
- on `broker-2`, the ledger cannot be closed because the ledger metadata
has been deleted
### Changes
Once the ledger is fenced, it can no longer be written to.
---
.../org/apache/bookkeeper/bookie/BookieImpl.java | 14 ++---
.../apache/bookkeeper/bookie/HandleFactory.java | 2 +-
.../bookkeeper/bookie/HandleFactoryImpl.java | 31 ++++++++++-
.../bookkeeper/bookie/BookieJournalTest.java | 4 +-
.../org/apache/bookkeeper/client/TestFencing.java | 65 ++++++++++++++++++++++
5 files changed, 105 insertions(+), 11 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index a69df4a176..a37d559c7c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -543,7 +543,7 @@ public class BookieImpl implements Bookie {
masterKeyCache.put(ledgerId, masterKey);
// Force to re-insert the master key in ledger
storage
- handles.getHandle(ledgerId, masterKey);
+ handles.getHandle(ledgerId, masterKey, true);
} else {
throw new IOException("Invalid journal. Contains
journalKey "
+ " but layout version (" + journalVersion
@@ -555,7 +555,7 @@ public class BookieImpl implements Bookie {
if (key == null) {
key = ledgerStorage.readMasterKey(ledgerId);
}
- LedgerDescriptor handle =
handles.getHandle(ledgerId, key);
+ LedgerDescriptor handle =
handles.getHandle(ledgerId, key, true);
handle.setFenced();
} else {
throw new IOException("Invalid journal. Contains
fenceKey "
@@ -573,7 +573,7 @@ public class BookieImpl implements Bookie {
if (key == null) {
key = ledgerStorage.readMasterKey(ledgerId);
}
- LedgerDescriptor handle =
handles.getHandle(ledgerId, key);
+ LedgerDescriptor handle =
handles.getHandle(ledgerId, key, true);
handle.setExplicitLac(explicitLacBuf);
} else {
throw new IOException("Invalid journal. Contains
explicitLAC " + " but layout version ("
@@ -596,7 +596,7 @@ public class BookieImpl implements Bookie {
if (key == null) {
key = ledgerStorage.readMasterKey(ledgerId);
}
- LedgerDescriptor handle = handles.getHandle(ledgerId,
key);
+ LedgerDescriptor handle = handles.getHandle(ledgerId,
key, true);
recBuff.rewind();
handle.addEntry(Unpooled.wrappedBuffer(recBuff));
@@ -933,7 +933,7 @@ public class BookieImpl implements Bookie {
throws IOException, BookieException {
final long ledgerId = entry.getLong(entry.readerIndex());
- return handles.getHandle(ledgerId, masterKey);
+ return handles.getHandle(ledgerId, masterKey, false);
}
private Journal getJournal(long ledgerId) {
@@ -1042,7 +1042,7 @@ public class BookieImpl implements Bookie {
ByteBuf explicitLACEntry = null;
try {
long ledgerId = entry.getLong(entry.readerIndex());
- LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
+ LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey,
false);
synchronized (handle) {
entry.markReaderIndex();
handle.setExplicitLac(entry);
@@ -1131,7 +1131,7 @@ public class BookieImpl implements Bookie {
*/
public CompletableFuture<Boolean> fenceLedger(long ledgerId, byte[]
masterKey)
throws IOException, BookieException {
- LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
+ LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey,
false);
return handle.fenceAndLogInJournal(getJournal(ledgerId));
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactory.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactory.java
index 22500b74cb..c81294d4db 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactory.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactory.java
@@ -24,7 +24,7 @@ package org.apache.bookkeeper.bookie;
import java.io.IOException;
interface HandleFactory {
- LedgerDescriptor getHandle(long ledgerId, byte[] masterKey)
+ LedgerDescriptor getHandle(long ledgerId, byte[] masterKey, boolean
journalReplay)
throws IOException, BookieException;
LedgerDescriptor getReadOnlyHandle(long ledgerId)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
index 3f643019c9..b331f12506 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
@@ -21,7 +21,10 @@
package org.apache.bookkeeper.bookie;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import java.io.IOException;
+import java.time.Duration;
import org.apache.bookkeeper.bookie.LedgerStorage.LedgerDeletionListener;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
@@ -29,6 +32,14 @@ class HandleFactoryImpl implements HandleFactory,
LedgerDeletionListener {
private final ConcurrentLongHashMap<LedgerDescriptor> ledgers;
private final ConcurrentLongHashMap<LedgerDescriptor> readOnlyLedgers;
+ /**
+ * Once the ledger was marked "fenced" before, the ledger was accessed by
multi clients. One client is calling
+ * "delete" now, and other clients may call "write" continuously later. We
mark these ledgers can not be written
+ * anymore. And maintains the state for 7 days is safety.
+ */
+ private final Cache<Long, Boolean> recentlyFencedAndDeletedLedgers =
CacheBuilder.newBuilder()
+ .expireAfterAccess(Duration.ofDays(7)).build();
+
final LedgerStorage ledgerStorage;
HandleFactoryImpl(LedgerStorage ledgerStorage) {
@@ -40,10 +51,14 @@ class HandleFactoryImpl implements HandleFactory,
LedgerDeletionListener {
}
@Override
- public LedgerDescriptor getHandle(final long ledgerId, final byte[]
masterKey) throws IOException, BookieException {
+ public LedgerDescriptor getHandle(final long ledgerId, final byte[]
masterKey, boolean journalReplay)
+ throws IOException, BookieException {
LedgerDescriptor handle = ledgers.get(ledgerId);
if (handle == null) {
+ if (!journalReplay &&
recentlyFencedAndDeletedLedgers.getIfPresent(ledgerId) != null) {
+ throw
BookieException.create(BookieException.Code.LedgerFencedException);
+ }
handle = LedgerDescriptor.create(masterKey, ledgerId,
ledgerStorage);
ledgers.putIfAbsent(ledgerId, handle);
}
@@ -64,8 +79,22 @@ class HandleFactoryImpl implements HandleFactory,
LedgerDeletionListener {
return handle;
}
+ private void markIfConflictWritingOccurs(long ledgerId) {
+ LedgerDescriptor ledgerDescriptor = ledgers.get(ledgerId);
+ try {
+ if (ledgerDescriptor != null && ledgerDescriptor.isFenced()) {
+ recentlyFencedAndDeletedLedgers.put(ledgerId, true);
+ }
+ } catch (IOException | BookieException ex) {
+ // The ledger is in limbo state.
+ recentlyFencedAndDeletedLedgers.put(ledgerId, true);
+ }
+ }
+
@Override
public void ledgerDeleted(long ledgerId) {
+ markIfConflictWritingOccurs(ledgerId);
+ // Do delete.
ledgers.remove(ledgerId);
readOnlyLedgers.remove(ledgerId);
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 21608a19b2..29632dfb17 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -452,7 +452,7 @@ public class BookieJournalTest {
} catch (Bookie.NoEntryException e) {
// correct behaviour
}
- assertTrue(b.handles.getHandle(1, "testPasswd".getBytes()).isFenced());
+ assertTrue(b.handles.getHandle(1, "testPasswd".getBytes(),
false).isFenced());
b.shutdown();
}
@@ -484,7 +484,7 @@ public class BookieJournalTest {
} catch (Bookie.NoEntryException e) {
// correct behavior
}
- assertTrue(b.handles.getHandle(1,
"testV5Journal".getBytes()).isFenced());
+ assertTrue(b.handles.getHandle(1, "testV5Journal".getBytes(),
false).isFenced());
b.shutdown();
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
index 34751525aa..77382b4ebd 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
@@ -26,10 +26,18 @@ import static org.junit.Assert.fail;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.awaitility.reflect.WhiteboxImpl;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +46,7 @@ import org.slf4j.LoggerFactory;
* This unit test tests ledger fencing.
*
*/
+@Slf4j
public class TestFencing extends BookKeeperClusterTestCase {
private static final Logger LOG =
LoggerFactory.getLogger(TestFencing.class);
@@ -77,6 +86,7 @@ public class TestFencing extends BookKeeperClusterTestCase {
fail("Should have thrown an exception when trying to write");
} catch (BKException.BKLedgerFencedException e) {
// correct behaviour
+ log.info("expected a fenced error", e);
}
/*
@@ -87,6 +97,61 @@ public class TestFencing extends BookKeeperClusterTestCase {
readlh.getLastAddConfirmed() ==
writelh.getLastAddConfirmed());
}
+ @Test
+ public void testWriteAfterDeleted() throws Exception {
+ LedgerHandle writeLedger;
+ writeLedger = bkc.createLedger(digestType, "password".getBytes());
+
+ String tmp = "BookKeeper is cool!";
+ for (int i = 0; i < 10; i++) {
+ long entryId = writeLedger.addEntry(tmp.getBytes());
+ LOG.info("entryId: {}", entryId);
+ }
+
+ // Fence and delete.
+ BookKeeperTestClient bkc2 = new BookKeeperTestClient(baseClientConf,
new TestStatsProvider());
+ LedgerHandle readLedger = bkc2.openLedger(writeLedger.getId(),
digestType, "password".getBytes());
+ bkc2.deleteLedger(readLedger.ledgerId);
+
+ // Waiting for GC.
+ for (ServerTester server : servers) {
+ triggerGC(server.getServer().getBookie());
+ }
+
+ try {
+ long entryId = writeLedger.addEntry(tmp.getBytes());
+ LOG.info("Not expected: entryId: {}", entryId);
+ LOG.error("Should have thrown an exception");
+ fail("Should have thrown an exception when trying to write");
+ } catch (BKException.BKLedgerFencedException e) {
+ log.info("expected a fenced error", e);
+ // correct behaviour
+ }
+
+ /*
+ * Check it has been recovered properly.
+ */
+ assertTrue("Has not recovered correctly: " +
readLedger.getLastAddConfirmed()
+ + " original " + writeLedger.getLastAddConfirmed(),
+ readLedger.getLastAddConfirmed() ==
writeLedger.getLastAddConfirmed());
+
+ // cleanup.
+ bkc2.close();
+ }
+
+ private void triggerGC(Bookie bookie) {
+ LedgerStorage ledgerStorage = bookie.getLedgerStorage();
+ if (ledgerStorage instanceof InterleavedLedgerStorage
+ || ledgerStorage instanceof SingleDirectoryDbLedgerStorage) {
+ Runnable gcThread = WhiteboxImpl.getInternalState(ledgerStorage,
"gcThread");
+ gcThread.run();
+ } else if (ledgerStorage instanceof SortedLedgerStorage) {
+ Object actLedgerStorage =
WhiteboxImpl.getInternalState(ledgerStorage, "interleavedLedgerStorage");
+ Runnable gcThread =
WhiteboxImpl.getInternalState(actLedgerStorage, "gcThread");
+ gcThread.run();
+ }
+ }
+
private static int threadCount = 0;
class LedgerOpenThread extends Thread {