This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new ede1ba972c Fix SST files not being cleaned up in the locations folder
(#4555)
ede1ba972c is described below
commit ede1ba972cf0458ca1dc00338b5feb5b0f617aa2
Author: danpi <[email protected]>
AuthorDate: Tue Apr 1 11:53:36 2025 +0800
Fix SST files not being cleaned up in the locations folder (#4555)
* fix entry location compaction
* replace entryLocationCompactionEnable with entryLocationCompactionInterval
* Add randomCompactionDelay to avoid all the bookies triggering compaction
simultaneously
* Fix the style issue
* Fix the style issue
* Fix test
---------
Co-authored-by: houbonan <[email protected]>
Co-authored-by: zymap <[email protected]>
---
.../bookkeeper/bookie/BookKeeperServerStats.java | 1 +
.../bookkeeper/bookie/GarbageCollectionStatus.java | 2 ++
.../bookkeeper/bookie/GarbageCollectorThread.java | 38 +++++++++++++++++++++-
.../bookie/stats/GarbageCollectorStats.java | 7 ++++
.../ldb/SingleDirectoryDbLedgerStorage.java | 2 ++
.../bookkeeper/conf/ServerConfiguration.java | 30 +++++++++++++++++
.../bookkeeper/conf/TestServerConfiguration.java | 20 ++++++++++++
conf/bk_server.conf | 5 +++
8 files changed, 104 insertions(+), 1 deletion(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index cc1d3a4ad6..8cb515b1e6 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -153,6 +153,7 @@ public interface BookKeeperServerStats {
String THREAD_RUNTIME = "THREAD_RUNTIME";
String MAJOR_COMPACTION_COUNT = "MAJOR_COMPACTION_TOTAL";
String MINOR_COMPACTION_COUNT = "MINOR_COMPACTION_TOTAL";
+ String ENTRY_LOCATION_COMPACTION_COUNT = "ENTRY_LOCATION_COMPACTION_TOTAL";
String ACTIVE_LEDGER_COUNT = "ACTIVE_LEDGER_TOTAL";
String DELETED_LEDGER_COUNT = "DELETED_LEDGER_TOTAL";
String GC_LEDGER_RUNTIME = "GC_LEDGER_RUNTIME";
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java
index 3f872092f0..4ad450a64f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java
@@ -42,6 +42,8 @@ public class GarbageCollectionStatus {
private long lastMajorCompactionTime;
private long lastMinorCompactionTime;
+ private long lastEntryLocationCompactionTime;
private long majorCompactionCounter;
private long minorCompactionCounter;
+ private long entryLocationCompactionCounter;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index f9bdef9d56..513c54a82f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -34,6 +34,7 @@ import java.util.LinkedList;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -87,6 +88,10 @@ public class GarbageCollectorThread implements Runnable {
long majorCompactionMaxTimeMillis;
long lastMajorCompactionTime;
+ final long entryLocationCompactionInterval;
+ long randomCompactionDelay;
+ long lastEntryLocationCompactionTime;
+
@Getter
final boolean isForceGCAllowWhenNoSpace;
@@ -211,6 +216,10 @@ public class GarbageCollectorThread implements Runnable {
isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace();
majorCompactionMaxTimeMillis = conf.getMajorCompactionMaxTimeMillis();
minorCompactionMaxTimeMillis = conf.getMinorCompactionMaxTimeMillis();
+ entryLocationCompactionInterval =
conf.getEntryLocationCompactionInterval() * SECOND;
+ if (entryLocationCompactionInterval > 0) {
+ randomCompactionDelay =
ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval);
+ }
boolean isForceAllowCompaction = conf.isForceAllowCompaction();
@@ -277,12 +286,22 @@ public class GarbageCollectorThread implements Runnable {
}
}
+ if (entryLocationCompactionInterval > 0) {
+ if (entryLocationCompactionInterval < gcWaitTime) {
+ throw new IOException(
+ "Too short entry location compaction interval : " +
entryLocationCompactionInterval);
+ }
+ }
+
LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ",
threshold="
+ minorCompactionThreshold + ", interval=" +
minorCompactionInterval);
LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ",
threshold="
+ majorCompactionThreshold + ", interval=" +
majorCompactionInterval);
+ LOG.info("Entry Location Compaction : interval=" +
entryLocationCompactionInterval + ", randomCompactionDelay="
+ + randomCompactionDelay);
- lastMinorCompactionTime = lastMajorCompactionTime =
System.currentTimeMillis();
+ lastMinorCompactionTime = lastMajorCompactionTime =
+ lastEntryLocationCompactionTime = System.currentTimeMillis();
}
private EntryLogMetadataMap createEntryLogMetadataMap() throws IOException
{
@@ -470,6 +489,7 @@ public class GarbageCollectorThread implements Runnable {
gcStats.getMajorCompactionCounter().inc();
majorCompacting.set(false);
}
+
} else if (((isForceMinorCompactionAllow && force) ||
(enableMinorCompaction
&& (force || curTime - lastMinorCompactionTime >
minorCompactionInterval)))
&& (!suspendMinor)) {
@@ -489,6 +509,20 @@ public class GarbageCollectorThread implements Runnable {
minorCompacting.set(false);
}
}
+ if (entryLocationCompactionInterval > 0 && (curTime -
lastEntryLocationCompactionTime > (
+ entryLocationCompactionInterval + randomCompactionDelay)))
{
+ // enter entry location compaction
+ LOG.info(
+ "Enter entry location compaction,
entryLocationCompactionInterval {}, randomCompactionDelay "
+ + "{}, lastEntryLocationCompactionTime {}",
+ entryLocationCompactionInterval,
randomCompactionDelay, lastEntryLocationCompactionTime);
+ ledgerStorage.entryLocationCompact();
+ lastEntryLocationCompactionTime = System.currentTimeMillis();
+ randomCompactionDelay =
ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval);
+ LOG.info("Next entry location compaction interval {}",
+ entryLocationCompactionInterval +
randomCompactionDelay);
+ gcStats.getEntryLocationCompactionCounter().inc();
+ }
gcStats.getCompactRuntime()
.registerSuccessfulEvent(MathUtils.elapsedNanos(compactStart),
TimeUnit.NANOSECONDS);
gcStats.getGcThreadRuntime().registerSuccessfulEvent(
@@ -855,8 +889,10 @@ public class GarbageCollectorThread implements Runnable {
.minorCompacting(minorCompacting.get())
.lastMajorCompactionTime(lastMajorCompactionTime)
.lastMinorCompactionTime(lastMinorCompactionTime)
+ .lastEntryLocationCompactionTime(lastEntryLocationCompactionTime)
.majorCompactionCounter(gcStats.getMajorCompactionCounter().get())
.minorCompactionCounter(gcStats.getMinorCompactionCounter().get())
+
.entryLocationCompactionCounter(gcStats.getEntryLocationCompactionCounter().get())
.build();
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java
index f579036df0..a9ecd180dd 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java
@@ -26,6 +26,7 @@ import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.COMPACT_RUNTIME;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.DELETED_LEDGER_COUNT;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOCATION_COMPACTION_COUNT;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOG_COMPACT_RATIO;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOG_SPACE_BYTES;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.EXTRACT_META_RUNTIME;
@@ -67,6 +68,11 @@ public class GarbageCollectorStats {
help = "Number of major compactions"
)
private final Counter majorCompactionCounter;
+ @StatsDoc(
+ name = ENTRY_LOCATION_COMPACTION_COUNT,
+ help = "Number of entry location compactions"
+ )
+ private final Counter entryLocationCompactionCounter;
@StatsDoc(
name = RECLAIMED_DELETION_SPACE_BYTES,
help = "Number of disk space bytes reclaimed via deleting entry log
files"
@@ -147,6 +153,7 @@ public class GarbageCollectorStats {
this.minorCompactionCounter =
statsLogger.getCounter(MINOR_COMPACTION_COUNT);
this.majorCompactionCounter =
statsLogger.getCounter(MAJOR_COMPACTION_COUNT);
+ this.entryLocationCompactionCounter =
statsLogger.getCounter(ENTRY_LOCATION_COMPACTION_COUNT);
this.reclaimedSpaceViaCompaction =
statsLogger.getCounter(RECLAIMED_COMPACTION_SPACE_BYTES);
this.reclaimedSpaceViaDeletes =
statsLogger.getCounter(RECLAIMED_DELETION_SPACE_BYTES);
this.reclaimFailedToDelete =
statsLogger.getCounter(RECLAIM_FAILED_TO_DELETE);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 6ce2d4b4f5..1a1e92dd30 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -313,6 +313,8 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
public void entryLocationCompact() {
if (entryLocationIndex.isCompacting()) {
// RocksDB already running compact.
+ log.info("Compacting directory {}, skipping this
entryLocationCompaction this time.",
+ entryLocationIndex.getEntryLocationDBPath());
return;
}
cleanupExecutor.execute(() -> {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 01848882c5..65cfa2d7aa 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -106,6 +106,7 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
protected static final String COMPACTION_RATE = "compactionRate";
protected static final String COMPACTION_RATE_BY_ENTRIES =
"compactionRateByEntries";
protected static final String COMPACTION_RATE_BY_BYTES =
"compactionRateByBytes";
+ protected static final String ENTRY_LOCATION_COMPACTION_INTERVAL =
"entryLocationCompactionInterval";
// Gc Parameters
protected static final String GC_WAIT_TIME = "gcWaitTime";
@@ -2974,6 +2975,31 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
return this;
}
+ /**
+ * Get interval to run entry location compaction, in seconds.
+ *
+ * <p>If it is set to less than zero, the entry location compaction is
disabled.
+ *
+ * @return high water mark.
+ */
+ public long getEntryLocationCompactionInterval() {
+ return getLong(ENTRY_LOCATION_COMPACTION_INTERVAL, -1);
+ }
+
+ /**
+ * Set interval to run entry location compaction.
+ *
+ * @see #getMajorCompactionInterval()
+ *
+ * @param interval
+ * Interval to run entry location compaction
+ * @return server configuration
+ */
+ public ServerConfiguration setEntryLocationCompactionInterval(long
interval) {
+ setProperty(ENTRY_LOCATION_COMPACTION_INTERVAL, interval);
+ return this;
+ }
+
/**
* Should we remove pages from page cache after force write.
*
@@ -3214,6 +3240,10 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
if (getMajorCompactionInterval() > 0 && getMajorCompactionInterval() *
SECOND < getGcWaitTime()) {
throw new ConfigurationException("majorCompactionInterval should
be >= gcWaitTime.");
}
+ if (getEntryLocationCompactionInterval() > 0
+ && getEntryLocationCompactionInterval() * SECOND <
getGcWaitTime()) {
+ throw new ConfigurationException("entryLocationCompactionInterval
should be >= gcWaitTime.");
+ }
}
/**
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java
index 04ac87818f..d8aa62d0d2 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java
@@ -158,6 +158,7 @@ public class TestServerConfiguration {
public void testCompactionSettings() throws ConfigurationException {
ServerConfiguration conf = new ServerConfiguration();
long major, minor;
+ long entryLocationCompactionInterval;
// Default Values
major = conf.getMajorCompactionMaxTimeMillis();
@@ -239,5 +240,24 @@ public class TestServerConfiguration {
minorThreshold = conf.getMinorCompactionThreshold();
Assert.assertEquals(0.6, majorThreshold, 0.00001);
Assert.assertEquals(0.3, minorThreshold, 0.00001);
+
+ // Default Values
+ entryLocationCompactionInterval =
conf.getEntryLocationCompactionInterval();
+ Assert.assertEquals(-1, entryLocationCompactionInterval);
+
+ // Set entry location compaction
+ conf.setEntryLocationCompactionInterval(3600);
+ entryLocationCompactionInterval =
conf.getEntryLocationCompactionInterval();
+ Assert.assertEquals(3600, entryLocationCompactionInterval);
+
+ conf.setEntryLocationCompactionInterval(550);
+ try {
+ conf.validate();
+ fail();
+ } catch (ConfigurationException ignore) {
+ }
+
+ conf.setEntryLocationCompactionInterval(650);
+ conf.validate();
}
}
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 58a1a89589..98b5264aca 100644
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -581,6 +581,11 @@ ledgerDirectories=/tmp/bk-data
# Set the rate at which compaction will readd entries. The unit is bytes added
per second.
# compactionRateByBytes=1000000
+# Interval to run entry location compaction, in seconds
+# If it is set to less than zero, the entry location compaction is disabled.
+# Note: should be greater than gcWaitTime.
+# entryLocationCompactionInterval=-1
+
# Flag to enable/disable transactional compaction. If it is set to true, it
will use transactional compaction,
# which it will use new entry log files to store compacted entries during
compaction; if it is set to false,
# it will use normal compaction, which it shares same entry log file with
normal add operations.