This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 28de2a8 Issue #1124: Lower memory usage in GarbageCollectionThread
while extracting all ledger meta data
28de2a8 is described below
commit 28de2a8d626f43eda39af00e452e3c12c98c5cba
Author: Kishor Patil <[email protected]>
AuthorDate: Sat Feb 17 00:58:52 2018 +0800
Issue #1124: Lower memory usage in GarbageCollectionThread while extracting
all ledger meta data
Descriptions of the changes in this PR:
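This PR removes ledgers that no longer exist from each EntryLogMetadata
while extracting metadata from the entry logs. Entry logs left with no
live ledgers are removed right away instead of being kept in the
in-memory metadata map.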
Master Issue: #1124
Author: Kishor Patil <[email protected]>
Reviewers: Yiming Zang <[email protected]>, Sijie Guo <[email protected]>
This closes #1125 from kishorvpatil/gcThreadFix, closes #1124
---
.../bookkeeper/bookie/GarbageCollectorThread.java | 30 ++++++----
.../apache/bookkeeper/bookie/CompactionTest.java | 70 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 11 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 4880fa3..79515ea 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -389,16 +389,7 @@ public class GarbageCollectorThread extends SafeRunnable {
// Loop through all of the entry logs and remove the non-active
ledgers.
entryLogMetaMap.forEach((entryLogId, meta) -> {
- meta.removeLedgerIf((entryLogLedger) -> {
- // Remove the entry log ledger from the set if it isn't active.
- try {
- return !ledgerStorage.ledgerExists(entryLogLedger);
- } catch (IOException e) {
- LOG.error("Error reading from ledger storage", e);
- return false;
- }
- });
-
+ removeIfLedgerNotExists(meta);
if (meta.isEmpty()) {
// This means the entry log is not associated with any active
ledgers anymore.
// We can remove this entry log file now.
@@ -414,6 +405,18 @@ public class GarbageCollectorThread extends SafeRunnable {
this.numActiveEntryLogs = entryLogMetaMap.keySet().size();
}
+ private void removeIfLedgerNotExists(EntryLogMetadata meta) {
+ meta.removeLedgerIf((entryLogLedger) -> {
+ // Remove the entry log ledger from the set if it isn't active.
+ try {
+ return !ledgerStorage.ledgerExists(entryLogLedger);
+ } catch (IOException e) {
+ LOG.error("Error reading from ledger storage", e);
+ return false;
+ }
+ });
+ }
+
/**
* Compact entry logs if necessary.
*
@@ -546,7 +549,12 @@ public class GarbageCollectorThread extends SafeRunnable {
try {
// Read through the entry log file and extract the entry log
meta
EntryLogMetadata entryLogMeta =
entryLogger.getEntryLogMetadata(entryLogId);
- entryLogMetaMap.put(entryLogId, entryLogMeta);
+ removeIfLedgerNotExists(entryLogMeta);
+ if (entryLogMeta.isEmpty()) {
+ entryLogger.removeEntryLog(entryLogId);
+ } else {
+ entryLogMetaMap.put(entryLogId, entryLogMeta);
+ }
} catch (IOException e) {
hasExceptionWhenScan = true;
LOG.warn("Premature exception when processing " + entryLogId
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 6873a5c..cbf844f 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -39,8 +39,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -835,6 +837,74 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
storage.gcThread.doCompactEntryLogs(threshold);
}
+ /**
+ * Test extractMetaFromEntryLogs optimized method to avoid excess memory
usage.
+ */
+ public void testExtractMetaFromEntryLogs() throws Exception {
+ // Always run this test with Throttle enabled.
+ baseConf.setIsThrottleByBytes(true);
+ // restart bookies
+ restartBookies(baseConf);
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ File tmpDir = createTempDir("bkTest", ".dir");
+ File curDir = Bookie.getCurrentDirectory(tmpDir);
+ Bookie.checkDirectoryStructure(curDir);
+ conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+
+ LedgerDirsManager dirs = new LedgerDirsManager(conf,
conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
+ final Set<Long> ledgers = Collections
+ .newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
+
+ LedgerManager manager = getLedgerManager(ledgers);
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+
+ @Override
+ public Checkpoint newCheckpoint() {
+ return null;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint,
+ boolean compact) throws IOException
{
+ }
+ };
+ InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
+ storage.initialize(conf, manager, dirs, dirs, null, checkpointSource,
+ Checkpointer.NULL, NullStatsLogger.INSTANCE);
+
+ for (long ledger = 0; ledger <= 10; ledger++) {
+ ledgers.add(ledger);
+ for (int entry = 1; entry <= 50; entry++) {
+ try {
+ storage.addEntry(genEntry(ledger, entry, ENTRY_SIZE));
+ } catch (IOException e) {
+ //ignore exception on failure to add entry.
+ }
+ }
+ }
+
+ storage.flush();
+ storage.shutdown();
+
+ storage = new InterleavedLedgerStorage();
+ storage.initialize(conf, manager, dirs, dirs, null, checkpointSource,
+ Checkpointer.NULL, NullStatsLogger.INSTANCE);
+
+ long startingEntriesCount =
storage.gcThread.entryLogger.getLeastUnflushedLogId()
+ - storage.gcThread.scannedLogId;
+ LOG.info("The old Log Entry count is: " + startingEntriesCount);
+
+ Map<Long, EntryLogMetadata> entryLogMetaData = new HashMap<>();
+ long finalEntriesCount =
storage.gcThread.entryLogger.getLeastUnflushedLogId()
+ - storage.gcThread.scannedLogId;
+ LOG.info("The latest Log Entry count is: " + finalEntriesCount);
+
+ assertTrue("The GC did not clean up entries...", startingEntriesCount
!= finalEntriesCount);
+ assertTrue("Entries Count is zero", finalEntriesCount == 0);
+ }
+
private ByteBuf genEntry(long ledger, long entry, int size) {
ByteBuf bb = Unpooled.buffer(size);
bb.writeLong(ledger);
--
To stop receiving notification emails like this one, please contact
[email protected].