This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a58f11ff13 Change masterKeyCache to dynamic size (#3522)
a58f11ff13 is described below
commit a58f11ff13b7690fbb1e7da92be4b65cded5d4bf
Author: StevenLuMT <[email protected]>
AuthorDate: Tue Oct 25 16:20:30 2022 +0800
Change masterKeyCache to dynamic size (#3522)
---
.../org/apache/bookkeeper/bookie/BookieImpl.java | 12 ++++++--
.../bookkeeper/bookie/GarbageCollectorThread.java | 7 +++++
.../bookie/InterleavedLedgerStorage.java | 5 ++++
.../apache/bookkeeper/bookie/LedgerStorage.java | 1 +
.../bookie/LedgerStorageNotificationListener.java | 35 ++++++++++++++++++++++
.../bookkeeper/bookie/SortedLedgerStorage.java | 5 ++++
.../bookie/storage/ldb/DbLedgerStorage.java | 6 ++++
.../ldb/SingleDirectoryDbLedgerStorage.java | 6 ++++
.../bookkeeper/bookie/MockLedgerStorage.java | 5 ++++
.../apache/bookkeeper/bookie/SyncThreadTest.java | 5 ++++
.../org/apache/bookkeeper/meta/GcLedgersTest.java | 6 ++++
.../bookkeeper/meta/LedgerManagerTestCase.java | 6 ++++
.../collections/ConcurrentLongHashMapTest.java | 35 ++++++++++++++++++++++
13 files changed, 132 insertions(+), 2 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index df11852235..535f742126 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -113,7 +113,7 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
private int exitCode = ExitCode.OK;
private final ConcurrentLongHashMap<byte[]> masterKeyCache =
- ConcurrentLongHashMap.<byte[]>newBuilder().build();
+
ConcurrentLongHashMap.<byte[]>newBuilder().autoShrink(true).build();
protected StateManager stateManager;
@@ -375,6 +375,7 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
}
});
ledgerStorage.setCheckpointer(Checkpointer.NULL);
+
ledgerStorage.setStorageStorageNotificationListener(LedgerStorageNotificationListener.NULL);
return ledgerStorage;
}
@@ -477,10 +478,17 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
syncThread = new SyncThread(conf, getLedgerDirsListener(),
ledgerStorage, checkpointSource, statsLogger);
}
+ LedgerStorageNotificationListener storageNotificationListener = new
LedgerStorageNotificationListener() {
+ @Override
+ public void ledgerRemovedFromStorage(long ledgerId) {
+ masterKeyCache.remove(ledgerId);
+ }
+ };
+
ledgerStorage.setStateManager(stateManager);
ledgerStorage.setCheckpointSource(checkpointSource);
ledgerStorage.setCheckpointer(syncThread);
-
+
ledgerStorage.setStorageStorageNotificationListener(storageNotificationListener);
handles = new HandleFactoryImpl(ledgerStorage);
// Expose Stats
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index e7549aaddb..2e153f7ab7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -124,6 +124,8 @@ public class GarbageCollectorThread extends SafeRunnable {
private static final AtomicLong threadNum = new AtomicLong(0);
final AbstractLogCompactor.Throttler throttler;
+
+ private LedgerStorageNotificationListener storageNotificationListener =
LedgerStorageNotificationListener.NULL;
/**
* Create a garbage collector thread.
*
@@ -181,6 +183,7 @@ public class GarbageCollectorThread extends SafeRunnable {
}
gcStats.getDeletedLedgerCounter().inc();
ledgerStorage.deleteLedger(ledgerId);
+ storageNotificationListener.ledgerRemovedFromStorage(ledgerId);
} catch (IOException e) {
LOG.error("Exception when deleting the ledger index file on
the Bookie: ", e);
}
@@ -771,4 +774,8 @@ public class GarbageCollectorThread extends SafeRunnable {
.minorCompactionCounter(gcStats.getMinorCompactionCounter().get())
.build();
}
+
+ public void
setStorageStorageNotificationListener(LedgerStorageNotificationListener
storageNotificationListener) {
+ this.storageNotificationListener = storageNotificationListener;
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 6954f65d0a..7597acc0a6 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -173,6 +173,11 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
this.checkpointer = checkpointer;
}
+ @Override
+ public void
setStorageStorageNotificationListener(LedgerStorageNotificationListener
storageNotificationListener) {
+
this.gcThread.setStorageStorageNotificationListener(storageNotificationListener);
+ }
+
public void initializeWithEntryLogger(ServerConfiguration conf,
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 372cd926c0..c409112eef 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -60,6 +60,7 @@ public interface LedgerStorage {
void setStateManager(StateManager stateManager);
void setCheckpointSource(CheckpointSource checkpointSource);
void setCheckpointer(Checkpointer checkpointer);
+ void
setStorageStorageNotificationListener(LedgerStorageNotificationListener
ledgerStorageNotificationListener);
/**
* Start any background threads belonging to the storage system. For
example, garbage collection.
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageNotificationListener.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageNotificationListener.java
new file mode 100644
index 0000000000..af2729dd16
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageNotificationListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+/**
+ * Listener that is notified when a ledger has been removed from ledger storage.
+ */
+public interface LedgerStorageNotificationListener {
+
+ LedgerStorageNotificationListener NULL = new
LedgerStorageNotificationListener(){
+ @Override
+ public void ledgerRemovedFromStorage(long ledgerId) {
+ // no-op
+ }
+ };
+
+ void ledgerRemovedFromStorage(long ledgerId);
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index a7080ba4f3..b9a4b7cfd8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -118,6 +118,11 @@ public class SortedLedgerStorage
interleavedLedgerStorage.setCheckpointer(checkpointer);
}
+ @Override
+ public void
setStorageStorageNotificationListener(LedgerStorageNotificationListener
storageNotificationListener) {
+
this.interleavedLedgerStorage.setStorageStorageNotificationListener(storageNotificationListener);
+ }
+
@VisibleForTesting
ScheduledExecutorService getScheduler() {
return scheduler;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 24a52dc81a..d1404e0179 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -51,6 +51,7 @@ import
org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerCache;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorageNotificationListener;
import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.bookie.storage.EntryLogIdsImpl;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
@@ -299,6 +300,11 @@ public class DbLedgerStorage implements LedgerStorage {
ledgerStorageList.forEach(s -> s.setCheckpointer(checkpointer));
}
+ @Override
+ public void
setStorageStorageNotificationListener(LedgerStorageNotificationListener
storageNotificationListener) {
+ ledgerStorageList.forEach(s ->
s.setStorageStorageNotificationListener(storageNotificationListener));
+ }
+
@Override
public void start() {
ledgerStorageList.forEach(LedgerStorage::start);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index d330ea0366..d15eaf10c5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -63,6 +63,7 @@ import org.apache.bookkeeper.bookie.LedgerCache;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerEntryPage;
+import org.apache.bookkeeper.bookie.LedgerStorageNotificationListener;
import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
import
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
@@ -238,6 +239,11 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
@Override
public void setCheckpointer(Checkpointer checkpointer) { }
+ @Override
+ public void
setStorageStorageNotificationListener(LedgerStorageNotificationListener
storageNotificationListener) {
+
this.gcThread.setStorageStorageNotificationListener(storageNotificationListener);
+ }
+
/**
* Evict all the ledger info object that were not used recently.
*/
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
index 5b49ca7706..f5b94e422c 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
@@ -75,6 +75,11 @@ public class MockLedgerStorage implements
CompactableLedgerStorage {
@Override
public void setCheckpointer(Checkpointer checkpointer) {}
+ @Override
+ public void
setStorageStorageNotificationListener(LedgerStorageNotificationListener
storageNotificationListener) {
+
+ }
+
@Override
public void start() {}
@Override
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SyncThreadTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SyncThreadTest.java
index d44447e620..82bb762be5 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SyncThreadTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SyncThreadTest.java
@@ -285,6 +285,11 @@ public class SyncThreadTest {
@Override
public void setCheckpointer(Checkpointer checkpointer) {}
+ @Override
+ public void
setStorageStorageNotificationListener(LedgerStorageNotificationListener
storageListener) {
+
+ }
+
@Override
public void deleteLedger(long ledgerId) throws IOException {
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index fec74a8202..2f48a87609 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -61,6 +61,7 @@ import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.GarbageCollector;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.LedgerStorageNotificationListener;
import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.client.BKException;
@@ -589,6 +590,11 @@ public class GcLedgersTest extends LedgerManagerTestCase {
@Override
public void setCheckpointer(Checkpointer checkpointer) {}
+ @Override
+ public void
setStorageStorageNotificationListener(LedgerStorageNotificationListener
storageListener) {
+
+ }
+
@Override
public void start() {
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 13272ab51c..799fe4badb 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.LedgerStorageNotificationListener;
import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.Watcher;
@@ -183,6 +184,11 @@ public abstract class LedgerManagerTestCase extends
BookKeeperClusterTestCase {
@Override
public void setCheckpointer(Checkpointer checkpointer) {}
+ @Override
+ public void
setStorageStorageNotificationListener(LedgerStorageNotificationListener
storageListener) {
+
+ }
+
@Override
public void start() {
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index e4caa4f92e..f1372b2894 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -20,8 +20,10 @@
*/
package org.apache.bookkeeper.util.collections;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -117,6 +119,39 @@ public class ConcurrentLongHashMapTest {
assertTrue(map.capacity() == 4);
}
+ private void addSpecifyIncrement(ConcurrentLongHashMap<byte[]> mkc, int
start, int end) {
+ for (int i = start; i <= end; i++) {
+ assertNull(mkc.put(i, ("comment:" + i).getBytes(UTF_8)));
+ }
+ }
+
+ private void removeSpecifyIncrement(ConcurrentLongHashMap<byte[]> mkc, int
start, int end) {
+ for (int i = end; i >= start; i--) {
+ assertNotNull(mkc.remove(i));
+ }
+ }
+
+ @Test
+ public void testAutoShrinkWithByte() {
+ final int defaultExpectedItems = 256;
+ final int defaultConcurrencyLevel = 16;
+ final float defaultExpandFactor = 2;
+ final float defaultShrinkFactor = 2;
+
+ ConcurrentLongHashMap<byte[]> mkc =
ConcurrentLongHashMap.<byte[]>newBuilder().autoShrink(true).build();
+ assertTrue(mkc.capacity() == defaultExpectedItems * 2);
+
+ addSpecifyIncrement(mkc, 1, defaultExpectedItems * 2);
+ // expand hashmap
+ assertTrue(mkc.capacity() == defaultExpectedItems * 2
+ + defaultConcurrencyLevel * defaultExpandFactor * 15);
+
+ removeSpecifyIncrement(mkc, 220, defaultExpectedItems * 2);
+ // shrink hashmap
+ assertTrue(mkc.capacity() == defaultExpectedItems * 2
+ + defaultConcurrencyLevel * defaultExpandFactor * 15 -
defaultConcurrencyLevel * defaultShrinkFactor);
+ }
+
@Test
public void testExpandAndShrink() {
ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()