This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 36b5429abc7 [SPARK-44741][CORE] Support regex-based MetricFilter in `StatsdSink` 36b5429abc7 is described below commit 36b5429abc73f59459d1636ebef58a5a679af6c0 Author: Rameshkrishnan Muthusamy <rameshkrishnan_muthus...@apple.com> AuthorDate: Thu Aug 10 07:07:24 2023 -0700 [SPARK-44741][CORE] Support regex-based MetricFilter in `StatsdSink` ### What changes were proposed in this pull request? Adding an additional option in StatsdSink to allow passing a regex as a filter option for reporting metrics Added a unit test case to validate the application of the filter in the StatsdSink Updated the metrics config to represent the added config for StatsdSink ### Why are the changes needed? In the current state Spark metrics instances send a large range of metrics that are useful for debugging and performance analysis. There are cases where the consumers would like to switch between deeper metrics and only specific custom metrics that are required for system maintenance. This option introduced in StatsdSink extends the existing filtering option in the reporter and exposes the same as an option. This implementation is similar to GraphiteSink, which supports a similar feature. ### Does this PR introduce _any_ user-facing change? Yes, this PR exposes an additional config that users can optionally provide to filter the metrics that are being reported. This is an optional field and defaults to the existing behaviour where the sink reports all the metrics. ### How was this patch tested? Unit tests have been added in the PR to validate the change Closes #42416 from ramesh-muthusamy/SPARK-44741. 
Authored-by: Rameshkrishnan Muthusamy <rameshkrishnan_muthus...@apple.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- conf/metrics.properties.template | 2 ++ .../org/apache/spark/metrics/sink/StatsdSink.scala | 14 ++++++++-- .../spark/metrics/sink/StatsdSinkSuite.scala | 30 ++++++++++++++++++++++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template index f52d33fd642..aa8e0e438e6 100644 --- a/conf/metrics.properties.template +++ b/conf/metrics.properties.template @@ -139,6 +139,7 @@ # period 10 Poll period # unit seconds Units of poll period # prefix EMPTY STRING Prefix to prepend to metric name +# regex NONE Optional filter to send only metrics matching this regex string ## Examples # Enable JmxSink for all instances by class name @@ -150,6 +151,7 @@ # Enable StatsdSink for all instances by class name #*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink #*.sink.statsd.prefix=spark +#*.sink.statsd.regex=<optional_value> # Polling period for the ConsoleSink #*.sink.console.period=10 diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala index c6e7bcccd4c..c506b86b456 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala @@ -20,7 +20,7 @@ package org.apache.spark.metrics.sink import java.util.{Locale, Properties} import java.util.concurrent.TimeUnit -import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry} import org.apache.spark.internal.Logging import org.apache.spark.metrics.MetricsSystem @@ -31,6 +31,7 @@ private[spark] object StatsdSink { val STATSD_KEY_PERIOD = "period" val STATSD_KEY_UNIT = "unit" val STATSD_KEY_PREFIX = "prefix" + val STATSD_KEY_REGEX = "regex" val STATSD_DEFAULT_HOST = "127.0.0.1" val 
STATSD_DEFAULT_PORT = "8125" @@ -53,9 +54,18 @@ private[spark] class StatsdSink( val prefix = property.getProperty(STATSD_KEY_PREFIX, STATSD_DEFAULT_PREFIX) + val filter = Option(property.getProperty(STATSD_KEY_REGEX)) match { + case Some(pattern) => new MetricFilter() { + override def matches(name: String, metric: Metric): Boolean = { + pattern.r.findFirstMatchIn(name).isDefined + } + } + case None => MetricFilter.ALL + } + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - val reporter = new StatsdReporter(registry, host, port, prefix) + val reporter = new StatsdReporter(registry, host, port, prefix, filter) override def start(): Unit = { reporter.start(pollPeriod, pollUnit) diff --git a/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala index ff883633d5e..28bf40e8c93 100644 --- a/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala @@ -22,6 +22,8 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.util.Properties import java.util.concurrent.TimeUnit._ +import scala.collection.JavaConverters._ + import com.codahale.metrics._ import org.apache.spark.SparkFunSuite @@ -59,6 +61,7 @@ class StatsdSinkSuite extends SparkFunSuite { val props = new Properties defaultProps.foreach(e => props.put(e._1, e._2)) props.put(STATSD_KEY_PORT, socket.getLocalPort.toString) + props.put(STATSD_KEY_REGEX, "counter|gauge|histogram|timer") val registry = new MetricRegistry val sink = new StatsdSink(props, registry) try { @@ -171,5 +174,32 @@ class StatsdSinkSuite extends SparkFunSuite { } } } + + + test("metrics StatsD sink with filtered Gauge") { + withSocketAndSink { (socket, sink) => + val gauge = new Gauge[Double] { + override def getValue: Double = 1.23 + } + + val filteredMetricKeys = Set( + "gauge", + "gauge-1" + ) + + 
filteredMetricKeys.foreach(sink.registry.register(_, gauge)) + + sink.registry.register("excluded-metric", gauge) + sink.report() + + val p = new DatagramPacket(new Array[Byte](maxPayloadSize), maxPayloadSize) + socket.receive(p) + + val metricKeys = sink.registry.getGauges(sink.filter).keySet.asScala + + assert(metricKeys.equals(filteredMetricKeys), + "Should contain only metrics matches regex filter") + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org