This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 47ef48e074 [fix] Fix data loss when writing to and deleting a ledger
execute concurrently (#4462)
47ef48e074 is described below

commit 47ef48e074df42705bc0bf14bac5b3cc05c5f0c3
Author: fengyubiao <[email protected]>
AuthorDate: Fri Sep 13 09:01:55 2024 +0800

    [fix] Fix data loss when writing to and deleting a ledger execute
concurrently (#4462)
    
    ### Motivation
    
    | step | `BK client 1` | `BK client 2` |
    | --- | --- | --- |
    | 1 | create ledger `1` |
    | 2 | | open ledger `1` |
    | 3 | | delete ledger `1` |
    | 4 | write data to ledger `1` |
    
    At step `4`, the write should fail, but it succeeds. This leads users to 
assume the data has been written, even though it can never be read back.
    
    You can reproduce the issue by `testWriteAfterDeleted`
    
    Here is a scenario in which this causes Pulsar to lose messages:
    
    - `broker-2` created a ledger
    - `broker-2`'s ZK session expires, causing the topics it owned to be 
reassigned to other brokers
    - `broker-0` owned the topic again
      - it will delete the last empty ledger
    - consumers connected to `broker-0`
    - producers connected to `broker-2`
      - send messages to the topic
    - on `broker-2`, the ledger cannot be closed because its metadata 
has already been deleted
    
    ### Changes
    
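    Once a ledger has been fenced, it can no longer be written to, even after it 
has been deleted: when a fenced ledger is deleted, the bookie remembers it (for 7 
days after last access) and rejects further non-replay writes with a 
`LedgerFencedException`.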
---
 .../org/apache/bookkeeper/bookie/BookieImpl.java   | 14 ++---
 .../apache/bookkeeper/bookie/HandleFactory.java    |  2 +-
 .../bookkeeper/bookie/HandleFactoryImpl.java       | 31 ++++++++++-
 .../bookkeeper/bookie/BookieJournalTest.java       |  4 +-
 .../org/apache/bookkeeper/client/TestFencing.java  | 65 ++++++++++++++++++++++
 5 files changed, 105 insertions(+), 11 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index a69df4a176..a37d559c7c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -543,7 +543,7 @@ public class BookieImpl implements Bookie {
                             masterKeyCache.put(ledgerId, masterKey);
 
                             // Force to re-insert the master key in ledger 
storage
-                            handles.getHandle(ledgerId, masterKey);
+                            handles.getHandle(ledgerId, masterKey, true);
                         } else {
                             throw new IOException("Invalid journal. Contains 
journalKey "
                                     + " but layout version (" + journalVersion
@@ -555,7 +555,7 @@ public class BookieImpl implements Bookie {
                             if (key == null) {
                                 key = ledgerStorage.readMasterKey(ledgerId);
                             }
-                            LedgerDescriptor handle = 
handles.getHandle(ledgerId, key);
+                            LedgerDescriptor handle = 
handles.getHandle(ledgerId, key, true);
                             handle.setFenced();
                         } else {
                             throw new IOException("Invalid journal. Contains 
fenceKey "
@@ -573,7 +573,7 @@ public class BookieImpl implements Bookie {
                             if (key == null) {
                                 key = ledgerStorage.readMasterKey(ledgerId);
                             }
-                            LedgerDescriptor handle = 
handles.getHandle(ledgerId, key);
+                            LedgerDescriptor handle = 
handles.getHandle(ledgerId, key, true);
                             handle.setExplicitLac(explicitLacBuf);
                         } else {
                             throw new IOException("Invalid journal. Contains 
explicitLAC " + " but layout version ("
@@ -596,7 +596,7 @@ public class BookieImpl implements Bookie {
                         if (key == null) {
                             key = ledgerStorage.readMasterKey(ledgerId);
                         }
-                        LedgerDescriptor handle = handles.getHandle(ledgerId, 
key);
+                        LedgerDescriptor handle = handles.getHandle(ledgerId, 
key, true);
 
                         recBuff.rewind();
                         handle.addEntry(Unpooled.wrappedBuffer(recBuff));
@@ -933,7 +933,7 @@ public class BookieImpl implements Bookie {
             throws IOException, BookieException {
         final long ledgerId = entry.getLong(entry.readerIndex());
 
-        return handles.getHandle(ledgerId, masterKey);
+        return handles.getHandle(ledgerId, masterKey, false);
     }
 
     private Journal getJournal(long ledgerId) {
@@ -1042,7 +1042,7 @@ public class BookieImpl implements Bookie {
         ByteBuf explicitLACEntry = null;
         try {
             long ledgerId = entry.getLong(entry.readerIndex());
-            LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
+            LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey, 
false);
             synchronized (handle) {
                 entry.markReaderIndex();
                 handle.setExplicitLac(entry);
@@ -1131,7 +1131,7 @@ public class BookieImpl implements Bookie {
      */
     public CompletableFuture<Boolean> fenceLedger(long ledgerId, byte[] 
masterKey)
             throws IOException, BookieException {
-        LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
+        LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey, 
false);
         return handle.fenceAndLogInJournal(getJournal(ledgerId));
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactory.java
index 22500b74cb..c81294d4db 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactory.java
@@ -24,7 +24,7 @@ package org.apache.bookkeeper.bookie;
 import java.io.IOException;
 
 interface HandleFactory {
-    LedgerDescriptor getHandle(long ledgerId, byte[] masterKey)
+    LedgerDescriptor getHandle(long ledgerId, byte[] masterKey, boolean 
journalReplay)
             throws IOException, BookieException;
 
     LedgerDescriptor getReadOnlyHandle(long ledgerId)
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
index 3f643019c9..b331f12506 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java
@@ -21,7 +21,10 @@
 
 package org.apache.bookkeeper.bookie;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import java.io.IOException;
+import java.time.Duration;
 import org.apache.bookkeeper.bookie.LedgerStorage.LedgerDeletionListener;
 import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
 
@@ -29,6 +32,14 @@ class HandleFactoryImpl implements HandleFactory, 
LedgerDeletionListener {
     private final ConcurrentLongHashMap<LedgerDescriptor> ledgers;
     private final ConcurrentLongHashMap<LedgerDescriptor> readOnlyLedgers;
 
+    /**
+     * Once the ledger was marked "fenced" before, the ledger was accessed by 
multi clients. One client is calling
+     * "delete" now, and other clients may call "write" continuously later. We 
mark these ledgers can not be written
+     * anymore. And maintains the state for 7 days is safety.
+     */
+    private final Cache<Long, Boolean> recentlyFencedAndDeletedLedgers = 
CacheBuilder.newBuilder()
+            .expireAfterAccess(Duration.ofDays(7)).build();
+
     final LedgerStorage ledgerStorage;
 
     HandleFactoryImpl(LedgerStorage ledgerStorage) {
@@ -40,10 +51,14 @@ class HandleFactoryImpl implements HandleFactory, 
LedgerDeletionListener {
     }
 
     @Override
-    public LedgerDescriptor getHandle(final long ledgerId, final byte[] 
masterKey) throws IOException, BookieException {
+    public LedgerDescriptor getHandle(final long ledgerId, final byte[] 
masterKey, boolean journalReplay)
+            throws IOException, BookieException {
         LedgerDescriptor handle = ledgers.get(ledgerId);
 
         if (handle == null) {
+            if (!journalReplay && 
recentlyFencedAndDeletedLedgers.getIfPresent(ledgerId) != null) {
+                throw 
BookieException.create(BookieException.Code.LedgerFencedException);
+            }
             handle = LedgerDescriptor.create(masterKey, ledgerId, 
ledgerStorage);
             ledgers.putIfAbsent(ledgerId, handle);
         }
@@ -64,8 +79,22 @@ class HandleFactoryImpl implements HandleFactory, 
LedgerDeletionListener {
         return handle;
     }
 
+    private void markIfConflictWritingOccurs(long ledgerId) {
+        LedgerDescriptor ledgerDescriptor = ledgers.get(ledgerId);
+        try {
+            if (ledgerDescriptor != null && ledgerDescriptor.isFenced()) {
+                recentlyFencedAndDeletedLedgers.put(ledgerId, true);
+            }
+        } catch (IOException | BookieException ex) {
+            // The ledger is in limbo state.
+            recentlyFencedAndDeletedLedgers.put(ledgerId, true);
+        }
+    }
+
     @Override
     public void ledgerDeleted(long ledgerId) {
+        markIfConflictWritingOccurs(ledgerId);
+        // Do delete.
         ledgers.remove(ledgerId);
         readOnlyLedgers.remove(ledgerId);
     }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 21608a19b2..29632dfb17 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -452,7 +452,7 @@ public class BookieJournalTest {
         } catch (Bookie.NoEntryException e) {
             // correct behaviour
         }
-        assertTrue(b.handles.getHandle(1, "testPasswd".getBytes()).isFenced());
+        assertTrue(b.handles.getHandle(1, "testPasswd".getBytes(), 
false).isFenced());
 
         b.shutdown();
     }
@@ -484,7 +484,7 @@ public class BookieJournalTest {
         } catch (Bookie.NoEntryException e) {
             // correct behavior
         }
-        assertTrue(b.handles.getHandle(1, 
"testV5Journal".getBytes()).isFenced());
+        assertTrue(b.handles.getHandle(1, "testV5Journal".getBytes(), 
false).isFenced());
 
         b.shutdown();
     }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
index 34751525aa..77382b4ebd 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
@@ -26,10 +26,18 @@ import static org.junit.Assert.fail;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +46,7 @@ import org.slf4j.LoggerFactory;
  * This unit test tests ledger fencing.
  *
  */
+@Slf4j
 public class TestFencing extends BookKeeperClusterTestCase {
     private static final Logger LOG = 
LoggerFactory.getLogger(TestFencing.class);
 
@@ -77,6 +86,7 @@ public class TestFencing extends BookKeeperClusterTestCase {
             fail("Should have thrown an exception when trying to write");
         } catch (BKException.BKLedgerFencedException e) {
             // correct behaviour
+            log.info("expected a fenced error", e);
         }
 
         /*
@@ -87,6 +97,61 @@ public class TestFencing extends BookKeeperClusterTestCase {
                    readlh.getLastAddConfirmed() == 
writelh.getLastAddConfirmed());
     }
 
+    @Test
+    public void testWriteAfterDeleted() throws Exception {
+        LedgerHandle writeLedger;
+        writeLedger = bkc.createLedger(digestType, "password".getBytes());
+
+        String tmp = "BookKeeper is cool!";
+        for (int i = 0; i < 10; i++) {
+            long entryId = writeLedger.addEntry(tmp.getBytes());
+            LOG.info("entryId: {}", entryId);
+        }
+
+        // Fence and delete.
+        BookKeeperTestClient bkc2 = new BookKeeperTestClient(baseClientConf, 
new TestStatsProvider());
+        LedgerHandle readLedger = bkc2.openLedger(writeLedger.getId(), 
digestType, "password".getBytes());
+        bkc2.deleteLedger(readLedger.ledgerId);
+
+        // Waiting for GC.
+        for (ServerTester server : servers) {
+            triggerGC(server.getServer().getBookie());
+        }
+
+        try {
+            long entryId = writeLedger.addEntry(tmp.getBytes());
+            LOG.info("Not expected: entryId: {}", entryId);
+            LOG.error("Should have thrown an exception");
+            fail("Should have thrown an exception when trying to write");
+        } catch (BKException.BKLedgerFencedException e) {
+            log.info("expected a fenced error", e);
+            // correct behaviour
+        }
+
+        /*
+         * Check it has been recovered properly.
+         */
+        assertTrue("Has not recovered correctly: " + 
readLedger.getLastAddConfirmed()
+                   + " original " + writeLedger.getLastAddConfirmed(),
+                   readLedger.getLastAddConfirmed() == 
writeLedger.getLastAddConfirmed());
+
+        // cleanup.
+        bkc2.close();
+    }
+
+    private void triggerGC(Bookie bookie) {
+        LedgerStorage ledgerStorage = bookie.getLedgerStorage();
+        if (ledgerStorage instanceof InterleavedLedgerStorage
+                || ledgerStorage instanceof SingleDirectoryDbLedgerStorage) {
+            Runnable gcThread = WhiteboxImpl.getInternalState(ledgerStorage, 
"gcThread");
+            gcThread.run();
+        } else if (ledgerStorage instanceof SortedLedgerStorage) {
+            Object actLedgerStorage = 
WhiteboxImpl.getInternalState(ledgerStorage, "interleavedLedgerStorage");
+            Runnable gcThread = 
WhiteboxImpl.getInternalState(actLedgerStorage, "gcThread");
+            gcThread.run();
+        }
+    }
+
     private static int threadCount = 0;
 
     class LedgerOpenThread extends Thread {

Reply via email to