sijie closed pull request #1280: Improve write rejection in DbLedgerStorage 
URL: https://github.com/apache/bookkeeper/pull/1280
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
index 99ae39e76..b7428b282 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
@@ -82,6 +82,7 @@ public static BookieException create(int code) {
         int CookieNotFoundException = -105;
         int MetadataStoreException = -106;
         int UnknownBookieIdException = -107;
+        int OperationRejectedException = -108;
     }
 
     public void setCode(int code) {
@@ -122,6 +123,9 @@ public String getMessage(int code) {
         case Code.UnknownBookieIdException:
             err = "Unknown bookie id";
             break;
+        case Code.OperationRejectedException:
+            err = "Operation rejected";
+            break;
         default:
             err = "Invalid operation";
             break;
@@ -174,6 +178,22 @@ public LedgerFencedException() {
         }
     }
 
+    /**
+     * Signals that a write operation was rejected by the bookie, e.g. because the ledger storage is unable to
+     * keep up with the write rate.
+     */
+    public static class OperationRejectedException extends BookieException {
+        public OperationRejectedException() {
+            super(Code.OperationRejectedException);
+        }
+
+        @Override
+        public Throwable fillInStackTrace() {
+            // Since this exception is a way to signal a specific condition and it's triggered at very specific
+            // points, we can disable stack traces.
+            return null;
+        }
+    }
+
     /**
      * Signal that an invalid cookie is found when starting a bookie.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index f4db7a689..23840be27 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -73,7 +73,7 @@ static ByteBuf createLedgerFenceEntry(Long ledgerId) {
      */
     abstract SettableFuture<Boolean> fenceAndLogInJournal(Journal journal) throws IOException;
 
-    abstract long addEntry(ByteBuf entry) throws IOException;
+    abstract long addEntry(ByteBuf entry) throws IOException, BookieException;
     abstract ByteBuf readEntry(long entryId) throws IOException;
 
     abstract long getLastAddConfirmed() throws IOException;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index 84730cb76..a0f34eab6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -138,7 +138,7 @@ ByteBuf getExplicitLac() {
     }
 
     @Override
-    long addEntry(ByteBuf entry) throws IOException {
+    long addEntry(ByteBuf entry) throws IOException, BookieException {
         long ledgerId = entry.getLong(entry.readerIndex());
 
         if (ledgerId != this.ledgerId) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 83ac2c0ae..dcfac3147 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -100,7 +100,7 @@ void initialize(ServerConfiguration conf,
      *
      * @return the entry id of the entry added
      */
-    long addEntry(ByteBuf entry) throws IOException;
+    long addEntry(ByteBuf entry) throws IOException, BookieException;
 
     /**
      * Read an entry from storage.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index ff9cd316a..55b094587 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -42,13 +42,13 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.StampedLock;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.Checkpointer;
@@ -67,6 +67,7 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -78,8 +79,7 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of LedgerStorage that uses RocksDB to keep the indexes for
- * entries stored in EntryLogs.
+ * Implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in EntryLogs.
  */
 public class DbLedgerStorage implements CompactableLedgerStorage {
 
@@ -90,8 +90,7 @@
      *
      * <p>This class is used for holding all the transient states for a given ledger.
      */
-    private static class TransientLedgerInfo
-            extends Watchable<LastAddConfirmedUpdateNotification>
+    private static class TransientLedgerInfo extends Watchable<LastAddConfirmedUpdateNotification>
             implements AutoCloseable {
 
         // lac
@@ -139,8 +138,7 @@ long setLastAddConfirmed(long lac) {
         }
 
         synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
-                                                           Watcher<LastAddConfirmedUpdateNotification> watcher)
-                throws IOException {
+                Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
             lastAccessed = System.currentTimeMillis();
             if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
                 return false;
@@ -155,7 +153,7 @@ public ByteBuf getExplicitLac() {
             synchronized (this) {
                 if (explicitLac != null) {
                     retLac = Unpooled.buffer(explicitLac.capacity());
-                    explicitLac.rewind(); //copy from the beginning
+                    explicitLac.rewind(); // copy from the beginning
                     retLac.writeBytes(explicitLac);
                     explicitLac.rewind();
                     return retLac;
@@ -194,13 +192,13 @@ void notifyWatchers(long lastAddConfirmed) {
 
         @Override
         public void close() {
-           synchronized (this) {
-               if (isClosed) {
-                   return;
-               }
-               isClosed = true;
-           }
-           // notify watchers
+            synchronized (this) {
+                if (isClosed) {
+                    return;
+                }
+                isClosed = true;
+            }
+            // notify watchers
             notifyWatchers(Long.MAX_VALUE);
         }
 
@@ -217,16 +215,15 @@ public void close() {
     private GarbageCollectorThread gcThread;
 
     // Write cache where all new entries are inserted into
-    protected WriteCache writeCache;
+    protected volatile WriteCache writeCache;
 
     // Write cache that is used to swap with writeCache during flushes
-    protected WriteCache writeCacheBeingFlushed;
+    protected volatile WriteCache writeCacheBeingFlushed;
 
     // Cache where we insert entries for speculative reading
     private ReadCache readCache;
 
-    private final ReentrantReadWriteLock writeCacheMutex = new ReentrantReadWriteLock();
-    private final Condition flushWriteCacheCondition = writeCacheMutex.writeLock().newCondition();
+    private final StampedLock writeCacheRotationLock = new StampedLock();
 
     protected final ReentrantLock flushMutex = new ReentrantLock();
 
@@ -243,10 +240,14 @@ public void close() {
     static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
     static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
 
+    static final String MAX_THROTTLE_TIME_MILLIS = "dbStorage_maxThrottleTimeMs";
+
     private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
     private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
     private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
 
+    private static final long DEFAUL_MAX_THROTTLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10);
+
     private static final int MB = 1024 * 1024;
 
     private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners = Lists
@@ -260,6 +261,8 @@ public void close() {
     private long readCacheMaxSize;
     private int readAheadCacheBatchSize;
 
+    private long maxThrottleTimeNanos;
+
     private StatsLogger stats;
 
     private OpStatsLogger addEntryStats;
@@ -271,10 +274,13 @@ public void close() {
     private OpStatsLogger flushStats;
     private OpStatsLogger flushSizeStats;
 
+    private Counter throttledWriteRequests;
+    private Counter rejectedWriteRequests;
+
     @Override
     public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
-        LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
-                           Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
+            LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
+            Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
         checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
                 "Db implementation only allows for one storage dir");
 
@@ -290,6 +296,9 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
         readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
         readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
 
+        long maxThrottleTimeMillis = conf.getLong(MAX_THROTTLE_TIME_MILLIS, DEFAUL_MAX_THROTTLE_TIME_MILLIS);
+        maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
+
         readCache = new ReadCache(readCacheMaxSize);
 
         this.stats = statsLogger;
@@ -381,6 +390,9 @@ public Long getSample() {
         readAheadBatchSizeStats = stats.getOpStatsLogger("readahead-batch-size");
         flushStats = stats.getOpStatsLogger("flush");
         flushSizeStats = stats.getOpStatsLogger("flush-size");
+
+        throttledWriteRequests = stats.getCounter("throttled-write-requests");
+        rejectedWriteRequests = stats.getCounter("rejected-write-requests");
     }
 
     @Override
@@ -467,7 +479,7 @@ public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
     }
 
     @Override
-    public long addEntry(ByteBuf entry) throws IOException {
+    public long addEntry(ByteBuf entry) throws IOException, BookieException {
         long startTime = MathUtils.nowInNano();
 
         long ledgerId = entry.getLong(entry.readerIndex());
@@ -478,13 +490,23 @@ public long addEntry(ByteBuf entry) throws IOException {
             log.debug("Add entry. {}@{}, lac = {}", ledgerId, entryId, lac);
         }
 
-        // Waits if the write cache is being switched for a flush
-        writeCacheMutex.readLock().lock();
-        boolean inserted;
-        try {
-            inserted = writeCache.put(ledgerId, entryId, entry);
-        } finally {
-            writeCacheMutex.readLock().unlock();
+        // First we try to use optimistic locking to get access to the current write cache.
+        // This is based on the fact that the write cache is only being rotated (swapped) every 1 minute. During the
+        // rest of the time, we can have multiple threads using the optimistic lock here without interfering.
+        long stamp = writeCacheRotationLock.tryOptimisticRead();
+        boolean inserted = false;
+
+        inserted = writeCache.put(ledgerId, entryId, entry);
+        if (!writeCacheRotationLock.validate(stamp)) {
+            // The write cache was rotated while we were inserting. We need to acquire the proper read lock and
+            // repeat the operation, because we might have inserted in a write cache that was already being flushed
+            // and cleared, without being sure about this last entry being flushed or not.
+            stamp = writeCacheRotationLock.readLock();
+            try {
+                inserted = writeCache.put(ledgerId, entryId, entry);
+            } finally {
+                writeCacheRotationLock.unlockRead(stamp);
+            }
         }
 
         if (!inserted) {
@@ -498,46 +520,49 @@ public long addEntry(ByteBuf entry) throws IOException {
         return entryId;
     }
 
-    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) throws IOException {
+    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
+            throws IOException, BookieException {
         // Write cache is full, we need to trigger a flush so that it gets rotated
-        writeCacheMutex.writeLock().lock();
+        // If the flush has already been triggered or flush has already switched the
+        // cache, we don't need to trigger another flush
+        if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
+            // Trigger an early flush in background
+            log.info("Write cache is full, triggering flush");
+            executor.execute(() -> {
+                try {
+                    flush();
+                } catch (IOException e) {
+                    log.error("Error during flush", e);
+                }
+            });
+        }
 
-        try {
-            // If the flush has already been triggered or flush has already switched the
-            // cache, we don't need to
-            // trigger another flush
-            if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
-                // Trigger an early flush in background
-                log.info("Write cache is full, triggering flush");
-                executor.execute(() -> {
-                    try {
-                        flush();
-                    } catch (IOException e) {
-                        log.error("Error during flush", e);
-                    }
-                });
-            }
+        throttledWriteRequests.inc();
+        long absoluteTimeoutNanos = System.nanoTime() + maxThrottleTimeNanos;
 
-            long timeoutNs = TimeUnit.MILLISECONDS.toNanos(100);
-            while (hasFlushBeenTriggered.get()) {
-                if (timeoutNs <= 0L) {
-                    throw new IOException("Write cache was not trigger within the timeout, cannot add entry " + ledgerId
-                            + "@" + entryId);
+        while (System.nanoTime() < absoluteTimeoutNanos) {
+            long stamp = writeCacheRotationLock.readLock();
+            try {
+                if (writeCache.put(ledgerId, entryId, entry)) {
+                    // We succeeded in putting the entry in the write cache
+                    return;
                 }
-                timeoutNs = flushWriteCacheCondition.awaitNanos(timeoutNs);
+            } finally {
+                writeCacheRotationLock.unlockRead(stamp);
             }
 
-            if (!writeCache.put(ledgerId, entryId, entry)) {
-                // Still wasn't able to cache entry
-                throw new IOException("Error while inserting entry in write cache" + ledgerId + "@" + entryId);
+            // Wait some time and try again
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
             }
-
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
-        } finally {
-            writeCacheMutex.writeLock().unlock();
         }
+
+        // Timeout expired and we weren't able to insert in write cache
+        rejectedWriteRequests.inc();
+        throw new OperationRejectedException();
     }
 
     @Override
@@ -551,29 +576,41 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
             return getLastEntry(ledgerId);
         }
 
-        writeCacheMutex.readLock().lock();
-        try {
-            // First try to read from the write cache of recent entries
-            ByteBuf entry = writeCache.get(ledgerId, entryId);
-            if (entry != null) {
-                recordSuccessfulEvent(readCacheHitStats, startTime);
-                recordSuccessfulEvent(readEntryStats, startTime);
-                return entry;
+        // We need to try to read from both write caches, since recent entries could be found in either of the two. The
+        // write caches are already thread safe on their own; here we just need to make sure we get references to both
+        // of them. Using an optimistic lock since the read lock is always free, unless we're swapping the caches.
+        long stamp = writeCacheRotationLock.tryOptimisticRead();
+        WriteCache localWriteCache = writeCache;
+        WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
+        if (!writeCacheRotationLock.validate(stamp)) {
+            // Fallback to regular read lock approach
+            stamp = writeCacheRotationLock.readLock();
+            try {
+                localWriteCache = writeCache;
+                localWriteCacheBeingFlushed = writeCacheBeingFlushed;
+            } finally {
+                writeCacheRotationLock.unlockRead(stamp);
             }
+        }
 
-            // If there's a flush going on, the entry might be in the flush buffer
-            entry = writeCacheBeingFlushed.get(ledgerId, entryId);
-            if (entry != null) {
-                recordSuccessfulEvent(readCacheHitStats, startTime);
-                recordSuccessfulEvent(readEntryStats, startTime);
-                return entry;
-            }
-        } finally {
-            writeCacheMutex.readLock().unlock();
+        // First try to read from the write cache of recent entries
+        ByteBuf entry = localWriteCache.get(ledgerId, entryId);
+        if (entry != null) {
+            recordSuccessfulEvent(readCacheHitStats, startTime);
+            recordSuccessfulEvent(readEntryStats, startTime);
+            return entry;
+        }
+
+        // If there's a flush going on, the entry might be in the flush buffer
+        entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
+        if (entry != null) {
+            recordSuccessfulEvent(readCacheHitStats, startTime);
+            recordSuccessfulEvent(readEntryStats, startTime);
+            return entry;
         }
 
         // Try reading from read-ahead cache
-        ByteBuf entry = readCache.get(ledgerId, entryId);
+        entry = readCache.get(ledgerId, entryId);
         if (entry != null) {
             recordSuccessfulEvent(readCacheHitStats, startTime);
             recordSuccessfulEvent(readEntryStats, startTime);
@@ -650,7 +687,7 @@ private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi
     public ByteBuf getLastEntry(long ledgerId) throws IOException {
         long startTime = MathUtils.nowInNano();
 
-        writeCacheMutex.readLock().lock();
+        long stamp = writeCacheRotationLock.readLock();
         try {
             // First try to read from the write cache of recent entries
             ByteBuf entry = writeCache.getLastEntry(ledgerId);
@@ -687,7 +724,7 @@ public ByteBuf getLastEntry(long ledgerId) throws IOException {
                 return entry;
             }
         } finally {
-            writeCacheMutex.readLock().unlock();
+            writeCacheRotationLock.unlockRead(stamp);
         }
 
         // Search the last entry in storage
@@ -706,11 +743,11 @@ public ByteBuf getLastEntry(long ledgerId) throws IOException {
 
     @VisibleForTesting
     boolean isFlushRequired() {
-        writeCacheMutex.readLock().lock();
+        long stamp = writeCacheRotationLock.readLock();
         try {
             return !writeCache.isEmpty();
         } finally {
-            writeCacheMutex.readLock().unlock();
+            writeCacheRotationLock.unlockRead(stamp);
         }
     }
 
@@ -810,7 +847,7 @@ public void checkpoint(Checkpoint checkpoint) throws IOException {
      * Swap the current write cache with the replacement cache.
      */
     private void swapWriteCache() {
-        writeCacheMutex.writeLock().lock();
+        long stamp = writeCacheRotationLock.writeLock();
         try {
             // First, swap the current write-cache map with an empty one so that writes will
             // go on unaffected. Only a single flush is happening at the same time
@@ -820,12 +857,11 @@ private void swapWriteCache() {
 
             // since the cache is switched, we can allow flush to be triggered
             hasFlushBeenTriggered.set(false);
-            flushWriteCacheCondition.signalAll();
         } finally {
             try {
                 isFlushOngoing.set(true);
             } finally {
-                writeCacheMutex.writeLock().unlock();
+                writeCacheRotationLock.unlockWrite(stamp);
             }
         }
     }
@@ -844,11 +880,11 @@ public void deleteLedger(long ledgerId) throws IOException {
         }
 
         // Delete entries from this ledger that are still in the write cache
-        writeCacheMutex.readLock().lock();
+        long stamp = writeCacheRotationLock.readLock();
         try {
             writeCache.deleteLedger(ledgerId);
         } finally {
-            writeCacheMutex.readLock().unlock();
+            writeCacheRotationLock.unlockRead(stamp);
         }
 
         entryLocationIndex.delete(ledgerId);
@@ -944,7 +980,6 @@ private void updateCachedLacIfNeeded(long ledgerId, long lac) {
         }
     }
 
-
     @Override
     public void flushEntriesLocationsIndex() throws IOException {
         // No-op. Location index is already flushed in updateEntriesLocations() call
@@ -953,8 +988,7 @@ public void flushEntriesLocationsIndex() throws IOException {
     /**
      * Add an already existing ledger to the index.
      *
-     * <p>This method is only used as a tool to help the migration from
-     * InterleaveLedgerStorage to DbLedgerStorage
+     * <p>This method is only used as a tool to help the migration from InterleaveLedgerStorage to DbLedgerStorage
      *
      * @param ledgerId
      *            the ledger id
@@ -988,6 +1022,7 @@ public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
 
         return numberOfEntries.get();
     }
+
     @Override
     public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
         ledgerDeletionListeners.add(listener);
@@ -1020,7 +1055,7 @@ public static void readLedgerIndexEntries(long ledgerId, ServerConfiguration ser
         checkNotNull(processor, "LedgerLoggger info processor can't null");
 
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs(),
-            new DiskChecker(serverConf.getDiskUsageThreshold(), serverConf.getDiskUsageWarnThreshold()));
+                new DiskChecker(serverConf.getDiskUsageThreshold(), serverConf.getDiskUsageWarnThreshold()));
         String ledgerBasePath = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
 
         EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 993700092..eaf2473a2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -20,12 +20,12 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -76,6 +76,13 @@ protected void processPacket() {
             } else {
                 requestProcessor.bookie.addEntry(addData, false, this, channel, request.getMasterKey());
             }
+        } catch (OperationRejectedException e) {
+            // Avoid logging each occurrence of this exception, as this can happen when the ledger storage is
+            // unable to keep up with the write rate.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Operation rejected while writing {}", request, e);
+            }
+            rc = BookieProtocol.EIO;
         } catch (IOException e) {
             LOG.error("Error writing {}", request, e);
             rc = BookieProtocol.EIO;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 7bd78d3f4..0854bf538 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
@@ -120,6 +121,13 @@ public void writeComplete(int rc, long ledgerId, long entryId,
                 requestProcessor.bookie.addEntry(entryToAdd, ackBeforeSync, wcb, channel, masterKey);
             }
             status = StatusCode.EOK;
+        } catch (OperationRejectedException e) {
+            // Avoid logging each occurrence of this exception, as this can happen when the ledger storage is
+            // unable to keep up with the write rate.
+            if (logger.isDebugEnabled()) {
+                logger.debug("Operation rejected while writing {}", request, e);
+            }
+            status = StatusCode.EIO;
         } catch (IOException e) {
             logger.error("Error writing entry:{} to ledger:{}",
                     entryId, ledgerId, e);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index d4a98cee0..cc02e4e8c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -413,7 +413,7 @@ public void testGetEntryLogsSet() throws Exception {
         }
 
         @Override
-        public Boolean call() throws IOException {
+        public Boolean call() throws IOException, BookieException {
             try {
                 ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
             } catch (IOException e) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
index df810826d..4894814d7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -30,6 +30,7 @@
 import java.io.IOException;
 
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.junit.After;
@@ -85,6 +86,7 @@ public void setup() throws Exception {
         conf.setGcWaitTime(gcWaitTime);
         conf.setLedgerStorageClass(MockedDbLedgerStorage.class.getName());
         conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 1);
+        conf.setProperty(DbLedgerStorage.MAX_THROTTLE_TIME_MILLIS, 1000);
         conf.setLedgerDirNames(new String[] { tmpDir.toString() });
         Bookie bookie = new Bookie(conf);
 
@@ -131,7 +133,7 @@ public void writeCacheFull() throws Exception {
         try {
             storage.addEntry(entry);
             fail("Should have thrown exception");
-        } catch (IOException e) {
+        } catch (OperationRejectedException e) {
             // Expected
         }
     }
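
A note on the locking change: the hot paths in DbLedgerStorage.addEntry and
getEntry now follow the standard optimistic-read pattern of
java.util.concurrent.locks.StampedLock: read without acquiring any lock,
validate the stamp afterwards, and fall back to a real read lock only if a
writer (the cache rotation) intervened. A minimal standalone sketch of the
pattern, with an illustrative map standing in for the BookKeeper WriteCache:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.StampedLock;

    public class OptimisticReadSketch {
        // Illustrative stand-in for the rotating write cache.
        private volatile ConcurrentHashMap<Long, byte[]> cache = new ConcurrentHashMap<>();
        private final StampedLock rotationLock = new StampedLock();

        byte[] get(long key) {
            // Optimistic read: no lock is held, so writers are never blocked.
            long stamp = rotationLock.tryOptimisticRead();
            byte[] value = cache.get(key);
            if (!rotationLock.validate(stamp)) {
                // A rotation happened concurrently: retry under the read lock.
                stamp = rotationLock.readLock();
                try {
                    value = cache.get(key);
                } finally {
                    rotationLock.unlockRead(stamp);
                }
            }
            return value;
        }

        void rotate() {
            // The flush path takes the write lock, invalidating optimistic stamps.
            long stamp = rotationLock.writeLock();
            try {
                cache = new ConcurrentHashMap<>();
            } finally {
                rotationLock.unlockWrite(stamp);
            }
        }
    }

Since the write cache is only rotated about once a minute, the optimistic
path validates almost every time, so concurrent writers no longer contend on
the former ReentrantReadWriteLock for each entry.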
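The throttling itself is a deadline-bounded retry loop: the insertion into
the write cache is retried for at most dbStorage_maxThrottleTimeMs (10
seconds by default), after which the write is rejected instead of blocking
the caller indefinitely. A simplified sketch of that control flow, where
tryInsert is a hypothetical stand-in for writeCache.put and
IllegalStateException stands in for OperationRejectedException:

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    public class ThrottleThenRejectSketch {
        private final long maxThrottleTimeNanos = TimeUnit.SECONDS.toNanos(10);

        void addWithThrottling(long ledgerId, long entryId, byte[] payload) throws IOException {
            long deadlineNanos = System.nanoTime() + maxThrottleTimeNanos;
            while (System.nanoTime() < deadlineNanos) {
                if (tryInsert(ledgerId, entryId, payload)) {
                    return; // inserted before the deadline
                }
                // Back off briefly and retry, preserving the interrupt status.
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
                }
            }
            // Deadline expired: reject the write rather than stalling further.
            throw new IllegalStateException("Operation rejected");
        }

        private boolean tryInsert(long ledgerId, long entryId, byte[] payload) {
            return false; // hypothetical: would attempt the write-cache insertion
        }
    }

Compared with the previous 100 ms Condition wait, the ceiling is now
configurable and the rejection surfaces to clients as EIO via the dedicated
OperationRejectedException rather than a generic IOException.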
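Finally, OperationRejectedException overrides fillInStackTrace() so that
throwing it on the hot path stays cheap. A generic sketch of that idiom (the
common variant returns this; the version in this patch returns null, which
likewise leaves the stack trace uncaptured):

    public class CheapException extends Exception {
        public CheapException(String message) {
            super(message);
        }

        @Override
        public synchronized Throwable fillInStackTrace() {
            // Skip the expensive stack walk: this exception signals an
            // expected condition, so its stack trace carries no information.
            return this;
        }
    }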


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
