Github user smurakozi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r166892802
  
    --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.metrics.sink
    +
    +import java.net.URI
    +import java.util
    +import java.util.Properties
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Try
    +
    +import com.codahale.metrics._
    +import io.prometheus.client.CollectorRegistry
    +import io.prometheus.client.dropwizard.DropwizardExports
    +
    +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config.METRICS_NAMESPACE
    +import org.apache.spark.metrics.MetricsSystem
    +import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
    +
    +
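    +/**
    + * A metrics [[Sink]] that pushes the Dropwizard registry to a Prometheus
    + * Pushgateway. Like any Spark metrics sink it is wired up through
    + * metrics.properties via the standard `class` key; any sink-specific keys
    + * would come from the `property` argument, parsed in the part of the file
    + * not shown in this hunk:
    + *
    + *   *.sink.prometheus.class=org.apache.spark.metrics.sink.PrometheusSink
    + */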
    +private[spark] class PrometheusSink(
    +                                     val property: Properties,
    +                                     val registry: MetricRegistry,
    +                                     securityMgr: SecurityManager)
    +  extends Sink with Logging {
    +
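    +  // Invoked by MetricsSystem on the sink's polling schedule; each report()
    +  // recomputes the job name and grouping key below and (in the remainder of
    +  // the file, not shown in this hunk) pushes the snapshot, presumably via
    +  // the PushGatewayWithTimestamp imported above.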
    +  protected class Reporter(registry: MetricRegistry)
    +    extends ScheduledReporter(
    +      registry,
    +      "prometheus-reporter",
    +      MetricFilter.ALL,
    +      TimeUnit.SECONDS,
    +      TimeUnit.MILLISECONDS) {
    +
    +    val defaultSparkConf: SparkConf = new SparkConf(true)
    +
    +    override def report(
    +                         gauges: util.SortedMap[String, Gauge[_]],
    +                         counters: util.SortedMap[String, Counter],
    +                         histograms: util.SortedMap[String, Histogram],
    +                         meters: util.SortedMap[String, Meter],
    +                         timers: util.SortedMap[String, Timer]): Unit = {
    +
    +      // SparkEnv may become available only after metrics sink creation; thus the
    +      // SparkConf is retrieved from SparkEnv here rather than during the
    +      // creation/initialisation of PrometheusSink.
    +      val sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf)
    +
    +      val metricsNamespace: Option[String] = sparkConf.get(METRICS_NAMESPACE)
    +      val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id")
    +      val executorId: Option[String] = sparkConf.getOption("spark.executor.id")
    +
    +      logInfo(s"metricsNamespace=$metricsNamespace, sparkAppId=$sparkAppId, " +
    +        s"executorId=$executorId")
    +
    +      val role: String = (sparkAppId, executorId) match {
    +        case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver"
    +        case (Some(_), Some(_)) => "executor"
    +        case _ => "shuffle"
    +      }
    +
    +      val job: String = role match {
    +        case "driver" | "executor" => metricsNamespace.getOrElse(sparkAppId.get)
    +        case _ => metricsNamespace.getOrElse("shuffle")
    +      }
    +      logInfo(s"role=$role, job=$job")
    +
    +      val groupingKey: Map[String, String] = (role, executorId) match {
    +        case ("driver", _) => Map("role" -> role)
    +        case ("executor", Some(id)) => Map ("role" -> role, "number" -> id)
    +        case _ => Map("role" -> role)
    +      }
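    +      // Worked example (hypothetical values): with spark.app.id=app-123,
    +      // spark.executor.id=1 and no spark.metrics.namespace set, an executor
    +      // reports job=app-123 with groupingKey=Map(role -> executor, number -> 1);
    +      // the driver (spark.executor.id=driver) reports job=app-123 with
    +      // groupingKey=Map(role -> driver).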
    +
    +
    --- End diff --
    
    Nit: extra line

