This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit fbd33b56210bc17965c2d4bf0d835c44c5d4836f
Author: danpi <[email protected]>
AuthorDate: Tue Apr 1 11:53:36 2025 +0800

    Fix SST files not being cleaned up in the locations folder (#4555)
    
    * fix entry location compaction
    
    * replace entryLocationCompactionEnable with entryLocationCompactionInterval
    
    * Add randomCompactionDelay to avoid all the bookies triggering compaction 
simultaneously
    
    * Fix the style issue
    
    * Fix the style issue
    
    * Fix test
    
    ---------
    
    Co-authored-by: houbonan <[email protected]>
    Co-authored-by: zymap <[email protected]>
    (cherry picked from commit ede1ba972cf0458ca1dc00338b5feb5b0f617aa2)
---
 .../bookkeeper/bookie/BookKeeperServerStats.java   |  1 +
 .../bookkeeper/bookie/GarbageCollectionStatus.java |  2 ++
 .../bookkeeper/bookie/GarbageCollectorThread.java  | 38 +++++++++++++++++++++-
 .../bookie/stats/GarbageCollectorStats.java        |  7 ++++
 .../ldb/SingleDirectoryDbLedgerStorage.java        |  2 ++
 .../bookkeeper/conf/ServerConfiguration.java       | 30 +++++++++++++++++
 .../bookkeeper/conf/TestServerConfiguration.java   | 20 ++++++++++++
 conf/bk_server.conf                                |  5 +++
 8 files changed, 104 insertions(+), 1 deletion(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index d4657d2036..361861cda1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -150,6 +150,7 @@ public interface BookKeeperServerStats {
     String THREAD_RUNTIME = "THREAD_RUNTIME";
     String MAJOR_COMPACTION_COUNT = "MAJOR_COMPACTION_TOTAL";
     String MINOR_COMPACTION_COUNT = "MINOR_COMPACTION_TOTAL";
+    String ENTRY_LOCATION_COMPACTION_COUNT = "ENTRY_LOCATION_COMPACTION_TOTAL";
     String ACTIVE_LEDGER_COUNT = "ACTIVE_LEDGER_TOTAL";
     String DELETED_LEDGER_COUNT = "DELETED_LEDGER_TOTAL";
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java
index 3f872092f0..4ad450a64f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java
@@ -42,6 +42,8 @@ public class GarbageCollectionStatus {
 
     private long lastMajorCompactionTime;
     private long lastMinorCompactionTime;
+    private long lastEntryLocationCompactionTime;
     private long majorCompactionCounter;
     private long minorCompactionCounter;
+    private long entryLocationCompactionCounter;
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 4a1c028987..5abf79b2e6 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -33,6 +33,7 @@ import java.util.LinkedList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -85,6 +86,10 @@ public class GarbageCollectorThread implements Runnable {
     long majorCompactionMaxTimeMillis;
     long lastMajorCompactionTime;
 
+    final long entryLocationCompactionInterval;
+    long randomCompactionDelay;
+    long lastEntryLocationCompactionTime;
+
     @Getter
     final boolean isForceGCAllowWhenNoSpace;
 
@@ -196,6 +201,10 @@ public class GarbageCollectorThread implements Runnable {
         isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace();
         majorCompactionMaxTimeMillis = conf.getMajorCompactionMaxTimeMillis();
         minorCompactionMaxTimeMillis = conf.getMinorCompactionMaxTimeMillis();
+        entryLocationCompactionInterval = 
conf.getEntryLocationCompactionInterval() * SECOND;
+        if (entryLocationCompactionInterval > 0) {
+            randomCompactionDelay = 
ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval);
+        }
 
         boolean isForceAllowCompaction = conf.isForceAllowCompaction();
 
@@ -262,12 +271,22 @@ public class GarbageCollectorThread implements Runnable {
             }
         }
 
+        if (entryLocationCompactionInterval > 0) {
+            if (entryLocationCompactionInterval < gcWaitTime) {
+                throw new IOException(
+                        "Too short entry location compaction interval : " + 
entryLocationCompactionInterval);
+            }
+        }
+
         LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", 
threshold="
                + minorCompactionThreshold + ", interval=" + 
minorCompactionInterval);
         LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", 
threshold="
                + majorCompactionThreshold + ", interval=" + 
majorCompactionInterval);
+        LOG.info("Entry Location Compaction : interval=" + 
entryLocationCompactionInterval + ", randomCompactionDelay="
+                + randomCompactionDelay);
 
-        lastMinorCompactionTime = lastMajorCompactionTime = 
System.currentTimeMillis();
+        lastMinorCompactionTime = lastMajorCompactionTime =
+            lastEntryLocationCompactionTime = System.currentTimeMillis();
     }
 
     private EntryLogMetadataMap createEntryLogMetadataMap() throws IOException 
{
@@ -445,6 +464,7 @@ public class GarbageCollectorThread implements Runnable {
                     gcStats.getMajorCompactionCounter().inc();
                     majorCompacting.set(false);
                 }
+
             } else if (((isForceMinorCompactionAllow && force) || 
(enableMinorCompaction
                     && (force || curTime - lastMinorCompactionTime > 
minorCompactionInterval)))
                     && (!suspendMinor)) {
@@ -459,6 +479,20 @@ public class GarbageCollectorThread implements Runnable {
                     minorCompacting.set(false);
                 }
             }
+            if (entryLocationCompactionInterval > 0 && (curTime - 
lastEntryLocationCompactionTime > (
+                entryLocationCompactionInterval + randomCompactionDelay))) {
+                // enter entry location compaction
+                LOG.info(
+                    "Enter entry location compaction, 
entryLocationCompactionInterval {}, randomCompactionDelay "
+                        + "{}, lastEntryLocationCompactionTime {}",
+                    entryLocationCompactionInterval, randomCompactionDelay, 
lastEntryLocationCompactionTime);
+                ledgerStorage.entryLocationCompact();
+                lastEntryLocationCompactionTime = System.currentTimeMillis();
+                randomCompactionDelay = 
ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval);
+                LOG.info("Next entry location compaction interval {}",
+                    entryLocationCompactionInterval + randomCompactionDelay);
+                gcStats.getEntryLocationCompactionCounter().inc();
+            }
             gcStats.getGcThreadRuntime().registerSuccessfulEvent(
                     MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS);
         } catch (EntryLogMetadataMapException e) {
@@ -796,8 +830,10 @@ public class GarbageCollectorThread implements Runnable {
             .minorCompacting(minorCompacting.get())
             .lastMajorCompactionTime(lastMajorCompactionTime)
             .lastMinorCompactionTime(lastMinorCompactionTime)
+            .lastEntryLocationCompactionTime(lastEntryLocationCompactionTime)
             .majorCompactionCounter(gcStats.getMajorCompactionCounter().get())
             .minorCompactionCounter(gcStats.getMinorCompactionCounter().get())
+            
.entryLocationCompactionCounter(gcStats.getEntryLocationCompactionCounter().get())
             .build();
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java
index 1c9475608f..3029e7d55b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java
@@ -25,6 +25,7 @@ import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.ACTIVE_LEDGER_C
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.DELETED_LEDGER_COUNT;
+import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOCATION_COMPACTION_COUNT;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.MAJOR_COMPACTION_COUNT;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.MINOR_COMPACTION_COUNT;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.RECLAIMED_COMPACTION_SPACE_BYTES;
@@ -61,6 +62,11 @@ public class GarbageCollectorStats {
         help = "Number of major compactions"
     )
     private final Counter majorCompactionCounter;
+    @StatsDoc(
+            name = ENTRY_LOCATION_COMPACTION_COUNT,
+            help = "Number of entry location compactions"
+    )
+    private final Counter entryLocationCompactionCounter;
     @StatsDoc(
         name = RECLAIMED_DELETION_SPACE_BYTES,
         help = "Number of disk space bytes reclaimed via deleting entry log 
files"
@@ -105,6 +111,7 @@ public class GarbageCollectorStats {
 
         this.minorCompactionCounter = 
statsLogger.getCounter(MINOR_COMPACTION_COUNT);
         this.majorCompactionCounter = 
statsLogger.getCounter(MAJOR_COMPACTION_COUNT);
+        this.entryLocationCompactionCounter = 
statsLogger.getCounter(ENTRY_LOCATION_COMPACTION_COUNT);
         this.reclaimedSpaceViaCompaction = 
statsLogger.getCounter(RECLAIMED_COMPACTION_SPACE_BYTES);
         this.reclaimedSpaceViaDeletes = 
statsLogger.getCounter(RECLAIMED_DELETION_SPACE_BYTES);
         this.gcThreadRuntime = statsLogger.getOpStatsLogger(THREAD_RUNTIME);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 61aebd8e1a..867ba905ff 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -313,6 +313,8 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
     public void entryLocationCompact() {
         if (entryLocationIndex.isCompacting()) {
             // RocksDB already running compact.
+            log.info("Compacting directory {}, skipping this 
entryLocationCompaction this time.",
+                    entryLocationIndex.getEntryLocationDBPath());
             return;
         }
         cleanupExecutor.execute(() -> {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 019144e994..1f04e53f57 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -105,6 +105,7 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     protected static final String COMPACTION_RATE = "compactionRate";
     protected static final String COMPACTION_RATE_BY_ENTRIES = 
"compactionRateByEntries";
     protected static final String COMPACTION_RATE_BY_BYTES = 
"compactionRateByBytes";
+    protected static final String ENTRY_LOCATION_COMPACTION_INTERVAL = 
"entryLocationCompactionInterval";
 
     // Gc Parameters
     protected static final String GC_WAIT_TIME = "gcWaitTime";
@@ -2969,6 +2970,31 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
         return this;
     }
 
+    /**
+     * Get interval to run entry location compaction, in seconds.
+     *
+     * <p>If it is set to zero or less, the entry location compaction is 
disabled.
+     *
+     * @return the entry location compaction interval, in seconds.
+     */
+    public long getEntryLocationCompactionInterval() {
+        return getLong(ENTRY_LOCATION_COMPACTION_INTERVAL, -1);
+    }
+
+    /**
+     * Set interval to run entry location compaction, in seconds.
+     *
+     * @see #getEntryLocationCompactionInterval()
+     *
+     * @param interval
+     *          Interval to run entry location compaction, in seconds
+     * @return server configuration
+     */
+    public ServerConfiguration setEntryLocationCompactionInterval(long 
interval) {
+        setProperty(ENTRY_LOCATION_COMPACTION_INTERVAL, interval);
+        return this;
+    }
+
     /**
      * Should we remove pages from page cache after force write.
      *
@@ -3187,6 +3213,10 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
         if (getMajorCompactionInterval() > 0 && getMajorCompactionInterval() * 
SECOND < getGcWaitTime()) {
             throw new ConfigurationException("majorCompactionInterval should 
be >= gcWaitTime.");
         }
+        if (getEntryLocationCompactionInterval() > 0
+            && getEntryLocationCompactionInterval() * SECOND < 
getGcWaitTime()) {
+            throw new ConfigurationException("entryLocationCompactionInterval 
should be >= gcWaitTime.");
+        }
     }
 
     /**
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java
index 04ac87818f..d8aa62d0d2 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java
@@ -158,6 +158,7 @@ public class TestServerConfiguration {
     public void testCompactionSettings() throws ConfigurationException {
         ServerConfiguration conf = new ServerConfiguration();
         long major, minor;
+        long entryLocationCompactionInterval;
 
         // Default Values
         major = conf.getMajorCompactionMaxTimeMillis();
@@ -239,5 +240,24 @@ public class TestServerConfiguration {
         minorThreshold = conf.getMinorCompactionThreshold();
         Assert.assertEquals(0.6, majorThreshold, 0.00001);
         Assert.assertEquals(0.3, minorThreshold, 0.00001);
+
+        // Default Values
+        entryLocationCompactionInterval = 
conf.getEntryLocationCompactionInterval();
+        Assert.assertEquals(-1, entryLocationCompactionInterval);
+
+        // Set entry location compaction
+        conf.setEntryLocationCompactionInterval(3600);
+        entryLocationCompactionInterval = 
conf.getEntryLocationCompactionInterval();
+        Assert.assertEquals(3600, entryLocationCompactionInterval);
+
+        conf.setEntryLocationCompactionInterval(550);
+        try {
+            conf.validate();
+            fail();
+        } catch (ConfigurationException ignore) {
+        }
+
+        conf.setEntryLocationCompactionInterval(650);
+        conf.validate();
     }
 }
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index c0a021418d..be3d845fad 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -581,6 +581,11 @@ ledgerDirectories=/tmp/bk-data
 # Set the rate at which compaction will readd entries. The unit is bytes added 
per second.
 # compactionRateByBytes=1000000
 
+# Interval to run entry location compaction, in seconds
+# If it is set to zero or less, the entry location compaction is disabled.
+# Note: when enabled, it should be greater than or equal to gcWaitTime.
+# entryLocationCompactionInterval=-1
+
 # Flag to enable/disable transactional compaction. If it is set to true, it 
will use transactional compaction,
 # which it will use new entry log files to store compacted entries during 
compaction; if it is set to false,
 # it will use normal compaction, which it shares same entry log file with 
normal add operations.

Reply via email to