This is an automated email from the ASF dual-hosted git repository. mmuzaf pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 81a2cb782e Register the measurements of the bootstrap process as Dropwizard metrics 81a2cb782e is described below commit 81a2cb782eed932961b0c89fbd40199c7269c662 Author: Maxim Muzafarov <maxmu...@gmail.com> AuthorDate: Wed Mar 6 14:54:02 2024 +0100 Register the measurements of the bootstrap process as Dropwizard metrics patch by Maxim Muzafarov; reviewed by Stefan Miklosovic for CASSANDRA-19447 --- CHANGES.txt | 1 + .../org/apache/cassandra/dht/BootStrapper.java | 49 +++++++++++++++++- .../apache/cassandra/metrics/StorageMetrics.java | 4 +- .../test/hostreplacement/FailedBootstrapTest.java | 14 ++++- .../distributed/test/ring/BootstrapTest.java | 60 +++++++++++++++++++++- 5 files changed, 122 insertions(+), 6 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index b6b9ab86ba..af985568cd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Register the measurements of the bootstrap process as Dropwizard metrics (CASSANDRA-19447) * Add LIST SUPERUSERS CQL statement (CASSANDRA-19417) * Modernize CQLSH datetime conversions (CASSANDRA-18879) * Harry model and in-JVM tests for partition-restricted 2i queries (CASSANDRA-18275) diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index 82f587ed30..6c306a3ff0 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -24,18 +24,20 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; -import org.apache.cassandra.tcm.ownership.MovementMap; -import org.apache.cassandra.utils.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.codahale.metrics.Gauge; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.tokenallocator.TokenAllocation; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.streaming.StreamEvent; @@ -44,13 +46,21 @@ import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ownership.MovementMap; +import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventNotifierSupport; import org.apache.cassandra.utils.progress.ProgressEventType; +import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; + public class BootStrapper extends ProgressEventNotifierSupport { private static final Logger logger = LoggerFactory.getLogger(BootStrapper.class); + private static final AtomicLong bootstrapFilesTotal = new AtomicLong(); + private static final AtomicLong bootstrapFilesReceived = new AtomicLong(); + private static final AtomicReference<String> bootstrapLastSeenStatus = new AtomicReference<>(); + private static final AtomicReference<String> bootstrapLastSeenError = new AtomicReference<>(); /* endpoint that needs to be bootstrapped */ protected final InetAddressAndPort address; @@ -59,6 +69,14 @@ public class BootStrapper extends ProgressEventNotifierSupport private final MovementMap movements; private final MovementMap strictMovements; + static + { + Metrics.<Gauge<Long>>register(StorageMetrics.factory.createMetricName("BootstrapFilesTotal"), bootstrapFilesTotal::get); + Metrics.<Gauge<Long>>register(StorageMetrics.factory.createMetricName("BootstrapFilesReceived"), bootstrapFilesReceived::get); + Metrics.<Gauge<String>>register(StorageMetrics.factory.createMetricName("BootstrapLastSeenStatus"), bootstrapLastSeenStatus::get); + Metrics.<Gauge<String>>register(StorageMetrics.factory.createMetricName("BootstrapLastSeenError"), bootstrapLastSeenError::get); + } + public BootStrapper(InetAddressAndPort address, ClusterMetadata metadata, MovementMap movements, @@ -70,6 +88,30 @@ public class BootStrapper extends ProgressEventNotifierSupport this.metadata = metadata; this.movements = movements; this.strictMovements = strictMovements; + + addProgressListener((tag, event) -> { + ProgressEventType type = event.getType(); + switch (type) + { + case START: + bootstrapFilesTotal.set(0); + bootstrapFilesReceived.set(0); + bootstrapLastSeenStatus.set(event.getMessage()); + bootstrapLastSeenError.set(""); + break; + case PROGRESS: + bootstrapFilesTotal.set(event.getTotal()); + bootstrapFilesReceived.set(event.getProgressCount()); + break; + case SUCCESS: + case COMPLETE: + bootstrapLastSeenStatus.set(event.getMessage()); + break; + case ERROR: + bootstrapLastSeenError.set(event.getMessage()); + break; + } + }); } public Future<StreamState> bootstrap(StreamStateStore stateStore, boolean useStrictConsistency, InetAddressAndPort beingReplaced) @@ -100,6 +142,8 @@ public class BootStrapper extends ProgressEventNotifierSupport streamer.addKeyspaceToFetch(keyspaceName); } + fireProgressEvent("bootstrap", new ProgressEvent(ProgressEventType.START, 0, 0, "Beginning bootstrap process")); + StreamResultFuture bootstrapStreamResult = streamer.fetchAsync(); bootstrapStreamResult.addEventListener(new StreamEventHandler() { @@ -122,6 +166,7 @@ public class BootStrapper extends ProgressEventNotifierSupport StreamEvent.ProgressEvent progress = (StreamEvent.ProgressEvent) event; if (progress.progress.isCompleted()) { + StorageMetrics.bootstrapFilesThroughputMetric.mark(); int received = receivedFiles.incrementAndGet(); ProgressEvent currentProgress = new ProgressEvent(ProgressEventType.PROGRESS, received, totalFilesToReceive.get(), "received file " + progress.progress.fileName); fireProgressEvent("bootstrap", currentProgress); diff --git a/src/java/org/apache/cassandra/metrics/StorageMetrics.java b/src/java/org/apache/cassandra/metrics/StorageMetrics.java index ebb031cc4d..57b57e831f 100644 --- a/src/java/org/apache/cassandra/metrics/StorageMetrics.java +++ b/src/java/org/apache/cassandra/metrics/StorageMetrics.java @@ -22,6 +22,7 @@ import java.util.stream.StreamSupport; import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; import org.apache.cassandra.db.Keyspace; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -31,7 +32,7 @@ import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; */ public class StorageMetrics { - private static final MetricNameFactory factory = new DefaultNameFactory("Storage"); + public static final MetricNameFactory factory = new DefaultNameFactory("Storage"); public static final Counter load = Metrics.counter(factory.createMetricName("Load")); public static final Counter uncompressedLoad = Metrics.counter(factory.createMetricName("UncompressedLoad")); @@ -47,6 +48,7 @@ public class StorageMetrics public static final Counter repairExceptions = Metrics.counter(factory.createMetricName("RepairExceptions")); public static final Counter totalOpsForInvalidToken = Metrics.counter(factory.createMetricName("TotalOpsForInvalidToken")); public static final Counter startupOpsForInvalidToken = Metrics.counter(factory.createMetricName("StartupOpsForInvalidToken")); + public static final Meter bootstrapFilesThroughputMetric = Metrics.meter(factory.createMetricName("BootstrapFilesThroughput")); private static Gauge<Long> createSummingGauge(String name, ToLongFunction<KeyspaceMetrics> extractor) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java index 5c906a1449..3b28c20c69 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java @@ -44,10 +44,14 @@ import org.apache.cassandra.streaming.StreamException; import org.apache.cassandra.streaming.StreamResultFuture; import static net.bytebuddy.matcher.ElementMatchers.named; -import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked; -import static org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster; import static org.apache.cassandra.distributed.shared.ClusterUtils.addInstance; import static org.apache.cassandra.distributed.shared.ClusterUtils.startHostReplacement; +import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked; +import static org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster; +import static org.apache.cassandra.distributed.test.ring.BootstrapTest.getMetricGaugeValue; +import static org.apache.cassandra.distributed.test.ring.BootstrapTest.getMetricMeterRate; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class FailedBootstrapTest extends TestBaseImpl { @@ -82,6 +86,12 @@ public class FailedBootstrapTest extends TestBaseImpl result.asserts().success(); logger.info("gossipinfo for node{}\n{}", i.config().num(), result.getStdout()); }); + + assertTrue(getMetricGaugeValue(added, "BootstrapFilesTotal", Long.class) > 0L); + assertTrue(getMetricGaugeValue(added, "BootstrapFilesReceived", Long.class) > 0L); + assertEquals("Beginning bootstrap process", getMetricGaugeValue(added, "BootstrapLastSeenStatus", String.class)); + assertEquals("Stream failed", getMetricGaugeValue(added, "BootstrapLastSeenError", String.class)); + assertTrue(getMetricMeterRate(added, "BootstrapFilesThroughput") > 0); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java index ecdb152f89..f43c459f8f 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java @@ -19,9 +19,15 @@ package org.apache.cassandra.distributed.test.ring; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Callable; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; import org.junit.Test; @@ -36,8 +42,10 @@ import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.JMXUtil; import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.metrics.DefaultNameFactory; import org.apache.cassandra.service.StorageService; import static net.bytebuddy.matcher.ElementMatchers.named; @@ -45,6 +53,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.RESET_BOOT import static org.apache.cassandra.config.CassandraRelevantProperties.TEST_WRITE_SURVEY; import static org.apache.cassandra.distributed.action.GossipHelper.withProperty; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.JMX; import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -136,7 +145,7 @@ public class BootstrapTest extends TestBaseImpl try (Cluster cluster = builder().withNodes(originalNodeCount) .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) - .withConfig(config -> config.with(NETWORK, GOSSIP)) + .withConfig(config -> config.with(NETWORK, GOSSIP, JMX)) .withInstanceInitializer(BootstrapTest.BB::install) .start()) { @@ -154,9 +163,58 @@ public class BootstrapTest extends TestBaseImpl assertEquals("COMPLETED", StorageService.instance.getBootstrapState()); assertFalse(StorageService.instance.isBootstrapFailed()); }); + + assertEquals(Long.valueOf(0L), getMetricGaugeValue(joiningInstance, "BootstrapFilesTotal", Long.class)); + assertEquals(Long.valueOf(0L), getMetricGaugeValue(joiningInstance, "BootstrapFilesReceived", Long.class)); + assertEquals("Bootstrap streaming success", getMetricGaugeValue(joiningInstance, "BootstrapLastSeenStatus", String.class)); + assertEquals("", getMetricGaugeValue(joiningInstance, "BootstrapLastSeenError", String.class)); } } + public static <T> T getMetricGaugeValue(IInvokableInstance instance, String metricName, Class<T> gaugeReturnType) + { + return gaugeReturnType.cast(getMetricAttribute(instance, metricName, "Value")); + } + + public static long getMetricMeterRate(IInvokableInstance instance, String metricName) + { + Object raw = getMetricAttribute(instance, metricName, "Count"); + return raw == null ? 0 : (Long) raw; + } + + public static Object getMetricAttribute(IInvokableInstance instance, String metricName, String attributeName) + { + if (instance.isShutdown()) + throw new IllegalStateException("Instance is shutdown"); + + try (JMXConnector jmxc = JMXUtil.getJmxConnector(instance.config())) + { + MBeanServerConnection mbsc = jmxc.getMBeanServerConnection(); + ObjectName metric = mbsc.queryNames(null, null) + .stream() + .filter(objectName -> objectName.getDomain().equals(DefaultNameFactory.GROUP_NAME)) + .filter(objectName -> Objects.nonNull(objectName.getKeyProperty("name"))) + .filter(objectName -> metricName.equals(objectName.getKeyProperty("name"))) + .findFirst() + .orElse(null); + + if (metric == null) + return null; + + MBeanInfo info = mbsc.getMBeanInfo(metric); + for (MBeanAttributeInfo a : info.getAttributes()) + { + if (a.getName().equals(attributeName)) + return mbsc.getAttribute(metric, a.getName()); + } + + return null; + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } public static void populate(ICluster cluster, int from, int to) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org