This is an automated email from the ASF dual-hosted git repository.
lushiji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 0621ae6fce Only stop Gc for the fill disk for DbLedgerStorage (#4661)
0621ae6fce is described below
commit 0621ae6fce446c00f5becf0c81744bd8f9ec1c90
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Sep 3 10:21:28 2025 +0800
Only stop Gc for the fill disk for DbLedgerStorage (#4661)
* Only stop Gc for the fill disk for DbLedgerStorage
---
.../bookie/storage/ldb/DbLedgerStorage.java | 2 +-
.../ldb/SingleDirectoryDbLedgerStorage.java | 40 +++++-
.../bookie/BookieStorageThresholdTest.java | 135 +++++++++++++++++++++
3 files changed, 171 insertions(+), 6 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index a47baddd35..69964c8f81 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -449,7 +449,7 @@ public class DbLedgerStorage implements LedgerStorage {
}
@VisibleForTesting
- List<SingleDirectoryDbLedgerStorage> getLedgerStorageList() {
+ public List<SingleDirectoryDbLedgerStorage> getLedgerStorageList() {
return ledgerStorageList;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index bab23b3235..774d10c158 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -39,6 +39,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
@@ -149,6 +150,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
private final Counter flushExecutorTime;
private final boolean singleLedgerDirs;
+ private final String ledgerBaseDir;
+ private final String indexBaseDir;
public SingleDirectoryDbLedgerStorage(ServerConfiguration conf,
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
@@ -158,8 +161,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
throws IOException {
checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
"Db implementation only allows for one storage dir");
-
- String ledgerBaseDir = ledgerDirsManager.getAllLedgerDirs().get(0).getPath();
+ ledgerBaseDir = ledgerDirsManager.getAllLedgerDirs().get(0).getPath();
// indexBaseDir default use ledgerBaseDir
String indexBaseDir = ledgerBaseDir;
if (CollectionUtils.isEmpty(indexDirsManager.getAllLedgerDirs())
@@ -172,6 +174,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
log.info("indexDir is specified a separate dir, creating single directory db ledger storage on {}",
indexBaseDir);
}
+ this.indexBaseDir = indexBaseDir;
StatsLogger ledgerIndexDirStatsLogger = statsLogger
.scopeLabel("ledgerDir", ledgerBaseDir)
@@ -228,9 +231,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
flushExecutorTime.addLatency(0, TimeUnit.NANOSECONDS);
});
- ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+ ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener(ledgerBaseDir));
if (!ledgerBaseDir.equals(indexBaseDir)) {
- indexDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+ indexDirsManager.addLedgerDirsListener(getLedgerDirsListener(indexBaseDir));
}
}
@@ -1151,11 +1154,19 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
"getListOfEntriesOfLedger method is currently unsupported for SingleDirectoryDbLedgerStorage");
}
- private LedgerDirsManager.LedgerDirsListener getLedgerDirsListener() {
+ private LedgerDirsManager.LedgerDirsListener getLedgerDirsListener(String diskPath) {
return new LedgerDirsListener() {
+ private final String currentFilePath = diskPath;
+
+ private boolean isCurrentFile(File disk) {
+ return Objects.equals(disk.getPath(), currentFilePath);
+ }
@Override
public void diskAlmostFull(File disk) {
+ if (!isCurrentFile(disk)) {
+ return;
+ }
if (gcThread.isForceGCAllowWhenNoSpace()) {
gcThread.enableForceGC();
} else {
@@ -1165,6 +1176,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
@Override
public void diskFull(File disk) {
+ if (!isCurrentFile(disk)) {
+ return;
+ }
if (gcThread.isForceGCAllowWhenNoSpace()) {
gcThread.enableForceGC();
} else {
@@ -1185,6 +1199,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
@Override
public void diskWritable(File disk) {
+ if (!isCurrentFile(disk)) {
+ return;
+ }
// we have enough space now
if (gcThread.isForceGCAllowWhenNoSpace()) {
// disable force gc.
@@ -1198,6 +1215,9 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
@Override
public void diskJustWritable(File disk) {
+ if (!isCurrentFile(disk)) {
+ return;
+ }
if (gcThread.isForceGCAllowWhenNoSpace()) {
// if a disk is just writable, we still need force gc.
gcThread.enableForceGC();
@@ -1300,4 +1320,14 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
DbLedgerStorageStats getDbLedgerStorageStats() {
return dbLedgerStorageStats;
}
+
+ @VisibleForTesting
+ public String getLedgerBaseDir() {
+ return ledgerBaseDir;
+ }
+
+ @VisibleForTesting
+ public String getIndexBaseDir() {
+ return indexBaseDir;
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
index b163064f56..e031ea8db9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
@@ -21,21 +21,28 @@
package org.apache.bookkeeper.bookie;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.DiskChecker;
@@ -250,4 +257,132 @@ public class BookieStorageThresholdTest extends BookKeeperClusterTestCase {
Thread.sleep(500);
assertFalse("Bookie should be transitioned to ReadWrite", bookie.isReadOnly());
}
+
+ @org.junit.Test
+ public void testStopGCOnCorrespondingDiskWhenDiskFull() throws Exception {
+ // 1. Create test directories
+ File ledgerDir1 = tmpDirs.createNew("ledger", "test1");
+ File ledgerDir2 = tmpDirs.createNew("ledger", "test2");
+
+ // 2. Configure Bookie
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setGcWaitTime(1000);
+ conf.setLedgerDirNames(new String[] { ledgerDir1.getPath(), ledgerDir2.getPath() });
+ conf.setDiskCheckInterval(100); // Shorten disk check interval
+ conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+
+ // 3. Start Bookie and obtain internal components
+ Bookie bookie = new TestBookieImpl(conf);
+ BookieImpl bookieImpl = (BookieImpl) bookie;
+ LedgerDirsManager ledgerDirsManager = bookieImpl.getLedgerDirsManager();
+
+ // 4. Create custom disk checker (key step)
+ GCTestDiskChecker diskChecker = new GCTestDiskChecker(
+ conf.getDiskUsageThreshold(),
+ conf.getDiskUsageWarnThreshold()
+ );
+ // Set directory status: dir1 full (100%), dir2 normal (50%)
+ File[] currentDirectories = BookieImpl.getCurrentDirectories(new File[] { ledgerDir1, ledgerDir2 });
+ diskChecker.setUsageMap(currentDirectories[0], 1.0f); // 100% usage
+ diskChecker.setUsageMap(currentDirectories[1], 0.5f); // 50% usage
+
+ // 5. Replace Bookie's disk checker
+ bookieImpl.dirsMonitor.shutdown(); // Stop default monitor
+ bookieImpl.dirsMonitor = new LedgerDirsMonitor(
+ conf,
+ diskChecker,
+ Collections.singletonList(ledgerDirsManager)
+ );
+ bookieImpl.dirsMonitor.start();
+
+ // 6. Add disk state listener
+ CountDownLatch dir1Full = new CountDownLatch(1);
+ CountDownLatch dir1Writable = new CountDownLatch(1);
+
+ ledgerDirsManager.addLedgerDirsListener(new LedgerDirsListener() {
+ @Override
+ public void diskFull(File disk) {
+ if (disk.equals(currentDirectories[0])) {
+ dir1Full.countDown();
+ }
+ }
+
+ @Override
+ public void diskWritable(File disk) {
+ if (disk.equals(currentDirectories[0])) {
+ dir1Writable.countDown();
+ }
+ }
+ });
+
+ // 7. Wait for state update (ensure event is triggered)
+ assertTrue("dir1 did not trigger full state", dir1Full.await(30, TimeUnit.SECONDS));
+
+ // 8. Verify directory status
+ List<File> fullDirs = ledgerDirsManager.getFullFilledLedgerDirs();
+ List<File> writableDirs = ledgerDirsManager.getWritableLedgerDirs();
+
+ assertTrue("dir1 should be marked as full", fullDirs.contains(currentDirectories[0]));
+ assertTrue("dir2 should remain writable", writableDirs.contains(currentDirectories[1]));
+ assertEquals("Only 1 writable directory should remain", 1, writableDirs.size());
+
+ // 9. Verify GC status
+ ((DbLedgerStorage) bookieImpl.ledgerStorage).getLedgerStorageList().forEach(storage -> {
+ if (Objects.equals(storage.getLedgerBaseDir(), currentDirectories[0].getPath())) {
+ assertTrue("dir1 should suspend minor GC", storage.isMinorGcSuspended());
+ assertTrue("dir1 should suspend major GC", storage.isMajorGcSuspended());
+ } else {
+ assertFalse("dir2 should not suspend minor GC", storage.isMinorGcSuspended());
+ assertFalse("dir2 should not suspend major GC", storage.isMajorGcSuspended());
+ }
+ });
+
+ // 10. Restore dir1 status
+ diskChecker.setUsageMap(currentDirectories[0], 0.5f); // 50% usage
+ assertTrue("dir1 did not become writable again", dir1Writable.await(3, TimeUnit.SECONDS));
+
+ // 11. Verify GC status after recovery
+ ((DbLedgerStorage) bookieImpl.ledgerStorage).getLedgerStorageList().forEach(storage -> {
+ if (Objects.equals(storage.getLedgerBaseDir(), currentDirectories[0].getPath())) {
+ assertFalse("dir1 should not suspend minor GC", storage.isMinorGcSuspended());
+ assertFalse("dir1 should not suspend major GC", storage.isMajorGcSuspended());
+ } else {
+ assertFalse("dir2 should not suspend minor GC", storage.isMinorGcSuspended());
+ assertFalse("dir2 should not suspend major GC", storage.isMajorGcSuspended());
+ }
+ });
+
+ // 12. Cleanup
+ bookie.shutdown();
+ }
+
+ // Custom disk checker (simulate different usage for directories)
+ static class GCTestDiskChecker extends DiskChecker {
+ private final Map<File, Float> usageMap = new ConcurrentHashMap<>();
+
+ public GCTestDiskChecker(float threshold, float warnThreshold) {
+ super(threshold, warnThreshold);
+ }
+
+ // Set simulated usage for a directory
+ public void setUsageMap(File dir, float usage) {
+ usageMap.put(dir, usage);
+ }
+
+ @Override
+ public float checkDir(File dir) throws DiskErrorException, DiskWarnThresholdException, DiskOutOfSpaceException {
+ Float usage = usageMap.get(dir);
+ if (usage == null) {
+ return super.checkDir(dir); // Default behavior
+ }
+ // Throw exception based on preset usage rate
+ if (usage >= 1.0) {
+ throw new DiskOutOfSpaceException("Simulated disk full", usage);
+ } else if (usage >= 0.9) {
+ throw new DiskWarnThresholdException("Simulated disk warning", usage);
+ }
+ return usage;
+ }
+ }
+
}