This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 36b5429abc7 [SPARK-44741][CORE] Support regex-based MetricFilter in `StatsdSink`
36b5429abc7 is described below

commit 36b5429abc73f59459d1636ebef58a5a679af6c0
Author: Rameshkrishnan Muthusamy <rameshkrishnan_muthus...@apple.com>
AuthorDate: Thu Aug 10 07:07:24 2023 -0700

    [SPARK-44741][CORE] Support regex-based MetricFilter in `StatsdSink`
    
    ### What changes were proposed in this pull request?
    
    Adds an option to `StatsdSink` that allows passing a regex as a filter for reported metrics (illustrated below).
    Adds a unit test case to validate that the filter is applied in `StatsdSink`.
    Updates the metrics config template to document the new `StatsdSink` option.
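
    As an illustration, a hypothetical `metrics.properties` entry using the new option could look like the following; the regex value `jvm|executor` is purely illustrative, not something this patch prescribes:

    ```properties
    # Report only metrics whose names contain a match for the regex.
    *.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
    *.sink.statsd.regex=jvm|executor
    ```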
    
    ### Why are the changes needed?
    
    In the current state, Spark metrics instances emit a large range of metrics that are useful for debugging and performance analysis. There are cases where consumers would like to switch between deep metrics and only the specific custom metrics required for system maintenance. The option introduced in `StatsdSink` extends the existing filtering capability of the reporter and exposes it as a sink option. This mirrors `GraphiteSink`, which already supports a similar feature.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. This PR exposes an additional config that users can optionally provide to filter the metrics being reported. The field is optional and defaults to the existing behaviour, where the sink reports all metrics.
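
    As a small sketch of that default path (grounded in the patch's `case None => MetricFilter.ALL` fallback; `DefaultFilterExample` is a made-up name): when no `regex` property is set, every metric name is accepted:

    ```scala
    import com.codahale.metrics.MetricFilter

    object DefaultFilterExample {
      def main(args: Array[String]): Unit = {
        // MetricFilter.ALL ignores its arguments and always returns true,
        // so the sink keeps reporting all registered metrics.
        assert(MetricFilter.ALL.matches("any.metric.name", null))
      }
    }
    ```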
    
    ### How was this patch tested?
    
    Unit tests have been added in the PR to validate the change.
    
    Closes #42416 from ramesh-muthusamy/SPARK-44741.
    
    Authored-by: Rameshkrishnan Muthusamy <rameshkrishnan_muthus...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 conf/metrics.properties.template                   |  2 ++
 .../org/apache/spark/metrics/sink/StatsdSink.scala | 14 ++++++++--
 .../spark/metrics/sink/StatsdSinkSuite.scala       | 30 ++++++++++++++++++++++
 3 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index f52d33fd642..aa8e0e438e6 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -139,6 +139,7 @@
 #   period    10            Poll period
 #   unit      seconds       Units of poll period
 #   prefix    EMPTY STRING  Prefix to prepend to metric name
+#   regex     NONE          Optional filter to send only metrics matching this regex string
 
 ## Examples
 # Enable JmxSink for all instances by class name
@@ -150,6 +151,7 @@
 # Enable StatsdSink for all instances by class name
 #*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
 #*.sink.statsd.prefix=spark
+#*.sink.statsd.regex=<optional_value>
 
 # Polling period for the ConsoleSink
 #*.sink.console.period=10
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
index c6e7bcccd4c..c506b86b456 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
@@ -20,7 +20,7 @@ package org.apache.spark.metrics.sink
 import java.util.{Locale, Properties}
 import java.util.concurrent.TimeUnit
 
-import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
@@ -31,6 +31,7 @@ private[spark] object StatsdSink {
   val STATSD_KEY_PERIOD = "period"
   val STATSD_KEY_UNIT = "unit"
   val STATSD_KEY_PREFIX = "prefix"
+  val STATSD_KEY_REGEX = "regex"
 
   val STATSD_DEFAULT_HOST = "127.0.0.1"
   val STATSD_DEFAULT_PORT = "8125"
@@ -53,9 +54,18 @@ private[spark] class StatsdSink(
 
   val prefix = property.getProperty(STATSD_KEY_PREFIX, STATSD_DEFAULT_PREFIX)
 
+  val filter = Option(property.getProperty(STATSD_KEY_REGEX)) match {
+    case Some(pattern) => new MetricFilter() {
+      override def matches(name: String, metric: Metric): Boolean = {
+        pattern.r.findFirstMatchIn(name).isDefined
+      }
+    }
+    case None => MetricFilter.ALL
+  }
+
   MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
 
-  val reporter = new StatsdReporter(registry, host, port, prefix)
+  val reporter = new StatsdReporter(registry, host, port, prefix, filter)
 
   override def start(): Unit = {
     reporter.start(pollPeriod, pollUnit)
diff --git a/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala
index ff883633d5e..28bf40e8c93 100644
--- a/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala
@@ -22,6 +22,8 @@ import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Properties
 import java.util.concurrent.TimeUnit._
 
+import scala.collection.JavaConverters._
+
 import com.codahale.metrics._
 
 import org.apache.spark.SparkFunSuite
@@ -59,6 +61,7 @@ class StatsdSinkSuite extends SparkFunSuite {
     val props = new Properties
     defaultProps.foreach(e => props.put(e._1, e._2))
     props.put(STATSD_KEY_PORT, socket.getLocalPort.toString)
+    props.put(STATSD_KEY_REGEX, "counter|gauge|histogram|timer")
     val registry = new MetricRegistry
     val sink = new StatsdSink(props, registry)
     try {
@@ -171,5 +174,32 @@ class StatsdSinkSuite extends SparkFunSuite {
       }
     }
   }
+
+
+  test("metrics StatsD sink with filtered Gauge") {
+    withSocketAndSink { (socket, sink) =>
+      val gauge = new Gauge[Double] {
+        override def getValue: Double = 1.23
+      }
+
+      val filteredMetricKeys = Set(
+        "gauge",
+        "gauge-1"
+      )
+
+      filteredMetricKeys.foreach(sink.registry.register(_, gauge))
+
+      sink.registry.register("excluded-metric", gauge)
+      sink.report()
+
+      val p = new DatagramPacket(new Array[Byte](maxPayloadSize), maxPayloadSize)
+      socket.receive(p)
+
+      val metricKeys = sink.registry.getGauges(sink.filter).keySet.asScala
+
+      assert(metricKeys.equals(filteredMetricKeys),
+        "Should contain only metrics matches regex filter")
+    }
+  }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
