This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new f12f3e6f CASSSIDECAR-347: Improve FilteringMetricRegistry
implementation (#261)
f12f3e6f is described below
commit f12f3e6fd99b0920e9c3b23a4e51ac5f75fee0b8
Author: Francisco Guerrero <[email protected]>
AuthorDate: Fri Sep 26 18:42:18 2025 -0700
CASSSIDECAR-347: Improve FilteringMetricRegistry implementation (#261)
Patch by Francisco Guerrero; reviewed by Yifan Cai for CASSSIDECAR-347
---
CHANGES.txt | 1 +
.../sidecar/metrics/FilteringMetricRegistry.java | 124 ++++++++++++--
.../sidecar/tasks/PeriodicTaskExecutor.java | 4 +-
.../metrics/FilteringMetricRegistryTest.java | 181 +++++++++++++++++++++
4 files changed, 293 insertions(+), 17 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index f57d67b4..0fb5a932 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Improve FilteringMetricRegistry implementation (CASSSIDECAR-347)
* Add lifecycle APIs for starting and stopping Cassandra (CASSSIDECAR-266)
* Implementation of CassandraClusterSchemaMonitor (CASSSIDECAR-245)
* Sidecar endpoint for vending statistics related to compaction
(CASSSIDECAR-329)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/FilteringMetricRegistry.java
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/FilteringMetricRegistry.java
index ad08b58b..ddebb085 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/FilteringMetricRegistry.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/FilteringMetricRegistry.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import java.util.function.Predicate;
import com.codahale.metrics.Counter;
@@ -41,6 +42,7 @@ public class FilteringMetricRegistry extends MetricRegistry
private static final NoopMetricRegistry NO_OP_METRIC_REGISTRY = new
NoopMetricRegistry(); // supplies no-op metrics
private final Predicate<String> isAllowed;
private final Map<String, Metric> excludedMetrics = new
ConcurrentHashMap<>();
+ private volatile Map<String, Metric> allMetrics;
public FilteringMetricRegistry(Predicate<String> isAllowedPredicate)
{
@@ -54,7 +56,7 @@ public class FilteringMetricRegistry extends MetricRegistry
{
return super.counter(name);
}
- return typeChecked(excludedMetrics.computeIfAbsent(name,
NO_OP_METRIC_REGISTRY::counter), Counter.class);
+ return typeChecked(addExcludedMetricIfNotExists(name,
NO_OP_METRIC_REGISTRY::counter), Counter.class);
}
@Override
@@ -64,7 +66,7 @@ public class FilteringMetricRegistry extends MetricRegistry
{
return super.counter(name, supplier);
}
- return typeChecked(excludedMetrics.computeIfAbsent(name,
NO_OP_METRIC_REGISTRY::counter), Counter.class);
+ return typeChecked(addExcludedMetricIfNotExists(name,
NO_OP_METRIC_REGISTRY::counter), Counter.class);
}
@Override
@@ -74,7 +76,7 @@ public class FilteringMetricRegistry extends MetricRegistry
{
return super.histogram(name);
}
- return typeChecked(excludedMetrics.computeIfAbsent(name,
NO_OP_METRIC_REGISTRY::histogram), Histogram.class);
+ return typeChecked(addExcludedMetricIfNotExists(name,
NO_OP_METRIC_REGISTRY::histogram), Histogram.class);
}
@Override
@@ -84,7 +86,7 @@ public class FilteringMetricRegistry extends MetricRegistry
{
return super.histogram(name, supplier);
}
- return typeChecked(excludedMetrics.computeIfAbsent(name,
NO_OP_METRIC_REGISTRY::histogram), Histogram.class);
+ return typeChecked(addExcludedMetricIfNotExists(name,
NO_OP_METRIC_REGISTRY::histogram), Histogram.class);
}
@Override
@@ -94,7 +96,7 @@ public class FilteringMetricRegistry extends MetricRegistry
{
return super.meter(name);
}
- return typeChecked(excludedMetrics.computeIfAbsent(name,
NO_OP_METRIC_REGISTRY::meter), Meter.class);
+ return typeChecked(addExcludedMetricIfNotExists(name,
NO_OP_METRIC_REGISTRY::meter), Meter.class);
}
@Override
@@ -104,7 +106,7 @@ public class FilteringMetricRegistry extends MetricRegistry
{
return super.meter(name, supplier);
}
- return typeChecked(excludedMetrics.computeIfAbsent(name,
NO_OP_METRIC_REGISTRY::meter), Meter.class);
+ return typeChecked(addExcludedMetricIfNotExists(name,
NO_OP_METRIC_REGISTRY::meter), Meter.class);
}
@Override
@@ -114,7 +116,7 @@ public class FilteringMetricRegistry extends MetricRegistry
{
return super.timer(name);
}
- return typeChecked(excludedMetrics.computeIfAbsent(name,
NO_OP_METRIC_REGISTRY::timer), Timer.class);
+ return typeChecked(addExcludedMetricIfNotExists(name,
NO_OP_METRIC_REGISTRY::timer), Timer.class);
}
@Override
@@ -124,7 +126,7 @@ public class FilteringMetricRegistry extends MetricRegistry
{
return super.timer(name, supplier);
}
- return typeChecked(excludedMetrics.computeIfAbsent(name,
NO_OP_METRIC_REGISTRY::timer), Timer.class);
+ return typeChecked(addExcludedMetricIfNotExists(name,
NO_OP_METRIC_REGISTRY::timer), Timer.class);
}
@Override
@@ -135,7 +137,7 @@ public class FilteringMetricRegistry extends MetricRegistry
{
return super.gauge(name);
}
- return (T) typeChecked(excludedMetrics.computeIfAbsent(name,
NO_OP_METRIC_REGISTRY::gauge), Gauge.class);
+ return (T) typeChecked(addExcludedMetricIfNotExists(name,
NO_OP_METRIC_REGISTRY::gauge), Gauge.class);
}
@Override
@@ -146,20 +148,42 @@ public class FilteringMetricRegistry extends
MetricRegistry
{
return super.gauge(name, supplier);
}
- return (T) typeChecked(excludedMetrics.computeIfAbsent(name, k ->
supplier.newMetric() /* unregistered metric */), Gauge.class);
+ return (T) typeChecked(addExcludedMetricIfNotExists(name, k ->
supplier.newMetric() /* unregistered metric */), Gauge.class);
}
/**
+ * The performance characteristics of this method depend on the frequency
of changes to the
+ * metrics. If modifications to the registry are infrequent, this
implementation will perform
+ * best with low garbage being created. If modifications to the registry
are frequent, more garbage
+ * will be created.
+ *
* @return all the metrics including the allowed and disallowed metrics.
This is to prevent re-registering of
* excluded metrics
*/
@Override
public Map<String, Metric> getMetrics()
{
- Map<String, Metric> allMetrics = new HashMap<>();
- allMetrics.putAll(super.getMetrics());
- allMetrics.putAll(excludedMetrics);
- return Collections.unmodifiableMap(allMetrics);
+ Map<String, Metric> existingAllMetrics = allMetrics;
+ if (existingAllMetrics != null)
+ {
+ return existingAllMetrics;
+ }
+
+ synchronized (this)
+ {
+ if (allMetrics == null)
+ {
+ existingAllMetrics = new HashMap<>();
+ existingAllMetrics.putAll(super.getMetrics());
+ existingAllMetrics.putAll(excludedMetrics);
+ allMetrics = existingAllMetrics =
Collections.unmodifiableMap(existingAllMetrics);
+ }
+ else
+ {
+ existingAllMetrics = allMetrics;
+ }
+ }
+ return existingAllMetrics;
}
/**
@@ -175,14 +199,59 @@ public class FilteringMetricRegistry extends
MetricRegistry
* Metric specific retrieve methods such as {@code counter(name)} retrieve
a noop instance if metric is filtered.
* Prefer calling those over register method, register method returns an
unregistered metric if the metric is
* filtered. In some cases Noop metric instance has a performance
advantage.
+ *
+ * @param name the name of the metric
+ * @param metric the metric
+ * @param <T> the type of the metric
+ * @return {@code metric}
+ * @throws IllegalArgumentException if the name is already registered or
metric variable is null
*/
+ @Override
public <T extends Metric> T register(String name, T metric) throws
IllegalArgumentException
+ {
+ T registeredMetric = registerInternal(name, metric);
+
+ // allMetrics needs to be recomputed every time a metric is registered
+ synchronized (this)
+ {
+ allMetrics = null;
+ }
+
+ return registeredMetric;
+ }
+
+ /**
+ * Removes the metric with the given name from the set of excluded
metrics, and if the metric
+ * does not exist there, it removes it from the underlying metrics.
+ *
+ * @param name the name of the metric
+ * @return whether or not the metric was removed
+ */
+ @Override
+ public boolean remove(String name)
+ {
+ boolean removeResult = true;
+ Metric removedMetric = excludedMetrics.remove(name);
+ if (removedMetric == null)
+ {
+ removeResult = super.remove(name);
+ }
+
+ // force allMetrics to be recomputed every time a metric is removed
+ synchronized (this)
+ {
+ allMetrics = null;
+ }
+
+ return removeResult;
+ }
+
+ private <T extends Metric> T registerInternal(String name, T metric)
{
if (metric == null)
{
throw new IllegalArgumentException("Metric can not be null");
}
-
// The metric is registered by calling the register() directly
// We need to test whether it is allowed first
if (isAllowed.test(name))
@@ -190,7 +259,7 @@ public class FilteringMetricRegistry extends MetricRegistry
return super.register(name, metric);
}
- return (T) typeChecked(excludedMetrics.computeIfAbsent(name, key ->
metric), metric.getClass());
+ return (T) typeChecked(addExcludedMetricIfNotExists(name, key ->
metric), metric.getClass());
}
private <T extends Metric> T typeChecked(Metric metric, Class<T> type)
@@ -202,6 +271,29 @@ public class FilteringMetricRegistry extends MetricRegistry
throw new IllegalArgumentException("Metric already present with type "
+ metric.getClass());
}
+ /**
+ * Adds excluded metric if it does not exist in the excluded list yet.
This method ensures
+ * that when adding a metric to the list of excluded metrics, a
recomputation is forced
+ * for the metrics returned by the {@link #getMetrics()} method.
+ *
+ * @param name the name of the metric
+ * @param mappingFunction the mapping function to compute the value of the
metric
+ * @return the current value of the excluded metric if it exists, or the
new computed
+ * value if it doesn't
+ */
+ private Metric addExcludedMetricIfNotExists(String name, Function<String,
? extends Metric> mappingFunction)
+ {
+ return excludedMetrics.computeIfAbsent(name, k -> {
+ // allMetrics needs to be recomputed when an excluded metric is
registered
+ synchronized (this)
+ {
+ allMetrics = null;
+ }
+
+ return mappingFunction.apply(k);
+ });
+ }
+
/**
* {@link CachedPredicate} remembers results of the {@link Predicate} it
maintains. This is to avoid
* redundant calls to delegate predicate.
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTaskExecutor.java
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTaskExecutor.java
index 5d950436..2807b5b8 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTaskExecutor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTaskExecutor.java
@@ -299,17 +299,19 @@ public class PeriodicTaskExecutor implements Closeable
{
private final String identifier;
private final PeriodicTask task;
+ private final int hashCode;
PeriodicTaskKey(PeriodicTask task)
{
this.identifier = task.identifier();
this.task = task;
+ this.hashCode = identifier.hashCode();
}
@Override
public int hashCode()
{
- return identifier.hashCode();
+ return hashCode;
}
@Override
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/metrics/FilteringMetricRegistryTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/metrics/FilteringMetricRegistryTest.java
index cf8126c4..d8585374 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/metrics/FilteringMetricRegistryTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/metrics/FilteringMetricRegistryTest.java
@@ -21,7 +21,13 @@ package org.apache.cassandra.sidecar.metrics;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.junit.jupiter.api.AfterEach;
@@ -30,6 +36,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import com.codahale.metrics.DefaultSettableGauge;
+import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.NoopMetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
@@ -243,4 +250,178 @@ class FilteringMetricRegistryTest
context.completeNow();
});
}
+
+ @Test
+ void testGetMetrics()
+ {
+ FilteringMetricRegistry registry = new FilteringMetricRegistry(s ->
s.endsWith("Include"));
+
+ registry.gauge("gaugeInclude", () -> new DefaultSettableGauge<>(0L));
+ assertThat(registry.getMetrics()).hasSize(1)
+ .containsKey("gaugeInclude");
+ assertThat(registry.getIncludedMetrics()).hasSize(1)
+ .containsKey("gaugeInclude");
+ registry.gauge("gaugeIgnore", () -> new DefaultSettableGauge<>(1L));
+ assertThat(registry.getMetrics()).hasSize(2)
+ .containsKey("gaugeIgnore");
+ assertThat(registry.getIncludedMetrics()).hasSize(1)
+ .containsKey("gaugeInclude");
+
+ registry.counter("counterInclude");
+ assertThat(registry.getMetrics()).hasSize(3)
+ .containsKey("counterInclude");
+ assertThat(registry.getIncludedMetrics()).hasSize(2)
+
.containsKey("counterInclude");
+ registry.counter("counterIgnore");
+ assertThat(registry.getMetrics()).hasSize(4)
+ .containsKey("counterIgnore");
+ assertThat(registry.getIncludedMetrics()).hasSize(2)
+
.containsKey("counterInclude");
+
+ registry.histogram("histogramInclude");
+ assertThat(registry.getMetrics()).hasSize(5)
+ .containsKey("histogramInclude");
+ assertThat(registry.getIncludedMetrics()).hasSize(3)
+
.containsKey("histogramInclude");
+ registry.histogram("histogramIgnore");
+ assertThat(registry.getMetrics()).hasSize(6)
+ .containsKey("histogramIgnore");
+ assertThat(registry.getIncludedMetrics()).hasSize(3)
+
.containsKey("histogramInclude");
+
+ registry.meter("meterInclude");
+ assertThat(registry.getMetrics()).hasSize(7)
+ .containsKey("meterInclude");
+ assertThat(registry.getIncludedMetrics()).hasSize(4)
+ .containsKey("meterInclude");
+ registry.meter("meterIgnore");
+ assertThat(registry.getMetrics()).hasSize(8)
+ .containsKey("meterIgnore");
+ assertThat(registry.getIncludedMetrics()).hasSize(4)
+ .containsKey("meterInclude");
+
+ registry.timer("timerInclude");
+ assertThat(registry.getMetrics()).hasSize(9)
+ .containsKey("timerInclude");
+ assertThat(registry.getIncludedMetrics()).hasSize(5)
+ .containsKey("timerInclude");
+ registry.timer("timerIgnore");
+ assertThat(registry.getMetrics()).hasSize(10)
+ .containsKey("timerIgnore");
+ assertThat(registry.getIncludedMetrics()).hasSize(5)
+ .containsKey("timerInclude");
+
+ registry.register("throughputInclude", new ThroughputMeter());
+ assertThat(registry.getMetrics()).hasSize(11)
+ .containsKey("throughputInclude");
+ assertThat(registry.getIncludedMetrics()).hasSize(6)
+
.containsKey("throughputInclude");
+ registry.register("throughputIgnore", new ThroughputMeter());
+ assertThat(registry.getMetrics()).hasSize(12)
+ .containsKey("throughputIgnore");
+ assertThat(registry.getIncludedMetrics()).hasSize(6)
+
.containsKey("throughputInclude");
+ }
+
+ @Test
+ void testGetMetricsWithConcurrentRegistration() throws InterruptedException
+ {
+ FilteringMetricRegistry registry = new FilteringMetricRegistry(s ->
s.endsWith("odd"));
+
+ // Let's get some concurrent updates to the registry
+ int nThreads = 100;
+ ExecutorService pool = Executors.newFixedThreadPool(nThreads);
+ CountDownLatch latch = new CountDownLatch(nThreads);
+ for (int i = 0; i < nThreads; i++)
+ {
+ int finalI = i;
+ pool.submit(() -> {
+ try
+ {
+ // Invoke register roughly at the same time
+ latch.countDown();
+ latch.await();
+
+ registry.register("testMetricThroughputMeter_" + finalI +
"_" + ((finalI % 2 == 0) ? "even" : "odd"), new ThroughputMeter());
+ assertThat(registry.getMetrics()).isNotEmpty();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ pool.shutdown();
+ assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+
+ // Let's make sure that all metrics are returned
+ Map<String, Metric> allMetrics = registry.getMetrics();
+ assertThat(allMetrics).hasSize(nThreads);
+ assertThat(registry.getIncludedMetrics()).as("Our filter filters out
half of the metrics, so we expect this value to be half")
+ .hasSize(nThreads / 2);
+
+ // Validate that every expected metric name is present in the returned metrics
+ Set<String> allMetricNames = allMetrics.keySet();
+ for (int i = 0; i < nThreads; i++)
+ {
+ String expectedMetricName = "testMetricThroughputMeter_" + i + "_"
+ ((i % 2 == 0) ? "even" : "odd");
+ assertThat(allMetricNames).as("Expected metric %s",
expectedMetricName).contains(expectedMetricName);
+ }
+ }
+
+ @Test
+ void testGetMetricsWithConcurrentRegistrationAndRemoval() throws
InterruptedException
+ {
+ FilteringMetricRegistry registry = new FilteringMetricRegistry(s -> {
+ int lastIndexOfUnderscore = s.lastIndexOf("_") + 1;
+ int i = Integer.parseInt(s.substring(lastIndexOfUnderscore));
+ return i % 4 != 0;
+ });
+
+ int nThreads = 100;
+
+ // First populate the registry
+ for (int i = 0; i < nThreads; i++)
+ {
+ String registryName = "testMetricThroughputMeter_" + i;
+ registry.register(registryName, new ThroughputMeter());
+ assertThat(registry.getMetrics()).hasSize(i + 1);
+ }
+
+ // Let's get some concurrent removals to the registry
+ ExecutorService pool = Executors.newFixedThreadPool(nThreads);
+ CountDownLatch latch = new CountDownLatch(nThreads);
+ for (int i = 0; i < nThreads; i++)
+ {
+ int finalI = i;
+ pool.submit(() -> {
+ try
+ {
+ String registryName = "testMetricThroughputMeter_" +
finalI;
+ boolean removeFromRegistry = finalI % 2 == 0;
+ // Invoke remove roughly at the same time
+ latch.countDown();
+ latch.await();
+
+ if (removeFromRegistry)
+ {
+ registry.remove(registryName);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ pool.shutdown();
+ assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+
+ // Let's make sure that all metrics are returned
+ Map<String, Metric> allMetrics = registry.getMetrics();
+ assertThat(allMetrics).as("About half the metrics are
removed").hasSize(nThreads / 2);
+ assertThat(registry.getIncludedMetrics()).hasSize(nThreads / 2);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]