This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a58f11ff13 Change masterKeyCache to dynamic size (#3522)
a58f11ff13 is described below

commit a58f11ff13b7690fbb1e7da92be4b65cded5d4bf
Author: StevenLuMT <[email protected]>
AuthorDate: Tue Oct 25 16:20:30 2022 +0800

    Change masterKeyCache to dynamic size (#3522)
---
 .../org/apache/bookkeeper/bookie/BookieImpl.java   | 12 ++++++--
 .../bookkeeper/bookie/GarbageCollectorThread.java  |  7 +++++
 .../bookie/InterleavedLedgerStorage.java           |  5 ++++
 .../apache/bookkeeper/bookie/LedgerStorage.java    |  1 +
 .../bookie/LedgerStorageNotificationListener.java  | 35 ++++++++++++++++++++++
 .../bookkeeper/bookie/SortedLedgerStorage.java     |  5 ++++
 .../bookie/storage/ldb/DbLedgerStorage.java        |  6 ++++
 .../ldb/SingleDirectoryDbLedgerStorage.java        |  6 ++++
 .../bookkeeper/bookie/MockLedgerStorage.java       |  5 ++++
 .../apache/bookkeeper/bookie/SyncThreadTest.java   |  5 ++++
 .../org/apache/bookkeeper/meta/GcLedgersTest.java  |  6 ++++
 .../bookkeeper/meta/LedgerManagerTestCase.java     |  6 ++++
 .../collections/ConcurrentLongHashMapTest.java     | 35 ++++++++++++++++++++++
 13 files changed, 132 insertions(+), 2 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index df11852235..535f742126 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -113,7 +113,7 @@ public class BookieImpl extends BookieCriticalThread 
implements Bookie {
     private int exitCode = ExitCode.OK;
 
     private final ConcurrentLongHashMap<byte[]> masterKeyCache =
-            ConcurrentLongHashMap.<byte[]>newBuilder().build();
+            
ConcurrentLongHashMap.<byte[]>newBuilder().autoShrink(true).build();
 
     protected StateManager stateManager;
 
@@ -375,6 +375,7 @@ public class BookieImpl extends BookieCriticalThread 
implements Bookie {
                 }
             });
         ledgerStorage.setCheckpointer(Checkpointer.NULL);
+        
ledgerStorage.setStorageStorageNotificationListener(LedgerStorageNotificationListener.NULL);
         return ledgerStorage;
     }
 
@@ -477,10 +478,17 @@ public class BookieImpl extends BookieCriticalThread 
implements Bookie {
             syncThread = new SyncThread(conf, getLedgerDirsListener(), 
ledgerStorage, checkpointSource, statsLogger);
         }
 
+        LedgerStorageNotificationListener storageNotificationListener = new 
LedgerStorageNotificationListener() {
+            @Override
+            public void ledgerRemovedFromStorage(long ledgerId) {
+                masterKeyCache.remove(ledgerId);
+            }
+        };
+
         ledgerStorage.setStateManager(stateManager);
         ledgerStorage.setCheckpointSource(checkpointSource);
         ledgerStorage.setCheckpointer(syncThread);
-
+        
ledgerStorage.setStorageStorageNotificationListener(storageNotificationListener);
         handles = new HandleFactoryImpl(ledgerStorage);
 
         // Expose Stats
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index e7549aaddb..2e153f7ab7 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -124,6 +124,8 @@ public class GarbageCollectorThread extends SafeRunnable {
 
     private static final AtomicLong threadNum = new AtomicLong(0);
     final AbstractLogCompactor.Throttler throttler;
+
+    private LedgerStorageNotificationListener storageNotificationListener = 
LedgerStorageNotificationListener.NULL;
     /**
      * Create a garbage collector thread.
      *
@@ -181,6 +183,7 @@ public class GarbageCollectorThread extends SafeRunnable {
                 }
                 gcStats.getDeletedLedgerCounter().inc();
                 ledgerStorage.deleteLedger(ledgerId);
+                storageNotificationListener.ledgerRemovedFromStorage(ledgerId);
             } catch (IOException e) {
                 LOG.error("Exception when deleting the ledger index file on 
the Bookie: ", e);
             }
@@ -771,4 +774,8 @@ public class GarbageCollectorThread extends SafeRunnable {
             .minorCompactionCounter(gcStats.getMinorCompactionCounter().get())
             .build();
     }
+
+    public void 
setStorageStorageNotificationListener(LedgerStorageNotificationListener 
storageNotificationListener) {
+        this.storageNotificationListener = storageNotificationListener;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 6954f65d0a..7597acc0a6 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -173,6 +173,11 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
         this.checkpointer = checkpointer;
     }
 
+    @Override
+    public void 
setStorageStorageNotificationListener(LedgerStorageNotificationListener 
storageNotificationListener) {
+        
this.gcThread.setStorageStorageNotificationListener(storageNotificationListener);
+    }
+
     public void initializeWithEntryLogger(ServerConfiguration conf,
                 LedgerManager ledgerManager,
                 LedgerDirsManager ledgerDirsManager,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 372cd926c0..c409112eef 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -60,6 +60,7 @@ public interface LedgerStorage {
     void setStateManager(StateManager stateManager);
     void setCheckpointSource(CheckpointSource checkpointSource);
     void setCheckpointer(Checkpointer checkpointer);
+    void 
setStorageStorageNotificationListener(LedgerStorageNotificationListener 
ledgerStorageNotificationListener);
 
     /**
      * Start any background threads belonging to the storage system. For 
example, garbage collection.
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageNotificationListener.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageNotificationListener.java
new file mode 100644
index 0000000000..af2729dd16
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageNotificationListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+/**
+ * The instance that is responsible for checkpointing ledger storage.
+ */
+public interface LedgerStorageNotificationListener {
+
+    LedgerStorageNotificationListener NULL = new 
LedgerStorageNotificationListener(){
+        @Override
+        public void ledgerRemovedFromStorage(long ledgerId) {
+            // no-op
+        }
+    };
+
+    void ledgerRemovedFromStorage(long ledgerId);
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index a7080ba4f3..b9a4b7cfd8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -118,6 +118,11 @@ public class SortedLedgerStorage
         interleavedLedgerStorage.setCheckpointer(checkpointer);
     }
 
+    @Override
+    public void 
setStorageStorageNotificationListener(LedgerStorageNotificationListener 
storageNotificationListener) {
+        
this.interleavedLedgerStorage.setStorageStorageNotificationListener(storageNotificationListener);
+    }
+
     @VisibleForTesting
     ScheduledExecutorService getScheduler() {
         return scheduler;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 24a52dc81a..d1404e0179 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -51,6 +51,7 @@ import 
org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerCache;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorageNotificationListener;
 import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.bookie.storage.EntryLogIdsImpl;
 import org.apache.bookkeeper.bookie.storage.EntryLogger;
@@ -299,6 +300,11 @@ public class DbLedgerStorage implements LedgerStorage {
         ledgerStorageList.forEach(s -> s.setCheckpointer(checkpointer));
     }
 
+    @Override
+    public void 
setStorageStorageNotificationListener(LedgerStorageNotificationListener 
storageNotificationListener) {
+        ledgerStorageList.forEach(s -> 
s.setStorageStorageNotificationListener(storageNotificationListener));
+    }
+
     @Override
     public void start() {
         ledgerStorageList.forEach(LedgerStorage::start);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index d330ea0366..d15eaf10c5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -63,6 +63,7 @@ import org.apache.bookkeeper.bookie.LedgerCache;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerEntryPage;
+import org.apache.bookkeeper.bookie.LedgerStorageNotificationListener;
 import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import 
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
@@ -238,6 +239,11 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
     @Override
     public void setCheckpointer(Checkpointer checkpointer) { }
 
+    @Override
+    public void 
setStorageStorageNotificationListener(LedgerStorageNotificationListener 
storageNotificationListener) {
+        
this.gcThread.setStorageStorageNotificationListener(storageNotificationListener);
+    }
+
     /**
      * Evict all the ledger info object that were not used recently.
      */
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
index 5b49ca7706..f5b94e422c 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java
@@ -75,6 +75,11 @@ public class MockLedgerStorage implements 
CompactableLedgerStorage {
     @Override
     public void setCheckpointer(Checkpointer checkpointer) {}
 
+    @Override
+    public void 
setStorageStorageNotificationListener(LedgerStorageNotificationListener 
storageNotificationListener) {
+
+    }
+
     @Override
     public void start() {}
     @Override
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SyncThreadTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SyncThreadTest.java
index d44447e620..82bb762be5 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SyncThreadTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SyncThreadTest.java
@@ -285,6 +285,11 @@ public class SyncThreadTest {
         @Override
         public void setCheckpointer(Checkpointer checkpointer) {}
 
+        @Override
+        public void 
setStorageStorageNotificationListener(LedgerStorageNotificationListener 
storageListener) {
+
+        }
+
         @Override
         public void deleteLedger(long ledgerId) throws IOException {
         }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index fec74a8202..2f48a87609 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -61,6 +61,7 @@ import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.GarbageCollector;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.LedgerStorageNotificationListener;
 import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
 import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.client.BKException;
@@ -589,6 +590,11 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         @Override
         public void setCheckpointer(Checkpointer checkpointer) {}
 
+        @Override
+        public void 
setStorageStorageNotificationListener(LedgerStorageNotificationListener 
storageListener) {
+
+        }
+
         @Override
         public void start() {
         }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 13272ab51c..799fe4badb 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
 import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.LedgerStorageNotificationListener;
 import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.Watcher;
@@ -183,6 +184,11 @@ public abstract class LedgerManagerTestCase extends 
BookKeeperClusterTestCase {
         @Override
         public void setCheckpointer(Checkpointer checkpointer) {}
 
+        @Override
+        public void 
setStorageStorageNotificationListener(LedgerStorageNotificationListener 
storageListener) {
+
+        }
+
         @Override
         public void start() {
         }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index e4caa4f92e..f1372b2894 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -20,8 +20,10 @@
  */
 package org.apache.bookkeeper.util.collections;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -117,6 +119,39 @@ public class ConcurrentLongHashMapTest {
         assertTrue(map.capacity() == 4);
     }
 
+    private void addSpecifyIncrement(ConcurrentLongHashMap<byte[]> mkc, int 
start, int end) {
+        for (int i = start; i <= end; i++) {
+            assertNull(mkc.put(i, ("comment:" + i).getBytes(UTF_8)));
+        }
+    }
+
+    private void removeSpecifyIncrement(ConcurrentLongHashMap<byte[]> mkc, int 
start, int end) {
+        for (int i = end; i >= start; i--) {
+            assertNotNull(mkc.remove(i));
+        }
+    }
+
+    @Test
+    public void testAutoShrinkWithByte() {
+        final int defaultExpectedItems = 256;
+        final int defaultConcurrencyLevel = 16;
+        final float defaultExpandFactor = 2;
+        final float defaultShrinkFactor = 2;
+
+        ConcurrentLongHashMap<byte[]> mkc = 
ConcurrentLongHashMap.<byte[]>newBuilder().autoShrink(true).build();
+        assertTrue(mkc.capacity() == defaultExpectedItems * 2);
+
+        addSpecifyIncrement(mkc, 1, defaultExpectedItems * 2);
+        // expand hashmap
+        assertTrue(mkc.capacity() == defaultExpectedItems * 2
+                + defaultConcurrencyLevel * defaultExpandFactor * 15);
+
+        removeSpecifyIncrement(mkc, 220, defaultExpectedItems * 2);
+        // shrink hashmap
+        assertTrue(mkc.capacity() == defaultExpectedItems * 2
+                + defaultConcurrencyLevel * defaultExpandFactor * 15 - 
defaultConcurrencyLevel * defaultShrinkFactor);
+    }
+
     @Test
     public void testExpandAndShrink() {
         ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()

Reply via email to