athanatos closed pull request #1768: ISSUE-1757: prevent race between flush and 
delete from recreating index
URL: https://github.com/apache/bookkeeper/pull/1768
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it for a merged fork PR):

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index a5ddacf0ff..c38bb3795d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -101,6 +101,8 @@
     // this FileInfo Header Version
     int headerVersion;
 
+    private boolean deleted;
+
     public FileInfo(File lf, byte[] masterKey, int fileInfoVersionToWrite) 
throws IOException {
         super(WATCHER_RECYCLER);
 
@@ -108,6 +110,7 @@ public FileInfo(File lf, byte[] masterKey, int 
fileInfoVersionToWrite) throws IO
         this.masterKey = masterKey;
         mode = "rw";
         this.headerVersion = fileInfoVersionToWrite;
+        this.deleted = false;
     }
 
     synchronized Long getLastAddConfirmed() {
@@ -257,6 +260,16 @@ public synchronized void readHeader() throws IOException {
         }
     }
 
+    public synchronized boolean isDeleted() {
+        return deleted;
+    }
+
+    public class FileInfoDeleted extends IOException {
+        FileInfoDeleted() {
+            super("FileInfo already deleted");
+        }
+    }
+
     @VisibleForTesting
     void checkOpen(boolean create) throws IOException {
         checkOpen(create, false);
@@ -264,6 +277,9 @@ void checkOpen(boolean create) throws IOException {
 
     private synchronized void checkOpen(boolean create, boolean 
openBeforeClose)
             throws IOException {
+        if (deleted) {
+            throw new FileInfoDeleted();
+        }
         if (fc != null) {
             return;
         }
@@ -540,6 +556,7 @@ public synchronized void moveToNewLocation(File newFile, 
long size) throws IOExc
     }
 
     public synchronized boolean delete() {
+        deleted = true;
         return lf.delete();
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
index 6beba6a744..078292fb81 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
@@ -49,6 +49,8 @@ private static CachedFileInfo 
tryRetainFileInfo(CachedFileInfo fi) throws IOExce
         boolean retained = fi.tryRetain();
         if (!retained) {
             throw new IOException("FileInfo " + fi + " is already marked 
dead");
+        } else if (fi.isDeleted()) {
+            throw new Bookie.NoLedgerException(fi.ledgerId);
         }
         return fi;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
index 66e97f7942..0cf5cc93f8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
@@ -40,7 +40,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -345,12 +344,6 @@ public void onSetDirty(LedgerEntryPage lep) {
     // flush and read pages
     private final IndexPersistenceMgr indexPersistenceManager;
 
-    /**
-     * the list of potentially dirty ledgers.
-     */
-    private final ConcurrentLinkedQueue<Long> ledgersToFlush = new 
ConcurrentLinkedQueue<Long>();
-    private final ConcurrentSkipListSet<Long> ledgersFlushing = new 
ConcurrentSkipListSet<Long>();
-
     // Stats
     private final Counter ledgerCacheHitCounter;
     private final Counter ledgerCacheMissCounter;
@@ -504,7 +497,6 @@ private LedgerEntryPage grabLedgerEntryPage(long ledger, 
long pageEntry) throws
 
     void removePagesForLedger(long ledgerId) {
         pageMapAndList.removeEntriesForALedger(ledgerId);
-        ledgersToFlush.remove(ledgerId);
     }
 
     long getLastEntryInMem(long ledgerId) {
@@ -542,18 +534,12 @@ private LedgerEntryPage grabCleanPage(long ledger, long 
entry) throws IOExceptio
     }
 
     void flushOneOrMoreLedgers(boolean doAll) throws IOException {
-        if (ledgersToFlush.isEmpty()) {
-            ledgersToFlush.addAll(pageMapAndList.getActiveLedgers());
-        }
-        Long potentiallyDirtyLedger;
-        while (null != (potentiallyDirtyLedger = ledgersToFlush.poll())) {
-            if (!ledgersFlushing.add(potentiallyDirtyLedger)) {
-                continue;
-            }
+        List<Long> ledgersToFlush = new 
ArrayList<>(pageMapAndList.getActiveLedgers());
+        for (Long potentiallyDirtyLedger : ledgersToFlush) {
             try {
                 flushSpecificLedger(potentiallyDirtyLedger);
-            } finally {
-                ledgersFlushing.remove(potentiallyDirtyLedger);
+            } catch (Bookie.NoLedgerException e) {
+                continue;
             }
             if (!doAll) {
                 break;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 83cb88fcc3..07a7b3b6ec 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -508,6 +508,9 @@ private void relocateIndexFileAndFlushHeader(long ledger, 
FileInfo fi) throws IO
                 if 
(!ledgerDirsManager.isDirWritableForNewIndexFile(currentDir)) {
                     throw nwe;
                 }
+            } catch (FileInfo.FileInfoDeleted fileInfoDeleted) {
+                // File concurrently deleted
+                throw new Bookie.NoLedgerException(ledger);
             }
         }
         fi.flushHeader();
@@ -523,9 +526,19 @@ private File getLedgerDirForLedger(FileInfo fi) {
         return fi.getLf().getParentFile().getParentFile().getParentFile();
     }
 
-    private void moveLedgerIndexFile(Long l, FileInfo fi) throws 
NoWritableLedgerDirException, IOException {
-        File newLedgerIndexFile = getNewLedgerIndexFile(l, 
getLedgerDirForLedger(fi));
-        fi.moveToNewLocation(newLedgerIndexFile, fi.getSizeSinceLastwrite());
+    private void moveLedgerIndexFile(Long l, FileInfo fi)
+            throws NoWritableLedgerDirException, IOException, 
FileInfo.FileInfoDeleted {
+        File newLedgerIndexFile = null;
+        boolean success = false;
+        try {
+            newLedgerIndexFile = getNewLedgerIndexFile(l, 
getLedgerDirForLedger(fi));
+            fi.moveToNewLocation(newLedgerIndexFile, 
fi.getSizeSinceLastwrite());
+            success = true;
+        } finally {
+            if (!success && newLedgerIndexFile != null) {
+                newLedgerIndexFile.delete();
+            }
+        }
     }
 
     void flushLedgerHeader(long ledger) throws IOException {
@@ -599,7 +612,7 @@ public int compare(LedgerEntryPage o1, LedgerEntryPage o2) {
 
     private void writeBuffers(Long ledger,
                               List<LedgerEntryPage> entries, FileInfo fi,
-                              int start, int count) throws IOException {
+                              int start, int count) throws IOException, 
Bookie.NoLedgerException {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Writing {} buffers of {}", count, 
Long.toHexString(ledger));
         }
@@ -616,7 +629,12 @@ private void writeBuffers(Long ledger,
         }
         long totalWritten = 0;
         while (buffs[buffs.length - 1].remaining() > 0) {
-            long rc = fi.write(buffs, entries.get(start + 
0).getFirstEntryPosition());
+            long rc = 0;
+            try {
+                rc = fi.write(buffs, entries.get(start + 
0).getFirstEntryPosition());
+            } catch (FileInfo.FileInfoDeleted e) {
+                throw new Bookie.NoLedgerException(ledger);
+            }
             if (rc <= 0) {
                 throw new IOException("Short write to ledger " + ledger + " rc 
= " + rc);
             }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
index a9cef72306..be8755959c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
@@ -207,7 +207,8 @@ public void readPage(FileInfo fi) throws IOException {
     public ByteBuffer getPageToWrite() {
         checkPage();
         page.clear();
-        return page;
+        // Different callers to this method should be able to reasonably 
expect independent read pointers
+        return page.duplicate();
     }
 
     long getLedger() {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 1a01299a6c..3ef607c596 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -379,24 +379,36 @@ public void testSyncThreadNPE() throws IOException {
     }
 
     /**
-     * Race where a flush would fail because a garbage collection occurred at
-     * the wrong time.
+     * Test for race between delete and flush.
      * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-604}
+     * {@link https://github.com/apache/bookkeeper/issues/1757}
      */
     @Test
     public void testFlushDeleteRace() throws Exception {
         newLedgerCache();
         final AtomicInteger rc = new AtomicInteger(0);
-        final LinkedBlockingQueue<Long> ledgerQ = new 
LinkedBlockingQueue<Long>(1);
+        final LinkedBlockingQueue<Long> ledgerQ = new 
LinkedBlockingQueue<>(100);
         final byte[] masterKey = "masterKey".getBytes();
+        final long numLedgers = 1000;
+        final int numFlushers = 10;
+        final int numDeleters = 10;
+        final AtomicBoolean running = new AtomicBoolean(true);
         Thread newLedgerThread = new Thread() {
                 public void run() {
                     try {
-                        for (int i = 0; i < 1000 && rc.get() == 0; i++) {
+                        for (long i = 0; i < numLedgers && rc.get() == 0; i++) 
{
                             ledgerCache.setMasterKey(i, masterKey);
-                            ledgerQ.put((long) i);
+
+                            ledgerCache.putEntryOffset(i, 1, 0);
+                            //ledgerCache.putEntryOffset(i, 1024, 0);
+                            //ledgerCache.putEntryOffset(i, 4096, 0);
+                            //ledgerCache.putEntryOffset(i, 8192, 0);
+                            ledgerQ.put(i);
+                        }
+                        for (int i = 0; i < numDeleters; ++i) {
+                            ledgerQ.put(-1L);
                         }
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         rc.set(-1);
                         LOG.error("Exception in new ledger thread", e);
                     }
@@ -404,51 +416,73 @@ public void run() {
             };
         newLedgerThread.start();
 
-        Thread flushThread = new Thread() {
+        Thread[] flushThreads = new Thread[numFlushers];
+        for (int i = 0; i < numFlushers; ++i) {
+            Thread flushThread = new Thread() {
                 public void run() {
                     try {
-                        while (true) {
-                            Long id = ledgerQ.peek();
-                            if (id == null) {
-                                continue;
-                            }
-                            LOG.info("Put entry for {}", id);
-                            try {
-                                ledgerCache.putEntryOffset((long) id, 1, 0);
-                            } catch (Bookie.NoLedgerException nle) {
-                                //ignore
-                            }
+                        while (running.get()) {
                             ledgerCache.flushLedger(true);
                         }
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         rc.set(-1);
                         LOG.error("Exception in flush thread", e);
                     }
+                    LOG.error("Shutting down flush thread");
                 }
             };
-        flushThread.start();
+            flushThread.start();
+            flushThreads[i] = flushThread;
+        }
 
-        Thread deleteThread = new Thread() {
+        Thread[] deleteThreads = new Thread[numDeleters];
+        for (int i = 0; i < numDeleters; ++i) {
+            Thread deleteThread = new Thread() {
                 public void run() {
                     try {
                         while (true) {
                             long id = ledgerQ.take();
+                            if (id == -1L) {
+                                break;
+                            }
                             LOG.info("Deleting {}", id);
                             ledgerCache.deleteLedger(id);
                         }
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         rc.set(-1);
                         LOG.error("Exception in delete thread", e);
                     }
                 }
             };
-        deleteThread.start();
+            deleteThread.start();
+            deleteThreads[i] = deleteThread;
+        }
 
         newLedgerThread.join();
-        assertEquals("Should have been no errors", rc.get(), 0);
 
-        deleteThread.interrupt();
-        flushThread.interrupt();
+        for (Thread deleteThread : deleteThreads) {
+            deleteThread.join();
+        }
+
+        running.set(false);
+        for (Thread flushThread : flushThreads) {
+            flushThread.join();
+        }
+
+        assertEquals("Should have been no errors", rc.get(), 0);
+        for (long i = 0L; i < numLedgers; ++i) {
+            boolean gotError = false;
+            try {
+                LOG.error("Checking {}", i);
+                ledgerCache.getEntryOffset(i, 0);
+            } catch (NoLedgerException e) {
+                gotError = true;
+            }
+            if (!gotError) {
+                LOG.error("Ledger {} is still around", i);
+                assertTrue(gotError);
+            }
+        }
     }
 
     // Mock SortedLedgerStorage to simulate flush failure (Dependency Fault 
Injection)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to