This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 4caf3ef9f34 Support dynamic update cache config (#13679)
4caf3ef9f34 is described below
commit 4caf3ef9f34e58aee0e955a03a63cc6282f257b5
Author: LinChen <[email protected]>
AuthorDate: Tue Mar 15 00:59:53 2022 +0800
Support dynamic update cache config (#13679)
(cherry picked from commit b0213b225f194b47a5dcd63ef4a26f55c4c820b6)
---
.../bookkeeper/mledger/ManagedLedgerFactory.java | 18 ++++++++++++++++++
.../bookkeeper/mledger/impl/EntryCacheManager.java | 19 ++++++++++++++++---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 13 ++++++++++++-
.../apache/pulsar/broker/ServiceConfiguration.java | 3 +++
.../pulsar/broker/service/BrokerService.java | 22 ++++++++++++++++++++++
.../apache/pulsar/broker/admin/AdminApiTest.java | 22 ++++++++++++++++++++++
6 files changed, 93 insertions(+), 4 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index 682ce9008f1..a83994d1cd3 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -26,6 +26,7 @@ import
org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
+import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
/**
* A factory to open/create managed ledgers and delete them.
@@ -179,4 +180,21 @@ public interface ManagedLedgerFactory {
*/
CompletableFuture<Boolean> asyncExists(String ledgerName);
+ /**
+ * @return return EntryCacheManager.
+ */
+ EntryCacheManager getEntryCacheManager();
+
+ /**
+ * update cache evictionTimeThreshold.
+ *
+ * @param cacheEvictionTimeThresholdNanos time threshold for eviction.
+ */
+ void updateCacheEvictionTimeThreshold(long
cacheEvictionTimeThresholdNanos);
+
+ /**
+ * @return time threshold for eviction.
+ * */
+ long getCacheEvictionTimeThreshold();
+
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index 9a1d6317e4d..0165fc5b197 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -46,9 +46,9 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("checkstyle:javadoctype")
public class EntryCacheManager {
- private final long maxSize;
- private final long evictionTriggerThreshold;
- private final double cacheEvictionWatermark;
+ private volatile long maxSize;
+ private volatile long evictionTriggerThreshold;
+ private volatile double cacheEvictionWatermark;
private final AtomicLong currentSize = new AtomicLong(0);
private final ConcurrentMap<String, EntryCache> caches =
Maps.newConcurrentMap();
private final EntryCacheEvictionPolicy evictionPolicy;
@@ -89,6 +89,15 @@ public class EntryCacheManager {
}
}
+ public void updateCacheSizeAndThreshold(long maxSize) {
+ this.maxSize = maxSize;
+ this.evictionTriggerThreshold = (long) (maxSize *
evictionTriggerThresholdPercent);
+ }
+
+ public void updateCacheEvictionWatermark(double cacheEvictionWatermark) {
+ this.cacheEvictionWatermark = cacheEvictionWatermark;
+ }
+
void removeEntryCache(String name) {
EntryCache entryCache = caches.remove(name);
if (entryCache == null) {
@@ -150,6 +159,10 @@ public class EntryCacheManager {
return maxSize;
}
+ public double getCacheEvictionWatermark() {
+ return cacheEvictionWatermark;
+ }
+
public void clear() {
caches.values().forEach(EntryCache::clear);
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index a66545fd3e5..c3e391ffab2 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -104,7 +104,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
private final ScheduledFuture<?> statsTask;
private final ScheduledFuture<?> flushCursorsTask;
- private final long cacheEvictionTimeThresholdNanos;
+ private volatile long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;
//indicate whether shutdown() is called.
@@ -943,10 +943,21 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
return config;
}
+ @Override
public EntryCacheManager getEntryCacheManager() {
return entryCacheManager;
}
+ @Override
+ public void updateCacheEvictionTimeThreshold(long
cacheEvictionTimeThresholdNanos){
+ this.cacheEvictionTimeThresholdNanos = cacheEvictionTimeThresholdNanos;
+ }
+
+ @Override
+ public long getCacheEvictionTimeThreshold(){
+ return cacheEvictionTimeThresholdNanos;
+ }
+
public ManagedLedgerFactoryMXBean getCacheStats() {
return this.mbean;
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f3fe8e86441..ee7ed9e5d7c 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1545,6 +1545,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private int managedLedgerMaxAckQuorum = 5;
@FieldContext(
category = CATEGORY_STORAGE_ML,
+ dynamic = true,
doc = "Amount of memory to use for caching data payload in managed
ledger. \n\nThis"
+ " memory is allocated from JVM direct memory and it's shared
across all the topics"
+ " running in the same broker. By default, uses 1/5th of
available direct memory")
@@ -1554,6 +1555,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private boolean managedLedgerCacheCopyEntries = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
+ dynamic = true,
doc = "Threshold to which bring down the cache level when eviction is
triggered"
)
private double managedLedgerCacheEvictionWatermark = 0.9f;
@@ -1561,6 +1563,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "Configure the cache eviction frequency for the managed
ledger cache. Default is 100/s")
private double managedLedgerCacheEvictionFrequency = 100.0;
@FieldContext(category = CATEGORY_STORAGE_ML,
+ dynamic = true,
doc = "All entries that have stayed in cache for more than the
configured time, will be evicted")
private long managedLedgerCacheEvictionTimeThresholdMillis = 1000;
@FieldContext(category = CATEGORY_STORAGE_ML,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 110eccbf0ab..39eeed7ae02 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2151,6 +2151,28 @@ public class BrokerService implements Closeable {
}
});
});
+
+ // add listener to notify broker managedLedgerCacheSizeMB dynamic
config
+ registerConfigurationListener("managedLedgerCacheSizeMB",
(managedLedgerCacheSizeMB) -> {
+ managedLedgerFactory.getEntryCacheManager()
+ .updateCacheSizeAndThreshold(((int)
managedLedgerCacheSizeMB) * 1024L * 1024L);
+ });
+
+ // add listener to notify broker managedLedgerCacheEvictionWatermark
dynamic config
+ registerConfigurationListener(
+ "managedLedgerCacheEvictionWatermark",
(cacheEvictionWatermark) -> {
+ managedLedgerFactory.getEntryCacheManager()
+ .updateCacheEvictionWatermark((double)
cacheEvictionWatermark);
+ });
+
+ // add listener to notify broker
managedLedgerCacheEvictionTimeThresholdMillis dynamic config
+ registerConfigurationListener(
+ "managedLedgerCacheEvictionTimeThresholdMillis",
(cacheEvictionTimeThresholdMills) -> {
+
managedLedgerFactory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+ .toNanos((long) cacheEvictionTimeThresholdMills));
+ });
+
+
// add listener to update message-dispatch-rate in msg for topic
registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg",
(dispatchRatePerTopicInMsg) -> {
updateTopicMessageDispatchRate();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index f768bf60d9d..7ea016dffae 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -495,6 +495,28 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
}
+ @Test
+ public void testUpdateDynamicCacheConfigurationWithZkWatch() throws
Exception {
+ // update configuration
+ admin.brokers().updateDynamicConfiguration("managedLedgerCacheSizeMB",
"1");
+
admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionWatermark",
"0.8");
+
admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionTimeThresholdMillis",
"2000");
+
+ // wait config to be updated
+ Awaitility.await().until(() -> {
+ return
pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize() == 1 *
1024L * 1024L
+ &&
pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark()
== 0.8
+ &&
pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold() ==
TimeUnit.MILLISECONDS
+ .toNanos(2000);
+ });
+
+ // verify value is updated
+
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(),
1 * 1024L * 1024L);
+
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
0.8);
+
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(),
TimeUnit.MILLISECONDS
+ .toNanos(2000));
+ }
+
/**
* <pre>
* Verifies: zk-update configuration updates service-config