This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d32dee  [SPARK-28475][CORE] Add regex MetricFilter to GraphiteSink
6d32dee is described below

commit 6d32deeecc5a80230158e12982a5c1ea3f70d89d
Author: Nick Karpov <n...@nickkarpov.com>
AuthorDate: Fri Aug 2 17:50:15 2019 +0800

    [SPARK-28475][CORE] Add regex MetricFilter to GraphiteSink
    
    ## What changes were proposed in this pull request?
    
    Today all registered metric sources are reported to GraphiteSink with no 
filtering mechanism, although the codahale project does support it.
    
    GraphiteReporter (ScheduledReporter) from the codahale project requires you 
to implement and supply the MetricFilter interface (there is only a single 
implementation by default in the codahale project, MetricFilter.ALL).
    
    This change proposes adding an additional regex config used to match and 
filter the metrics sent to the GraphiteSink.
    
    ## How was this patch tested?
    
    Included a GraphiteSinkSuite that tests:
    
    1. Absence of regex filter (existing default behavior maintained)
    2. Presence of `regex=<regexexpr>` correctly filters metric keys
    
    Closes #25232 from nkarpov/graphite_regex.
    
    Authored-by: Nick Karpov <n...@nickkarpov.com>
    Signed-off-by: jerryshao <jerrys...@tencent.com>
---
 conf/metrics.properties.template                   |  1 +
 .../apache/spark/metrics/sink/GraphiteSink.scala   | 13 +++-
 .../spark/metrics/sink/GraphiteSinkSuite.scala     | 84 ++++++++++++++++++++++
 docs/monitoring.md                                 |  1 +
 4 files changed, 98 insertions(+), 1 deletion(-)

diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 23407e1..da0b06d 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -121,6 +121,7 @@
 #   unit      seconds       Unit of the poll period
 #   prefix    EMPTY STRING  Prefix to prepend to every metric's name
 #   protocol  tcp           Protocol ("tcp" or "udp") to use
+#   regex     NONE          Optional filter to send only metrics matching this 
regex string
 
 # org.apache.spark.metrics.sink.StatsdSink
 #   Name:     Default:      Description:
diff --git 
a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
index 21b4dfb..05d553e 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -20,7 +20,7 @@ package org.apache.spark.metrics.sink
 import java.util.{Locale, Properties}
 import java.util.concurrent.TimeUnit
 
-import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
 import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP}
 
 import org.apache.spark.SecurityManager
@@ -38,6 +38,7 @@ private[spark] class GraphiteSink(val property: Properties, 
val registry: Metric
   val GRAPHITE_KEY_UNIT = "unit"
   val GRAPHITE_KEY_PREFIX = "prefix"
   val GRAPHITE_KEY_PROTOCOL = "protocol"
+  val GRAPHITE_KEY_REGEX = "regex"
 
   def propertyToOption(prop: String): Option[String] = 
Option(property.getProperty(prop))
 
@@ -72,10 +73,20 @@ private[spark] class GraphiteSink(val property: Properties, 
val registry: Metric
     case Some(p) => throw new Exception(s"Invalid Graphite protocol: $p")
   }
 
+  val filter = propertyToOption(GRAPHITE_KEY_REGEX) match {
+    case Some(pattern) => new MetricFilter() {
+      override def matches(name: String, metric: Metric): Boolean = {
+        pattern.r.findFirstMatchIn(name).isDefined
+      }
+    }
+    case None => MetricFilter.ALL
+  }
+
   val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry)
       .convertDurationsTo(TimeUnit.MILLISECONDS)
       .convertRatesTo(TimeUnit.SECONDS)
       .prefixedWith(prefix)
+      .filter(filter)
       .build(graphite)
 
   override def start() {
diff --git 
a/core/src/test/scala/org/apache/spark/metrics/sink/GraphiteSinkSuite.scala 
b/core/src/test/scala/org/apache/spark/metrics/sink/GraphiteSinkSuite.scala
new file mode 100644
index 0000000..2369218
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/sink/GraphiteSinkSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics._
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+
+class GraphiteSinkSuite extends SparkFunSuite {
+
+  test("GraphiteSink with default MetricsFilter") {
+    val props = new Properties
+    props.put("host", "127.0.0.1")
+    props.put("port", "54321")
+    val registry = new MetricRegistry
+    val securityMgr = new SecurityManager(new SparkConf(false))
+
+    val sink = new GraphiteSink(props, registry, securityMgr)
+
+    val gauge = new Gauge[Double] {
+      override def getValue: Double = 1.23
+    }
+    sink.registry.register("gauge", gauge)
+    sink.registry.register("anothergauge", gauge)
+    sink.registry.register("streaminggauge", gauge)
+
+    val metricKeys = sink.registry.getGauges(sink.filter).keySet.asScala
+
+    assert(metricKeys.equals(Set("gauge", "anothergauge", "streaminggauge")),
+      "Should contain all metrics registered")
+  }
+
+  test("GraphiteSink with regex MetricsFilter") {
+    val props = new Properties
+    props.put("host", "127.0.0.1")
+    props.put("port", "54321")
+    props.put("regex", "local-[0-9]+.driver.(CodeGenerator|BlockManager)")
+    val registry = new MetricRegistry
+    val securityMgr = new SecurityManager(new SparkConf(false))
+
+    val sink = new GraphiteSink(props, registry, securityMgr)
+
+    val gauge = new Gauge[Double] {
+      override def getValue: Double = 1.23
+    }
+    sink.registry.register("gauge", gauge)
+    sink.registry.register("anothergauge", gauge)
+    sink.registry.register("streaminggauge", gauge)
+    
sink.registry.register("local-1563838109260.driver.CodeGenerator.generatedMethodSize",
 gauge)
+    
sink.registry.register("local-1563838109260.driver.BlockManager.disk.diskSpaceUsed_MB",
 gauge)
+    
sink.registry.register("local-1563813796998.driver.spark.streaming.nicklocal.latency",
 gauge)
+    sink.registry.register("myapp.driver.CodeGenerator.generatedMethodSize", 
gauge)
+    sink.registry.register("myapp.driver.BlockManager.disk.diskSpaceUsed_MB", 
gauge)
+
+    val metricKeys = sink.registry.getGauges(sink.filter).keySet.asScala
+
+    val filteredMetricKeys = Set(
+      "local-1563838109260.driver.CodeGenerator.generatedMethodSize",
+      "local-1563838109260.driver.BlockManager.disk.diskSpaceUsed_MB"
+    )
+
+    assert(metricKeys.equals(filteredMetricKeys),
+      "Should contain only metrics matches regex filter")
+  }
+}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index e1ca6c4..24743b2 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -876,6 +876,7 @@ This example shows a list of Spark configuration parameters 
for a Graphite sink:
 "spark.metrics.conf.*.sink.graphite.period"=10
 "spark.metrics.conf.*.sink.graphite.unit"=seconds
 "spark.metrics.conf.*.sink.graphite.prefix"="optional_prefix"
+"spark.metrics.conf.*.sink.graphite.regex"="optional_regex_to_send_matching_metrics"
 ```
 
 Default values of the Spark metrics configuration are as follows:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to