This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 81a2cb782e Register the measurements of the bootstrap process as
Dropwizard metrics
81a2cb782e is described below
commit 81a2cb782eed932961b0c89fbd40199c7269c662
Author: Maxim Muzafarov <[email protected]>
AuthorDate: Wed Mar 6 14:54:02 2024 +0100
Register the measurements of the bootstrap process as Dropwizard metrics
patch by Maxim Muzafarov; reviewed by Stefan Miklosovic for CASSANDRA-19447
---
CHANGES.txt | 1 +
.../org/apache/cassandra/dht/BootStrapper.java | 49 +++++++++++++++++-
.../apache/cassandra/metrics/StorageMetrics.java | 4 +-
.../test/hostreplacement/FailedBootstrapTest.java | 14 ++++-
.../distributed/test/ring/BootstrapTest.java | 60 +++++++++++++++++++++-
5 files changed, 122 insertions(+), 6 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index b6b9ab86ba..af985568cd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Register the measurements of the bootstrap process as Dropwizard metrics
(CASSANDRA-19447)
* Add LIST SUPERUSERS CQL statement (CASSANDRA-19417)
* Modernize CQLSH datetime conversions (CASSANDRA-18879)
* Harry model and in-JVM tests for partition-restricted 2i queries
(CASSANDRA-18275)
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java
b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 82f587ed30..6c306a3ff0 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -24,18 +24,20 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
-import org.apache.cassandra.tcm.ownership.MovementMap;
-import org.apache.cassandra.utils.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.codahale.metrics.Gauge;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.tokenallocator.TokenAllocation;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.streaming.StreamEvent;
@@ -44,13 +46,21 @@ import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ownership.MovementMap;
+import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.progress.ProgressEvent;
import org.apache.cassandra.utils.progress.ProgressEventNotifierSupport;
import org.apache.cassandra.utils.progress.ProgressEventType;
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
public class BootStrapper extends ProgressEventNotifierSupport
{
private static final Logger logger =
LoggerFactory.getLogger(BootStrapper.class);
+ private static final AtomicLong bootstrapFilesTotal = new AtomicLong();
+ private static final AtomicLong bootstrapFilesReceived = new AtomicLong();
+ private static final AtomicReference<String> bootstrapLastSeenStatus = new
AtomicReference<>();
+ private static final AtomicReference<String> bootstrapLastSeenError = new
AtomicReference<>();
/* endpoint that needs to be bootstrapped */
protected final InetAddressAndPort address;
@@ -59,6 +69,14 @@ public class BootStrapper extends
ProgressEventNotifierSupport
private final MovementMap movements;
private final MovementMap strictMovements;
+ static
+ {
+
Metrics.<Gauge<Long>>register(StorageMetrics.factory.createMetricName("BootstrapFilesTotal"),
bootstrapFilesTotal::get);
+
Metrics.<Gauge<Long>>register(StorageMetrics.factory.createMetricName("BootstrapFilesReceived"),
bootstrapFilesReceived::get);
+
Metrics.<Gauge<String>>register(StorageMetrics.factory.createMetricName("BootstrapLastSeenStatus"),
bootstrapLastSeenStatus::get);
+
Metrics.<Gauge<String>>register(StorageMetrics.factory.createMetricName("BootstrapLastSeenError"),
bootstrapLastSeenError::get);
+ }
+
public BootStrapper(InetAddressAndPort address,
ClusterMetadata metadata,
MovementMap movements,
@@ -70,6 +88,30 @@ public class BootStrapper extends
ProgressEventNotifierSupport
this.metadata = metadata;
this.movements = movements;
this.strictMovements = strictMovements;
+
+ addProgressListener((tag, event) -> {
+ ProgressEventType type = event.getType();
+ switch (type)
+ {
+ case START:
+ bootstrapFilesTotal.set(0);
+ bootstrapFilesReceived.set(0);
+ bootstrapLastSeenStatus.set(event.getMessage());
+ bootstrapLastSeenError.set("");
+ break;
+ case PROGRESS:
+ bootstrapFilesTotal.set(event.getTotal());
+ bootstrapFilesReceived.set(event.getProgressCount());
+ break;
+ case SUCCESS:
+ case COMPLETE:
+ bootstrapLastSeenStatus.set(event.getMessage());
+ break;
+ case ERROR:
+ bootstrapLastSeenError.set(event.getMessage());
+ break;
+ }
+ });
}
public Future<StreamState> bootstrap(StreamStateStore stateStore, boolean
useStrictConsistency, InetAddressAndPort beingReplaced)
@@ -100,6 +142,8 @@ public class BootStrapper extends
ProgressEventNotifierSupport
streamer.addKeyspaceToFetch(keyspaceName);
}
+ fireProgressEvent("bootstrap", new
ProgressEvent(ProgressEventType.START, 0, 0, "Beginning bootstrap process"));
+
StreamResultFuture bootstrapStreamResult = streamer.fetchAsync();
bootstrapStreamResult.addEventListener(new StreamEventHandler()
{
@@ -122,6 +166,7 @@ public class BootStrapper extends
ProgressEventNotifierSupport
StreamEvent.ProgressEvent progress =
(StreamEvent.ProgressEvent) event;
if (progress.progress.isCompleted())
{
+
StorageMetrics.bootstrapFilesThroughputMetric.mark();
int received = receivedFiles.incrementAndGet();
ProgressEvent currentProgress = new
ProgressEvent(ProgressEventType.PROGRESS, received, totalFilesToReceive.get(),
"received file " + progress.progress.fileName);
fireProgressEvent("bootstrap", currentProgress);
diff --git a/src/java/org/apache/cassandra/metrics/StorageMetrics.java
b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
index ebb031cc4d..57b57e831f 100644
--- a/src/java/org/apache/cassandra/metrics/StorageMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
@@ -22,6 +22,7 @@ import java.util.stream.StreamSupport;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
import org.apache.cassandra.db.Keyspace;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -31,7 +32,7 @@ import static
org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
*/
public class StorageMetrics
{
- private static final MetricNameFactory factory = new
DefaultNameFactory("Storage");
+ public static final MetricNameFactory factory = new
DefaultNameFactory("Storage");
public static final Counter load =
Metrics.counter(factory.createMetricName("Load"));
public static final Counter uncompressedLoad =
Metrics.counter(factory.createMetricName("UncompressedLoad"));
@@ -47,6 +48,7 @@ public class StorageMetrics
public static final Counter repairExceptions =
Metrics.counter(factory.createMetricName("RepairExceptions"));
public static final Counter totalOpsForInvalidToken =
Metrics.counter(factory.createMetricName("TotalOpsForInvalidToken"));
public static final Counter startupOpsForInvalidToken =
Metrics.counter(factory.createMetricName("StartupOpsForInvalidToken"));
+ public static final Meter bootstrapFilesThroughputMetric =
Metrics.meter(factory.createMetricName("BootstrapFilesThroughput"));
private static Gauge<Long> createSummingGauge(String name,
ToLongFunction<KeyspaceMetrics> extractor)
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java
index 5c906a1449..3b28c20c69 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java
@@ -44,10 +44,14 @@ import org.apache.cassandra.streaming.StreamException;
import org.apache.cassandra.streaming.StreamResultFuture;
import static net.bytebuddy.matcher.ElementMatchers.named;
-import static
org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
-import static
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster;
import static org.apache.cassandra.distributed.shared.ClusterUtils.addInstance;
import static
org.apache.cassandra.distributed.shared.ClusterUtils.startHostReplacement;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+import static
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster;
+import static
org.apache.cassandra.distributed.test.ring.BootstrapTest.getMetricGaugeValue;
+import static
org.apache.cassandra.distributed.test.ring.BootstrapTest.getMetricMeterRate;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class FailedBootstrapTest extends TestBaseImpl
{
@@ -82,6 +86,12 @@ public class FailedBootstrapTest extends TestBaseImpl
result.asserts().success();
logger.info("gossipinfo for node{}\n{}", i.config().num(),
result.getStdout());
});
+
+ assertTrue(getMetricGaugeValue(added, "BootstrapFilesTotal",
Long.class) > 0L);
+ assertTrue(getMetricGaugeValue(added, "BootstrapFilesReceived",
Long.class) > 0L);
+ assertEquals("Beginning bootstrap process",
getMetricGaugeValue(added, "BootstrapLastSeenStatus", String.class));
+ assertEquals("Stream failed", getMetricGaugeValue(added,
"BootstrapLastSeenError", String.class));
+ assertTrue(getMetricMeterRate(added, "BootstrapFilesThroughput") >
0);
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
index ecdb152f89..f43c459f8f 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
@@ -19,9 +19,15 @@
package org.apache.cassandra.distributed.test.ring;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
import org.junit.Test;
@@ -36,8 +42,10 @@ import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.JMXUtil;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.metrics.DefaultNameFactory;
import org.apache.cassandra.service.StorageService;
import static net.bytebuddy.matcher.ElementMatchers.named;
@@ -45,6 +53,7 @@ import static
org.apache.cassandra.config.CassandraRelevantProperties.RESET_BOOT
import static
org.apache.cassandra.config.CassandraRelevantProperties.TEST_WRITE_SURVEY;
import static
org.apache.cassandra.distributed.action.GossipHelper.withProperty;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.JMX;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -136,7 +145,7 @@ public class BootstrapTest extends TestBaseImpl
try (Cluster cluster = builder().withNodes(originalNodeCount)
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount,
"dc0", "rack0"))
- .withConfig(config ->
config.with(NETWORK, GOSSIP))
+ .withConfig(config ->
config.with(NETWORK, GOSSIP, JMX))
.withInstanceInitializer(BootstrapTest.BB::install)
.start())
{
@@ -154,9 +163,58 @@ public class BootstrapTest extends TestBaseImpl
assertEquals("COMPLETED",
StorageService.instance.getBootstrapState());
assertFalse(StorageService.instance.isBootstrapFailed());
});
+
+ assertEquals(Long.valueOf(0L),
getMetricGaugeValue(joiningInstance, "BootstrapFilesTotal", Long.class));
+ assertEquals(Long.valueOf(0L),
getMetricGaugeValue(joiningInstance, "BootstrapFilesReceived", Long.class));
+ assertEquals("Bootstrap streaming success",
getMetricGaugeValue(joiningInstance, "BootstrapLastSeenStatus", String.class));
+ assertEquals("", getMetricGaugeValue(joiningInstance,
"BootstrapLastSeenError", String.class));
}
}
+ public static <T> T getMetricGaugeValue(IInvokableInstance instance,
String metricName, Class<T> gaugeReturnType)
+ {
+ return gaugeReturnType.cast(getMetricAttribute(instance, metricName,
"Value"));
+ }
+
+ public static long getMetricMeterRate(IInvokableInstance instance, String
metricName)
+ {
+ Object raw = getMetricAttribute(instance, metricName, "Count");
+ return raw == null ? 0 : (Long) raw;
+ }
+
+ public static Object getMetricAttribute(IInvokableInstance instance,
String metricName, String attributeName)
+ {
+ if (instance.isShutdown())
+ throw new IllegalStateException("Instance is shutdown");
+
+ try (JMXConnector jmxc = JMXUtil.getJmxConnector(instance.config()))
+ {
+ MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+ ObjectName metric = mbsc.queryNames(null, null)
+ .stream()
+ .filter(objectName ->
objectName.getDomain().equals(DefaultNameFactory.GROUP_NAME))
+ .filter(objectName ->
Objects.nonNull(objectName.getKeyProperty("name")))
+ .filter(objectName ->
metricName.equals(objectName.getKeyProperty("name")))
+ .findFirst()
+ .orElse(null);
+
+ if (metric == null)
+ return null;
+
+ MBeanInfo info = mbsc.getMBeanInfo(metric);
+ for (MBeanAttributeInfo a : info.getAttributes())
+ {
+ if (a.getName().equals(attributeName))
+ return mbsc.getAttribute(metric, a.getName());
+ }
+
+ return null;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
public static void populate(ICluster cluster, int from, int to)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]