sijie closed pull request #1280: Improve write rejection in DbLedgerStorage URL: https://github.com/apache/bookkeeper/pull/1280
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java index 99ae39e76..b7428b282 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java @@ -82,6 +82,7 @@ public static BookieException create(int code) { int CookieNotFoundException = -105; int MetadataStoreException = -106; int UnknownBookieIdException = -107; + int OperationRejectedException = -108; } public void setCode(int code) { @@ -122,6 +123,9 @@ public String getMessage(int code) { case Code.UnknownBookieIdException: err = "Unknown bookie id"; break; + case Code.OperationRejectedException: + err = "Operation rejected"; + break; default: err = "Invalid operation"; break; @@ -174,6 +178,22 @@ public LedgerFencedException() { } } + /** + * Signals that an operation was rejected, e.g. because the ledger storage cannot keep up with the write rate. + */ + public static class OperationRejectedException extends BookieException { + public OperationRejectedException() { + super(Code.OperationRejectedException); + } + + @Override + public Throwable fillInStackTrace() { + // Since this exception is a way to signal a specific condition and it's triggered at very specific points, + // we can disable stack traces. + return null; + } + } + /** * Signal that an invalid cookie is found when starting a bookie. 
* diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java index f4db7a689..23840be27 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java @@ -73,7 +73,7 @@ static ByteBuf createLedgerFenceEntry(Long ledgerId) { */ abstract SettableFuture<Boolean> fenceAndLogInJournal(Journal journal) throws IOException; - abstract long addEntry(ByteBuf entry) throws IOException; + abstract long addEntry(ByteBuf entry) throws IOException, BookieException; abstract ByteBuf readEntry(long entryId) throws IOException; abstract long getLastAddConfirmed() throws IOException; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java index 84730cb76..a0f34eab6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java @@ -138,7 +138,7 @@ ByteBuf getExplicitLac() { } @Override - long addEntry(ByteBuf entry) throws IOException { + long addEntry(ByteBuf entry) throws IOException, BookieException { long ledgerId = entry.getLong(entry.readerIndex()); if (ledgerId != this.ledgerId) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java index 83ac2c0ae..dcfac3147 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java @@ -100,7 +100,7 @@ void initialize(ServerConfiguration conf, * * @return the entry id of the entry added */ - long 
addEntry(ByteBuf entry) throws IOException; + long addEntry(ByteBuf entry) throws IOException, BookieException; /** * Read an entry from storage. diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index ff9cd316a..55b094587 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -42,13 +42,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.StampedLock; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.Bookie.NoEntryException; import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException; import org.apache.bookkeeper.bookie.CheckpointSource; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.Checkpointer; @@ -67,6 +67,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -78,8 +79,7 @@ import org.slf4j.LoggerFactory; /** - * Implementation of LedgerStorage that uses RocksDB to keep the indexes for - * entries stored in EntryLogs. + * Implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in EntryLogs. 
*/ public class DbLedgerStorage implements CompactableLedgerStorage { @@ -90,8 +90,7 @@ * * <p>This class is used for holding all the transient states for a given ledger. */ - private static class TransientLedgerInfo - extends Watchable<LastAddConfirmedUpdateNotification> + private static class TransientLedgerInfo extends Watchable<LastAddConfirmedUpdateNotification> implements AutoCloseable { // lac @@ -139,8 +138,7 @@ long setLastAddConfirmed(long lac) { } synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC, - Watcher<LastAddConfirmedUpdateNotification> watcher) - throws IOException { + Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException { lastAccessed = System.currentTimeMillis(); if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || ledgerIndex.get(ledgerId).getFenced()) { return false; @@ -155,7 +153,7 @@ public ByteBuf getExplicitLac() { synchronized (this) { if (explicitLac != null) { retLac = Unpooled.buffer(explicitLac.capacity()); - explicitLac.rewind(); //copy from the beginning + explicitLac.rewind(); // copy from the beginning retLac.writeBytes(explicitLac); explicitLac.rewind(); return retLac; @@ -194,13 +192,13 @@ void notifyWatchers(long lastAddConfirmed) { @Override public void close() { - synchronized (this) { - if (isClosed) { - return; - } - isClosed = true; - } - // notify watchers + synchronized (this) { + if (isClosed) { + return; + } + isClosed = true; + } + // notify watchers notifyWatchers(Long.MAX_VALUE); } @@ -217,16 +215,15 @@ public void close() { private GarbageCollectorThread gcThread; // Write cache where all new entries are inserted into - protected WriteCache writeCache; + protected volatile WriteCache writeCache; // Write cache that is used to swap with writeCache during flushes - protected WriteCache writeCacheBeingFlushed; + protected volatile WriteCache writeCacheBeingFlushed; // Cache where we insert entries for speculative reading private ReadCache readCache; - private final 
ReentrantReadWriteLock writeCacheMutex = new ReentrantReadWriteLock(); - private final Condition flushWriteCacheCondition = writeCacheMutex.writeLock().newCondition(); + private final StampedLock writeCacheRotationLock = new StampedLock(); protected final ReentrantLock flushMutex = new ReentrantLock(); @@ -243,10 +240,14 @@ public void close() { static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize"; static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb"; + static final String MAX_THROTTLE_TIME_MILLIS = "dbStorage_maxThrottleTimeMs"; + private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16; private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16; private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100; + private static final long DEFAUL_MAX_THROTTLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10); + private static final int MB = 1024 * 1024; private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners = Lists @@ -260,6 +261,8 @@ public void close() { private long readCacheMaxSize; private int readAheadCacheBatchSize; + private long maxThrottleTimeNanos; + private StatsLogger stats; private OpStatsLogger addEntryStats; @@ -271,10 +274,13 @@ public void close() { private OpStatsLogger flushStats; private OpStatsLogger flushSizeStats; + private Counter throttledWriteRequests; + private Counter rejectedWriteRequests; + @Override public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, - LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource, - Checkpointer checkpointer, StatsLogger statsLogger) throws IOException { + LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource, + Checkpointer checkpointer, StatsLogger statsLogger) throws IOException { checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1, "Db 
implementation only allows for one storage dir"); @@ -290,6 +296,9 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB; readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE); + long maxThrottleTimeMillis = conf.getLong(MAX_THROTTLE_TIME_MILLIS, DEFAUL_MAX_THROTTLE_TIME_MILLIS); + maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis); + readCache = new ReadCache(readCacheMaxSize); this.stats = statsLogger; @@ -381,6 +390,9 @@ public Long getSample() { readAheadBatchSizeStats = stats.getOpStatsLogger("readahead-batch-size"); flushStats = stats.getOpStatsLogger("flush"); flushSizeStats = stats.getOpStatsLogger("flush-size"); + + throttledWriteRequests = stats.getCounter("throttled-write-requests"); + rejectedWriteRequests = stats.getCounter("rejected-write-requests"); } @Override @@ -467,7 +479,7 @@ public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException { } @Override - public long addEntry(ByteBuf entry) throws IOException { + public long addEntry(ByteBuf entry) throws IOException, BookieException { long startTime = MathUtils.nowInNano(); long ledgerId = entry.getLong(entry.readerIndex()); @@ -478,13 +490,23 @@ public long addEntry(ByteBuf entry) throws IOException { log.debug("Add entry. {}@{}, lac = {}", ledgerId, entryId, lac); } - // Waits if the write cache is being switched for a flush - writeCacheMutex.readLock().lock(); - boolean inserted; - try { - inserted = writeCache.put(ledgerId, entryId, entry); - } finally { - writeCacheMutex.readLock().unlock(); + // First we try to do an optimistic locking to get access to the current write cache. + // This is based on the fact that the write cache is only being rotated (swapped) every 1 minute. 
During the + // rest of the time, we can have multiple threads using the optimistic lock here without interfering. + long stamp = writeCacheRotationLock.tryOptimisticRead(); + boolean inserted = false; + + inserted = writeCache.put(ledgerId, entryId, entry); + if (!writeCacheRotationLock.validate(stamp)) { + // The write cache was rotated while we were inserting. We need to acquire the proper read lock and repeat + // the operation because we might have inserted in a write cache that was already being flushed and cleared, + // without being sure about this last entry being flushed or not. + stamp = writeCacheRotationLock.readLock(); + try { + inserted = writeCache.put(ledgerId, entryId, entry); + } finally { + writeCacheRotationLock.unlockRead(stamp); + } } if (!inserted) { @@ -498,46 +520,49 @@ public long addEntry(ByteBuf entry) throws IOException { return entryId; } - private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) throws IOException { + private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) + throws IOException, BookieException { // Write cache is full, we need to trigger a flush so that it gets rotated - writeCacheMutex.writeLock().lock(); + // If the flush has already been triggered or flush has already switched the + // cache, we don't need to trigger another flush + if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) { + // Trigger an early flush in background + log.info("Write cache is full, triggering flush"); + executor.execute(() -> { + try { + flush(); + } catch (IOException e) { + log.error("Error during flush", e); + } + }); + } - try { - // If the flush has already been triggered or flush has already switched the - // cache, we don't need to - // trigger another flush - if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) { - // Trigger an early flush in background - log.info("Write cache is full, triggering flush"); - executor.execute(() 
-> { - try { - flush(); - } catch (IOException e) { - log.error("Error during flush", e); - } - }); - } + throttledWriteRequests.inc(); + long absoluteTimeoutNanos = System.nanoTime() + maxThrottleTimeNanos; - long timeoutNs = TimeUnit.MILLISECONDS.toNanos(100); - while (hasFlushBeenTriggered.get()) { - if (timeoutNs <= 0L) { - throw new IOException("Write cache was not trigger within the timeout, cannot add entry " + ledgerId - + "@" + entryId); + while (System.nanoTime() < absoluteTimeoutNanos) { + long stamp = writeCacheRotationLock.readLock(); + try { + if (writeCache.put(ledgerId, entryId, entry)) { + // We succeeded in putting the entry in the write cache + return; } - timeoutNs = flushWriteCacheCondition.awaitNanos(timeoutNs); + } finally { + writeCacheRotationLock.unlockRead(stamp); } - if (!writeCache.put(ledgerId, entryId, entry)) { - // Still wasn't able to cache entry - throw new IOException("Error while inserting entry in write cache" + ledgerId + "@" + entryId); + // Wait some time and try again + try { + Thread.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId); } - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId); - } finally { - writeCacheMutex.writeLock().unlock(); } + + // Timeout expired and we weren't able to insert in write cache + rejectedWriteRequests.inc(); + throw new OperationRejectedException(); } @Override @@ -551,29 +576,41 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { return getLastEntry(ledgerId); } - writeCacheMutex.readLock().lock(); - try { - // First try to read from the write cache of recent entries - ByteBuf entry = writeCache.get(ledgerId, entryId); - if (entry != null) { - recordSuccessfulEvent(readCacheHitStats, startTime); - recordSuccessfulEvent(readEntryStats, 
startTime); - return entry; + // We need to try to read from both write caches, since recent entries could be found in either of the two. The + // write caches are already thread safe on their own, here we just need to make sure we get references to both + // of them. Using an optimistic lock since the read lock is always free, unless we're swapping the caches. + long stamp = writeCacheRotationLock.tryOptimisticRead(); + WriteCache localWriteCache = writeCache; + WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed; + if (!writeCacheRotationLock.validate(stamp)) { + // Fallback to regular read lock approach + stamp = writeCacheRotationLock.readLock(); + try { + localWriteCache = writeCache; + localWriteCacheBeingFlushed = writeCacheBeingFlushed; + } finally { + writeCacheRotationLock.unlockRead(stamp); } + } - // If there's a flush going on, the entry might be in the flush buffer - entry = writeCacheBeingFlushed.get(ledgerId, entryId); - if (entry != null) { - recordSuccessfulEvent(readCacheHitStats, startTime); - recordSuccessfulEvent(readEntryStats, startTime); - return entry; - } - } finally { - writeCacheMutex.readLock().unlock(); + // First try to read from the write cache of recent entries + ByteBuf entry = localWriteCache.get(ledgerId, entryId); + if (entry != null) { + recordSuccessfulEvent(readCacheHitStats, startTime); + recordSuccessfulEvent(readEntryStats, startTime); + return entry; + } + + // If there's a flush going on, the entry might be in the flush buffer + entry = localWriteCacheBeingFlushed.get(ledgerId, entryId); + if (entry != null) { + recordSuccessfulEvent(readCacheHitStats, startTime); + recordSuccessfulEvent(readEntryStats, startTime); + return entry; } // Try reading from read-ahead cache - ByteBuf entry = readCache.get(ledgerId, entryId); + entry = readCache.get(ledgerId, entryId); if (entry != null) { recordSuccessfulEvent(readCacheHitStats, startTime); recordSuccessfulEvent(readEntryStats, startTime); @@ -650,7 +687,7 @@ 
private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi public ByteBuf getLastEntry(long ledgerId) throws IOException { long startTime = MathUtils.nowInNano(); - writeCacheMutex.readLock().lock(); + long stamp = writeCacheRotationLock.readLock(); try { // First try to read from the write cache of recent entries ByteBuf entry = writeCache.getLastEntry(ledgerId); @@ -687,7 +724,7 @@ public ByteBuf getLastEntry(long ledgerId) throws IOException { return entry; } } finally { - writeCacheMutex.readLock().unlock(); + writeCacheRotationLock.unlockRead(stamp); } // Search the last entry in storage @@ -706,11 +743,11 @@ public ByteBuf getLastEntry(long ledgerId) throws IOException { @VisibleForTesting boolean isFlushRequired() { - writeCacheMutex.readLock().lock(); + long stamp = writeCacheRotationLock.readLock(); try { return !writeCache.isEmpty(); } finally { - writeCacheMutex.readLock().unlock(); + writeCacheRotationLock.unlockRead(stamp); } } @@ -810,7 +847,7 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { * Swap the current write cache with the replacement cache. */ private void swapWriteCache() { - writeCacheMutex.writeLock().lock(); + long stamp = writeCacheRotationLock.writeLock(); try { // First, swap the current write-cache map with an empty one so that writes will // go on unaffected. 
Only a single flush is happening at the same time @@ -820,12 +857,11 @@ private void swapWriteCache() { // since the cache is switched, we can allow flush to be triggered hasFlushBeenTriggered.set(false); - flushWriteCacheCondition.signalAll(); } finally { try { isFlushOngoing.set(true); } finally { - writeCacheMutex.writeLock().unlock(); + writeCacheRotationLock.unlockWrite(stamp); } } } @@ -844,11 +880,11 @@ public void deleteLedger(long ledgerId) throws IOException { } // Delete entries from this ledger that are still in the write cache - writeCacheMutex.readLock().lock(); + long stamp = writeCacheRotationLock.readLock(); try { writeCache.deleteLedger(ledgerId); } finally { - writeCacheMutex.readLock().unlock(); + writeCacheRotationLock.unlockRead(stamp); } entryLocationIndex.delete(ledgerId); @@ -944,7 +980,6 @@ private void updateCachedLacIfNeeded(long ledgerId, long lac) { } } - @Override public void flushEntriesLocationsIndex() throws IOException { // No-op. Location index is already flushed in updateEntriesLocations() call @@ -953,8 +988,7 @@ public void flushEntriesLocationsIndex() throws IOException { /** * Add an already existing ledger to the index. 
* - * <p>This method is only used as a tool to help the migration from - * InterleaveLedgerStorage to DbLedgerStorage + * <p>This method is only used as a tool to help the migration from InterleaveLedgerStorage to DbLedgerStorage * * @param ledgerId * the ledger id @@ -988,6 +1022,7 @@ public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey, return numberOfEntries.get(); } + @Override public void registerLedgerDeletionListener(LedgerDeletionListener listener) { ledgerDeletionListeners.add(listener); @@ -1020,7 +1055,7 @@ public static void readLedgerIndexEntries(long ledgerId, ServerConfiguration ser checkNotNull(processor, "LedgerLoggger info processor can't null"); LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs(), - new DiskChecker(serverConf.getDiskUsageThreshold(), serverConf.getDiskUsageWarnThreshold())); + new DiskChecker(serverConf.getDiskUsageThreshold(), serverConf.getDiskUsageWarnThreshold())); String ledgerBasePath = ledgerDirsManager.getAllLedgerDirs().get(0).toString(); EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index 993700092..eaf2473a2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -20,12 +20,12 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.util.Recycler; -import io.netty.util.Recycler.Handle; import java.io.IOException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException; import org.apache.bookkeeper.net.BookieSocketAddress; import 
org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; @@ -76,6 +76,13 @@ protected void processPacket() { } else { requestProcessor.bookie.addEntry(addData, false, this, channel, request.getMasterKey()); } + } catch (OperationRejectedException e) { + // Avoid logging each occurrence of this exception, since it can happen when the ledger storage is + // unable to keep up with the write rate. + if (LOG.isDebugEnabled()) { + LOG.debug("Operation rejected while writing {}", request, e); + } + rc = BookieProtocol.EIO; } catch (IOException e) { LOG.error("Error writing {}", request, e); rc = BookieProtocol.EIO; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java index 7bd78d3f4..0854bf538 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException; import org.apache.bookkeeper.client.api.WriteFlag; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; @@ -120,6 +121,13 @@ public void writeComplete(int rc, long ledgerId, long entryId, requestProcessor.bookie.addEntry(entryToAdd, ackBeforeSync, wcb, channel, masterKey); } status = StatusCode.EOK; + } catch (OperationRejectedException e) { + // Avoid logging each occurrence of this exception, since it can happen when the ledger storage is + // unable to keep up with the write rate. 
+ if (logger.isDebugEnabled()) { + logger.debug("Operation rejected while writing {}", request, e); + } + status = StatusCode.EIO; } catch (IOException e) { logger.error("Error writing entry:{} to ledger:{}", entryId, ledgerId, e); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java index d4a98cee0..cc02e4e8c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java @@ -413,7 +413,7 @@ public void testGetEntryLogsSet() throws Exception { } @Override - public Boolean call() throws IOException { + public Boolean call() throws IOException, BookieException { try { ledgerStorage.addEntry(generateEntry(ledgerId, entryId)); } catch (IOException e) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java index df810826d..4894814d7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java @@ -30,6 +30,7 @@ import java.io.IOException; import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.junit.After; @@ -85,6 +86,7 @@ public void setup() throws Exception { conf.setGcWaitTime(gcWaitTime); conf.setLedgerStorageClass(MockedDbLedgerStorage.class.getName()); conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 1); + conf.setProperty(DbLedgerStorage.MAX_THROTTLE_TIME_MILLIS, 1000); 
conf.setLedgerDirNames(new String[] { tmpDir.toString() }); Bookie bookie = new Bookie(conf); @@ -131,7 +133,7 @@ public void writeCacheFull() throws Exception { try { storage.addEntry(entry); fail("Should have thrown exception"); - } catch (IOException e) { + } catch (OperationRejectedException e) { // Expected } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services