This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.16 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit fbd33b56210bc17965c2d4bf0d835c44c5d4836f Author: danpi <[email protected]> AuthorDate: Tue Apr 1 11:53:36 2025 +0800 Fix SST files not being cleaned up in the locations folder (#4555) * fix entry location compaction * replace entryLocationCompactionEnable with entryLocationCompactionInterval * Add randomCompactionDelay to avoid all the bookies triggering compaction simultaneously * Fix the style issue * Fix the style issue * Fix test --------- Co-authored-by: houbonan <[email protected]> Co-authored-by: zymap <[email protected]> (cherry picked from commit ede1ba972cf0458ca1dc00338b5feb5b0f617aa2) --- .../bookkeeper/bookie/BookKeeperServerStats.java | 1 + .../bookkeeper/bookie/GarbageCollectionStatus.java | 2 ++ .../bookkeeper/bookie/GarbageCollectorThread.java | 38 +++++++++++++++++++++- .../bookie/stats/GarbageCollectorStats.java | 7 ++++ .../ldb/SingleDirectoryDbLedgerStorage.java | 2 ++ .../bookkeeper/conf/ServerConfiguration.java | 30 +++++++++++++++++ .../bookkeeper/conf/TestServerConfiguration.java | 20 ++++++++++++ conf/bk_server.conf | 5 +++ 8 files changed, 104 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java index d4657d2036..361861cda1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java @@ -150,6 +150,7 @@ public interface BookKeeperServerStats { String THREAD_RUNTIME = "THREAD_RUNTIME"; String MAJOR_COMPACTION_COUNT = "MAJOR_COMPACTION_TOTAL"; String MINOR_COMPACTION_COUNT = "MINOR_COMPACTION_TOTAL"; + String ENTRY_LOCATION_COMPACTION_COUNT = "ENTRY_LOCATION_COMPACTION_TOTAL"; String ACTIVE_LEDGER_COUNT = "ACTIVE_LEDGER_TOTAL"; String DELETED_LEDGER_COUNT = "DELETED_LEDGER_TOTAL"; diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java index 3f872092f0..4ad450a64f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java @@ -42,6 +42,8 @@ public class GarbageCollectionStatus { private long lastMajorCompactionTime; private long lastMinorCompactionTime; + private long lastEntryLocationCompactionTime; private long majorCompactionCounter; private long minorCompactionCounter; + private long entryLocationCompactionCounter; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 4a1c028987..5abf79b2e6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -33,6 +33,7 @@ import java.util.LinkedList; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -85,6 +86,10 @@ public class GarbageCollectorThread implements Runnable { long majorCompactionMaxTimeMillis; long lastMajorCompactionTime; + final long entryLocationCompactionInterval; + long randomCompactionDelay; + long lastEntryLocationCompactionTime; + @Getter final boolean isForceGCAllowWhenNoSpace; @@ -196,6 +201,10 @@ public class GarbageCollectorThread implements Runnable { isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace(); majorCompactionMaxTimeMillis = 
conf.getMajorCompactionMaxTimeMillis(); minorCompactionMaxTimeMillis = conf.getMinorCompactionMaxTimeMillis(); + entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval() * SECOND; + if (entryLocationCompactionInterval > 0) { + randomCompactionDelay = ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval); + } boolean isForceAllowCompaction = conf.isForceAllowCompaction(); @@ -262,12 +271,22 @@ public class GarbageCollectorThread implements Runnable { } } + if (entryLocationCompactionInterval > 0) { + if (entryLocationCompactionInterval < gcWaitTime) { + throw new IOException( + "Too short entry location compaction interval : " + entryLocationCompactionInterval); + } + } + LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", threshold=" + minorCompactionThreshold + ", interval=" + minorCompactionInterval); LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", threshold=" + majorCompactionThreshold + ", interval=" + majorCompactionInterval); + LOG.info("Entry Location Compaction : interval=" + entryLocationCompactionInterval + ", randomCompactionDelay=" + + randomCompactionDelay); - lastMinorCompactionTime = lastMajorCompactionTime = System.currentTimeMillis(); + lastMinorCompactionTime = lastMajorCompactionTime = + lastEntryLocationCompactionTime = System.currentTimeMillis(); } private EntryLogMetadataMap createEntryLogMetadataMap() throws IOException { @@ -445,6 +464,7 @@ public class GarbageCollectorThread implements Runnable { gcStats.getMajorCompactionCounter().inc(); majorCompacting.set(false); } + } else if (((isForceMinorCompactionAllow && force) || (enableMinorCompaction && (force || curTime - lastMinorCompactionTime > minorCompactionInterval))) && (!suspendMinor)) { @@ -459,6 +479,20 @@ public class GarbageCollectorThread implements Runnable { minorCompacting.set(false); } } + if (entryLocationCompactionInterval > 0 && (curTime - lastEntryLocationCompactionTime > ( + 
entryLocationCompactionInterval + randomCompactionDelay))) { + // enter entry location compaction + LOG.info( + "Enter entry location compaction, entryLocationCompactionInterval {}, randomCompactionDelay " + + "{}, lastEntryLocationCompactionTime {}", + entryLocationCompactionInterval, randomCompactionDelay, lastEntryLocationCompactionTime); + ledgerStorage.entryLocationCompact(); + lastEntryLocationCompactionTime = System.currentTimeMillis(); + randomCompactionDelay = ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval); + LOG.info("Next entry location compaction interval {}", + entryLocationCompactionInterval + randomCompactionDelay); + gcStats.getEntryLocationCompactionCounter().inc(); + } gcStats.getGcThreadRuntime().registerSuccessfulEvent( MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS); } catch (EntryLogMetadataMapException e) { @@ -796,8 +830,10 @@ public class GarbageCollectorThread implements Runnable { .minorCompacting(minorCompacting.get()) .lastMajorCompactionTime(lastMajorCompactionTime) .lastMinorCompactionTime(lastMinorCompactionTime) + .lastEntryLocationCompactionTime(lastEntryLocationCompactionTime) .majorCompactionCounter(gcStats.getMajorCompactionCounter().get()) .minorCompactionCounter(gcStats.getMinorCompactionCounter().get()) + .entryLocationCompactionCounter(gcStats.getEntryLocationCompactionCounter().get()) .build(); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java index 1c9475608f..3029e7d55b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java @@ -25,6 +25,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ACTIVE_LEDGER_C import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.DELETED_LEDGER_COUNT; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOCATION_COMPACTION_COUNT; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.MAJOR_COMPACTION_COUNT; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.MINOR_COMPACTION_COUNT; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.RECLAIMED_COMPACTION_SPACE_BYTES; @@ -61,6 +62,11 @@ public class GarbageCollectorStats { help = "Number of major compactions" ) private final Counter majorCompactionCounter; + @StatsDoc( + name = ENTRY_LOCATION_COMPACTION_COUNT, + help = "Number of entry location compactions" + ) + private final Counter entryLocationCompactionCounter; @StatsDoc( name = RECLAIMED_DELETION_SPACE_BYTES, help = "Number of disk space bytes reclaimed via deleting entry log files" @@ -105,6 +111,7 @@ public class GarbageCollectorStats { this.minorCompactionCounter = statsLogger.getCounter(MINOR_COMPACTION_COUNT); this.majorCompactionCounter = statsLogger.getCounter(MAJOR_COMPACTION_COUNT); + this.entryLocationCompactionCounter = statsLogger.getCounter(ENTRY_LOCATION_COMPACTION_COUNT); this.reclaimedSpaceViaCompaction = statsLogger.getCounter(RECLAIMED_COMPACTION_SPACE_BYTES); this.reclaimedSpaceViaDeletes = statsLogger.getCounter(RECLAIMED_DELETION_SPACE_BYTES); this.gcThreadRuntime = statsLogger.getOpStatsLogger(THREAD_RUNTIME); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 61aebd8e1a..867ba905ff 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -313,6 +313,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage public void entryLocationCompact() { if (entryLocationIndex.isCompacting()) { // RocksDB already running compact. + log.info("Compacting directory {}, skipping this entryLocationCompaction this time.", + entryLocationIndex.getEntryLocationDBPath()); return; } cleanupExecutor.execute(() -> { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 019144e994..1f04e53f57 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -105,6 +105,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati protected static final String COMPACTION_RATE = "compactionRate"; protected static final String COMPACTION_RATE_BY_ENTRIES = "compactionRateByEntries"; protected static final String COMPACTION_RATE_BY_BYTES = "compactionRateByBytes"; + protected static final String ENTRY_LOCATION_COMPACTION_INTERVAL = "entryLocationCompactionInterval"; // Gc Parameters protected static final String GC_WAIT_TIME = "gcWaitTime"; @@ -2969,6 +2970,31 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati return this; } + /** + * Get interval to run entry location compaction, in seconds. + * + * <p>If it is set to zero or a negative value, the entry location compaction is disabled. + * + * @return interval to run entry location compaction, in seconds. + */ + public long getEntryLocationCompactionInterval() { + return getLong(ENTRY_LOCATION_COMPACTION_INTERVAL, -1); + } + + /** + * Set interval to run entry location compaction, in seconds.
+ * + * @see #getEntryLocationCompactionInterval() + * + * @param interval + * Interval to run entry location compaction, in seconds + * @return server configuration + */ + public ServerConfiguration setEntryLocationCompactionInterval(long interval) { + setProperty(ENTRY_LOCATION_COMPACTION_INTERVAL, interval); + return this; + } + /** * Should we remove pages from page cache after force write. * @@ -3187,6 +3213,10 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati if (getMajorCompactionInterval() > 0 && getMajorCompactionInterval() * SECOND < getGcWaitTime()) { throw new ConfigurationException("majorCompactionInterval should be >= gcWaitTime."); } + if (getEntryLocationCompactionInterval() > 0 + && getEntryLocationCompactionInterval() * SECOND < getGcWaitTime()) { + throw new ConfigurationException("entryLocationCompactionInterval should be >= gcWaitTime."); + } } /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java index 04ac87818f..d8aa62d0d2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java @@ -158,6 +158,7 @@ public class TestServerConfiguration { public void testCompactionSettings() throws ConfigurationException { ServerConfiguration conf = new ServerConfiguration(); long major, minor; + long entryLocationCompactionInterval; // Default Values major = conf.getMajorCompactionMaxTimeMillis(); @@ -239,5 +240,24 @@ public class TestServerConfiguration { minorThreshold = conf.getMinorCompactionThreshold(); Assert.assertEquals(0.6, majorThreshold, 0.00001); Assert.assertEquals(0.3, minorThreshold, 0.00001); + + // Default Values + entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval(); + Assert.assertEquals(-1, entryLocationCompactionInterval); + 
// Set entry location compaction + conf.setEntryLocationCompactionInterval(3600); + entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval(); + Assert.assertEquals(3600, entryLocationCompactionInterval); + + conf.setEntryLocationCompactionInterval(550); + try { + conf.validate(); + fail(); + } catch (ConfigurationException ignore) { + } + + conf.setEntryLocationCompactionInterval(650); + conf.validate(); } } diff --git a/conf/bk_server.conf b/conf/bk_server.conf index c0a021418d..be3d845fad 100755 --- a/conf/bk_server.conf +++ b/conf/bk_server.conf @@ -581,6 +581,11 @@ ledgerDirectories=/tmp/bk-data # Set the rate at which compaction will readd entries. The unit is bytes added per second. # compactionRateByBytes=1000000 +# Interval to run entry location compaction, in seconds +# If it is set to zero or a negative value, the entry location compaction is disabled. +# Note: must be greater than or equal to gcWaitTime (this value is in seconds, gcWaitTime is in milliseconds). +# entryLocationCompactionInterval=-1 + # Flag to enable/disable transactional compaction. If it is set to true, it will use transactional compaction, # which it will use new entry log files to store compacted entries during compaction; if it is set to false, # it will use normal compaction, which it shares same entry log file with normal add operations.
