This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 81a2cb782e Register the measurements of the bootstrap process as Dropwizard metrics
81a2cb782e is described below

commit 81a2cb782eed932961b0c89fbd40199c7269c662
Author: Maxim Muzafarov <maxmu...@gmail.com>
AuthorDate: Wed Mar 6 14:54:02 2024 +0100

    Register the measurements of the bootstrap process as Dropwizard metrics
    
    patch by Maxim Muzafarov; reviewed by Stefan Miklosovic for CASSANDRA-19447
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/dht/BootStrapper.java     | 49 +++++++++++++++++-
 .../apache/cassandra/metrics/StorageMetrics.java   |  4 +-
 .../test/hostreplacement/FailedBootstrapTest.java  | 14 ++++-
 .../distributed/test/ring/BootstrapTest.java       | 60 +++++++++++++++++++++-
 5 files changed, 122 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index b6b9ab86ba..af985568cd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Register the measurements of the bootstrap process as Dropwizard metrics (CASSANDRA-19447)
  * Add LIST SUPERUSERS CQL statement (CASSANDRA-19417)
  * Modernize CQLSH datetime conversions (CASSANDRA-18879)
  * Harry model and in-JVM tests for partition-restricted 2i queries (CASSANDRA-18275)
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 82f587ed30..6c306a3ff0 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -24,18 +24,20 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.cassandra.tcm.ownership.MovementMap;
-import org.apache.cassandra.utils.concurrent.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.codahale.metrics.Gauge;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.tokenallocator.TokenAllocation;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.streaming.StreamEvent;
@@ -44,13 +46,21 @@ import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ownership.MovementMap;
+import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.cassandra.utils.progress.ProgressEventNotifierSupport;
 import org.apache.cassandra.utils.progress.ProgressEventType;
 
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
 public class BootStrapper extends ProgressEventNotifierSupport
 {
     private static final Logger logger = LoggerFactory.getLogger(BootStrapper.class);
+    private static final AtomicLong bootstrapFilesTotal = new AtomicLong();
+    private static final AtomicLong bootstrapFilesReceived = new AtomicLong();
+    private static final AtomicReference<String> bootstrapLastSeenStatus = new AtomicReference<>();
+    private static final AtomicReference<String> bootstrapLastSeenError = new AtomicReference<>();
 
     /* endpoint that needs to be bootstrapped */
     protected final InetAddressAndPort address;
@@ -59,6 +69,14 @@ public class BootStrapper extends ProgressEventNotifierSupport
     private final MovementMap movements;
     private final MovementMap strictMovements;
 
+    static
+    {
+        Metrics.<Gauge<Long>>register(StorageMetrics.factory.createMetricName("BootstrapFilesTotal"), bootstrapFilesTotal::get);
+        Metrics.<Gauge<Long>>register(StorageMetrics.factory.createMetricName("BootstrapFilesReceived"), bootstrapFilesReceived::get);
+        Metrics.<Gauge<String>>register(StorageMetrics.factory.createMetricName("BootstrapLastSeenStatus"), bootstrapLastSeenStatus::get);
+        Metrics.<Gauge<String>>register(StorageMetrics.factory.createMetricName("BootstrapLastSeenError"), bootstrapLastSeenError::get);
+    }
+
     public BootStrapper(InetAddressAndPort address,
                         ClusterMetadata metadata,
                         MovementMap movements,
@@ -70,6 +88,30 @@ public class BootStrapper extends ProgressEventNotifierSupport
         this.metadata = metadata;
         this.movements = movements;
         this.strictMovements = strictMovements;
+
+        addProgressListener((tag, event) -> {
+            ProgressEventType type = event.getType();
+            switch (type)
+            {
+                case START:
+                    bootstrapFilesTotal.set(0);
+                    bootstrapFilesReceived.set(0);
+                    bootstrapLastSeenStatus.set(event.getMessage());
+                    bootstrapLastSeenError.set("");
+                    break;
+                case PROGRESS:
+                    bootstrapFilesTotal.set(event.getTotal());
+                    bootstrapFilesReceived.set(event.getProgressCount());
+                    break;
+                case SUCCESS:
+                case COMPLETE:
+                    bootstrapLastSeenStatus.set(event.getMessage());
+                    break;
+                case ERROR:
+                    bootstrapLastSeenError.set(event.getMessage());
+                    break;
+            }
+        });
     }
 
     public Future<StreamState> bootstrap(StreamStateStore stateStore, boolean useStrictConsistency, InetAddressAndPort beingReplaced)
@@ -100,6 +142,8 @@ public class BootStrapper extends ProgressEventNotifierSupport
             streamer.addKeyspaceToFetch(keyspaceName);
         }
 
+        fireProgressEvent("bootstrap", new ProgressEvent(ProgressEventType.START, 0, 0, "Beginning bootstrap process"));
+
         StreamResultFuture bootstrapStreamResult = streamer.fetchAsync();
         bootstrapStreamResult.addEventListener(new StreamEventHandler()
         {
@@ -122,6 +166,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
                         StreamEvent.ProgressEvent progress = (StreamEvent.ProgressEvent) event;
                         if (progress.progress.isCompleted())
                         {
+                            StorageMetrics.bootstrapFilesThroughputMetric.mark();
                             int received = receivedFiles.incrementAndGet();
                             ProgressEvent currentProgress = new ProgressEvent(ProgressEventType.PROGRESS, received, totalFilesToReceive.get(), "received file " + progress.progress.fileName);
                             fireProgressEvent("bootstrap", currentProgress);
diff --git a/src/java/org/apache/cassandra/metrics/StorageMetrics.java b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
index ebb031cc4d..57b57e831f 100644
--- a/src/java/org/apache/cassandra/metrics/StorageMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
@@ -22,6 +22,7 @@ import java.util.stream.StreamSupport;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
 import org.apache.cassandra.db.Keyspace;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -31,7 +32,7 @@ import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
  */
 public class StorageMetrics
 {
-    private static final MetricNameFactory factory = new DefaultNameFactory("Storage");
+    public static final MetricNameFactory factory = new DefaultNameFactory("Storage");
 
     public static final Counter load = Metrics.counter(factory.createMetricName("Load"));
     public static final Counter uncompressedLoad = Metrics.counter(factory.createMetricName("UncompressedLoad"));
@@ -47,6 +48,7 @@ public class StorageMetrics
     public static final Counter repairExceptions = Metrics.counter(factory.createMetricName("RepairExceptions"));
     public static final Counter totalOpsForInvalidToken = Metrics.counter(factory.createMetricName("TotalOpsForInvalidToken"));
     public static final Counter startupOpsForInvalidToken = Metrics.counter(factory.createMetricName("StartupOpsForInvalidToken"));
+    public static final Meter bootstrapFilesThroughputMetric = Metrics.meter(factory.createMetricName("BootstrapFilesThroughput"));
 
     private static Gauge<Long> createSummingGauge(String name, ToLongFunction<KeyspaceMetrics> extractor)
     {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java
index 5c906a1449..3b28c20c69 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java
@@ -44,10 +44,14 @@ import org.apache.cassandra.streaming.StreamException;
 import org.apache.cassandra.streaming.StreamResultFuture;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
-import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
-import static org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster;
 import static org.apache.cassandra.distributed.shared.ClusterUtils.addInstance;
 import static org.apache.cassandra.distributed.shared.ClusterUtils.startHostReplacement;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+import static org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster;
+import static org.apache.cassandra.distributed.test.ring.BootstrapTest.getMetricGaugeValue;
+import static org.apache.cassandra.distributed.test.ring.BootstrapTest.getMetricMeterRate;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class FailedBootstrapTest extends TestBaseImpl
 {
@@ -82,6 +86,12 @@ public class FailedBootstrapTest extends TestBaseImpl
                 result.asserts().success();
                 logger.info("gossipinfo for node{}\n{}", i.config().num(), result.getStdout());
             });
+
+            assertTrue(getMetricGaugeValue(added, "BootstrapFilesTotal", Long.class) > 0L);
+            assertTrue(getMetricGaugeValue(added, "BootstrapFilesReceived", Long.class) > 0L);
+            assertEquals("Beginning bootstrap process", getMetricGaugeValue(added, "BootstrapLastSeenStatus", String.class));
+            assertEquals("Stream failed", getMetricGaugeValue(added, "BootstrapLastSeenError", String.class));
+            assertTrue(getMetricMeterRate(added, "BootstrapFilesThroughput") > 0);
         }
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
index ecdb152f89..f43c459f8f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
@@ -19,9 +19,15 @@
 package org.apache.cassandra.distributed.test.ring;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
 
 import org.junit.Test;
 
@@ -36,8 +42,10 @@ import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.JMXUtil;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.metrics.DefaultNameFactory;
 import org.apache.cassandra.service.StorageService;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
@@ -45,6 +53,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.RESET_BOOT
 import static org.apache.cassandra.config.CassandraRelevantProperties.TEST_WRITE_SURVEY;
 import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.JMX;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -136,7 +145,7 @@ public class BootstrapTest extends TestBaseImpl
         try (Cluster cluster = builder().withNodes(originalNodeCount)
                                         .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
                                         .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
-                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
+                                        .withConfig(config -> config.with(NETWORK, GOSSIP, JMX))
                                         .withInstanceInitializer(BootstrapTest.BB::install)
                                         .start())
         {
@@ -154,9 +163,58 @@ public class BootstrapTest extends TestBaseImpl
                 assertEquals("COMPLETED", StorageService.instance.getBootstrapState());
                 assertFalse(StorageService.instance.isBootstrapFailed());
             });
+
+            assertEquals(Long.valueOf(0L), getMetricGaugeValue(joiningInstance, "BootstrapFilesTotal", Long.class));
+            assertEquals(Long.valueOf(0L), getMetricGaugeValue(joiningInstance, "BootstrapFilesReceived", Long.class));
+            assertEquals("Bootstrap streaming success", getMetricGaugeValue(joiningInstance, "BootstrapLastSeenStatus", String.class));
+            assertEquals("", getMetricGaugeValue(joiningInstance, "BootstrapLastSeenError", String.class));
         }
     }
 
+    public static <T> T getMetricGaugeValue(IInvokableInstance instance, String metricName, Class<T> gaugeReturnType)
+    {
+        return gaugeReturnType.cast(getMetricAttribute(instance, metricName, "Value"));
+    }
+
+    public static long getMetricMeterRate(IInvokableInstance instance, String metricName)
+    {
+        Object raw = getMetricAttribute(instance, metricName, "Count");
+        return raw == null ? 0 : (Long) raw;
+    }
+
+    public static Object getMetricAttribute(IInvokableInstance instance, String metricName, String attributeName)
+    {
+        if (instance.isShutdown())
+            throw new IllegalStateException("Instance is shutdown");
+
+        try (JMXConnector jmxc = JMXUtil.getJmxConnector(instance.config()))
+        {
+            MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+            ObjectName metric = mbsc.queryNames(null, null)
+                                    .stream()
+                                    .filter(objectName -> objectName.getDomain().equals(DefaultNameFactory.GROUP_NAME))
+                                    .filter(objectName -> Objects.nonNull(objectName.getKeyProperty("name")))
+                                    .filter(objectName -> metricName.equals(objectName.getKeyProperty("name")))
+                                    .findFirst()
+                                    .orElse(null);
+
+            if (metric == null)
+                return null;
+
+            MBeanInfo info = mbsc.getMBeanInfo(metric);
+            for (MBeanAttributeInfo a : info.getAttributes())
+            {
+                if (a.getName().equals(attributeName))
+                    return mbsc.getAttribute(metric, a.getName());
+            }
+
+            return null;
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
 
     public static void populate(ICluster cluster, int from, int to)
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to