This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 308646bbfd95c3858d352fbf9c3fc56ffe2eec68 Author: wenbingshen <[email protected]> AuthorDate: Thu Jul 28 11:41:52 2022 +0800 Fix the infinite waiting for shutdown due to throttler limit (#2942) Descriptions of the changes in this PR: If the compactor is limited, the shutdown priority should be higher than waiting for RateLimiter.acquire. According to @hangc0276's suggestion, when processing the shutdown logic of `GarbageCollectorThread`, we should check the status of the `newScanner.process` method. If the status is false, throw an `IOException` and stop the compaction immediately. Master Issue: #2941 (cherry picked from commit 442e3bbad384fba9b09f8ea774e28562a8d5c6d7) --- .../bookkeeper/bookie/AbstractLogCompactor.java | 32 ++++++++++++- .../bookkeeper/bookie/GarbageCollectorThread.java | 2 + .../apache/bookkeeper/bookie/CompactionTest.java | 56 ++++++++++++++++++++++ 3 files changed, 88 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java index 57ec8978cc..56d4ff5cb7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java @@ -23,6 +23,9 @@ package org.apache.bookkeeper.bookie; import com.google.common.util.concurrent.RateLimiter; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.conf.ServerConfiguration; /** @@ -60,6 +63,7 @@ public abstract class AbstractLogCompactor { static class Throttler { private final RateLimiter rateLimiter; private final boolean isThrottleByBytes; + private final AtomicBoolean cancelled = new AtomicBoolean(false); Throttler(ServerConfiguration conf) { this.isThrottleByBytes = conf.getIsThrottleByBytes(); @@ -68,8 +72,32 @@ public abstract class 
AbstractLogCompactor { } // acquire. if bybytes: bytes of this entry; if byentries: 1. - void acquire(int permits) { - rateLimiter.acquire(this.isThrottleByBytes ? permits : 1); + boolean tryAcquire(int permits, long timeout, TimeUnit unit) { + return rateLimiter.tryAcquire(this.isThrottleByBytes ? permits : 1, timeout, unit); + } + + // GC thread will check the status for the rate limiter + // If the compactor is being stopped by other threads, + // and the GC thread is still limited, the compact task will be stopped. + public void acquire(int permits) throws IOException { + long timeout = 100; + long start = System.currentTimeMillis(); + while (!tryAcquire(permits, timeout, TimeUnit.MILLISECONDS)) { + if (cancelled.get()) { + throw new IOException("Failed to get permits takes " + + (System.currentTimeMillis() - start) + + " ms may be compactor has been shutting down"); + } + try { + TimeUnit.MILLISECONDS.sleep(timeout); + } catch (InterruptedException e) { + // ignore + } + } + } + + public void cancelledAcquire() { + cancelled.set(true); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index b98cd2a8f0..6e3d00c3d4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -611,6 +611,8 @@ public class GarbageCollectorThread extends SafeRunnable { } LOG.info("Shutting down GarbageCollectorThread"); + throttler.cancelledAcquire(); + compactor.throttler.cancelledAcquire(); while (!compacting.compareAndSet(false, true)) { // Wait till the thread stops compacting Thread.sleep(100); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index 765d8256a7..d39e6bf134 
100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -54,6 +54,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; @@ -1389,6 +1390,61 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase { storage.getEntry(1, 1); // entry should exist } + @Test + public void testCancelledCompactionWhenShuttingDown() throws Exception { + // prepare data + LedgerHandle[] lhs = prepareData(3, false); + + // change compaction in low throughput + // restart bookies + restartBookies(c -> { + c.setIsThrottleByBytes(true); + c.setCompactionRateByBytes(ENTRY_SIZE / 1000); + c.setMinorCompactionThreshold(0.2f); + c.setMajorCompactionThreshold(0.5f); + return c; + }); + + // remove ledger2 and ledger3 + // so entry log 1 and 2 would have ledger1 entries left + bkc.deleteLedger(lhs[1].getId()); + bkc.deleteLedger(lhs[2].getId()); + LOG.info("Finished deleting the ledgers contains most entries."); + + getGCThread().triggerGC(true, false, false); + getGCThread().throttler.cancelledAcquire(); + waitUntilTrue(() -> { + try { + return getGCThread().compacting.get(); + } catch (Exception e) { + fail("Get GC thread failed"); + } + return null; + }, () -> "Not attempting to complete", 10000, 200); + + getGCThread().shutdown(); + // after garbage collection shutdown, compaction should be cancelled when acquire permits + // and GC running flag should be false. 
+ assertFalse(getGCThread().running); + + } + + private void waitUntilTrue(Supplier<Boolean> condition, + Supplier<String> msg, + long waitTime, + long pause) throws InterruptedException { + long startTime = System.currentTimeMillis(); + while (true) { + if (condition.get()) { + return; + } + if (System.currentTimeMillis() > startTime + waitTime) { + fail(msg.get()); + } + Thread.sleep(Math.min(waitTime, pause)); + } + } + private LedgerManager getLedgerManager(final Set<Long> ledgers) { LedgerManager manager = new LedgerManager() { @Override
