[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-27 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153321690
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala ---
@@ -17,8 +17,48 @@
 
 package org.apache.spark.metrics.sink
 
-private[spark] trait Sink {
+import java.util.{Locale, Properties}
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.MetricRegistry
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.MetricsSystem
+
+/**
+ * :: DeveloperApi ::
+ * The abstract class of metrics Sink. By implementing its methods and registering it through
+ * metrics.properties, users can plug a custom metrics Sink into the MetricsSystem.
+ *
+ * @param properties Properties related to this specific Sink, read from the configuration
+ *   file; users can define their own configurations and read them from this parameter.
+ * @param metricRegistry The MetricRegistry holding the collected metrics for this Sink to report.
+ */
+@DeveloperApi
+abstract class Sink(properties: Properties, metricRegistry: MetricRegistry) {
+
+  protected val pollPeriod = properties.getProperty("period", "10").toInt
+
+  protected val pollUnit = Option(properties.getProperty("unit"))
+.map(s => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)))
+.getOrElse(TimeUnit.SECONDS)
+
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+  /**
+   * Start this metrics Sink, this will be called by MetricsSystem. If this [[Sink]] is failed to
--- End diff --

fails to start
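For context on the API under review: a user-defined sink built on this proposed abstract class could look like the sketch below. `ExampleConsoleSink` is hypothetical, the `start`/`stop`/`report` overrides are assumed to mirror the methods of the `private[spark] Sink` trait this class replaces, and `pollPeriod`/`pollUnit` are the protected fields shown in the diff above.

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

// Hypothetical user sink; not part of the PR.
class ExampleConsoleSink(properties: Properties, metricRegistry: MetricRegistry)
  extends Sink(properties, metricRegistry) {

  // The reporter polls the registry at the inherited pollPeriod/pollUnit.
  private val reporter = ConsoleReporter.forRegistry(metricRegistry)
    .convertRatesTo(TimeUnit.SECONDS)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .build()

  override def start(): Unit = reporter.start(pollPeriod, pollUnit)
  override def stop(): Unit = reporter.stop()
  override def report(): Unit = reporter.report()
}
```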





[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153110760
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala ---
@@ -195,18 +196,26 @@ private[spark] class MetricsSystem private (
   val classPath = kv._2.getProperty("class")
   if (null != classPath) {
 try {
-  val sink = Utils.classForName(classPath)
-.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-.newInstance(kv._2, registry, securityMgr)
+  val sink = Utils.classForName(classPath).getConstructor(
+classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+  .newInstance(kv._2, registry, securityMgr)
   if (kv._1 == "servlet") {
 metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
   } else {
 sinks += sink.asInstanceOf[Sink]
   }
 } catch {
-  case e: Exception =>
-logError("Sink class " + classPath + " cannot be instantiated")
-throw e
+  case _: NoSuchMethodException =>
+try {
+  sinks += Utils.classForName(classPath)
--- End diff --

No, not necessary; `MetricsServlet` is a built-in metrics sink that is explicitly added in the code above.
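As a refresher on how sinks get wired up at all, here is a sketch of conf/metrics.properties; the console keys follow the documented `<instance>.sink.<name>.<option>` pattern, and the servlet entry is the built-in default being referred to above.

```properties
# Each sink is declared as <instance>.sink.<name>.class plus its own options.
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds

# The "servlet" sink name is special-cased by MetricsSystem and kept as the
# built-in MetricsServlet rather than going through the generic Sink path.
*.sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet
```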





[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153098194
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala ---
@@ -25,27 +25,29 @@ import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
 import org.apache.spark.SecurityManager
 import org.apache.spark.metrics.MetricsSystem
 
-private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
-securityMgr: SecurityManager) extends Sink {
+private[spark] class ConsoleSink(
+property: Properties,
+registry: MetricRegistry,
+securityMgr: SecurityManager) extends Sink(property, registry) {
   val CONSOLE_DEFAULT_PERIOD = 10
   val CONSOLE_DEFAULT_UNIT = "SECONDS"
 
   val CONSOLE_KEY_PERIOD = "period"
   val CONSOLE_KEY_UNIT = "unit"
 
-  val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
+  private val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
--- End diff --

Emmm... this logic seems quite similar across the different Sinks. Could we abstract it into the `Sink` class, or extract a trait for it?
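One possible shape of that shared logic as a trait, as suggested here (names are illustrative, not from the PR):

```scala
import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit

// Illustrative trait factoring out the period/unit parsing shared by sinks.
trait PollingSinkOptions {
  def properties: Properties

  protected lazy val pollPeriod: Int =
    properties.getProperty("period", "10").toInt

  protected lazy val pollUnit: TimeUnit =
    Option(properties.getProperty("unit"))
      .map(s => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)))
      .getOrElse(TimeUnit.SECONDS)
}
```

The later revision quoted at the top of this thread takes the base-class route instead, with `pollPeriod` and `pollUnit` as `protected val`s on the abstract `Sink`.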





[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153097744
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala ---
@@ -195,18 +196,26 @@ private[spark] class MetricsSystem private (
   val classPath = kv._2.getProperty("class")
   if (null != classPath) {
 try {
-  val sink = Utils.classForName(classPath)
-.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-.newInstance(kv._2, registry, securityMgr)
+  val sink = Utils.classForName(classPath).getConstructor(
+classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+  .newInstance(kv._2, registry, securityMgr)
   if (kv._1 == "servlet") {
 metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
   } else {
 sinks += sink.asInstanceOf[Sink]
   }
 } catch {
-  case e: Exception =>
-logError("Sink class " + classPath + " cannot be instantiated")
-throw e
+  case _: NoSuchMethodException =>
+try {
+  sinks += Utils.classForName(classPath)
--- End diff --

Do we have to handle the case when `kv._1 == "servlet"` here?
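For reference, the control flow under discussion is a constructor fallback via reflection: try the legacy three-argument constructor first, and fall back to the new signature when it is missing. A self-contained sketch follows; the two-argument fallback arm is an assumption, since the quoted diff is truncated at that point, and `instantiateSink` is an illustrative helper rather than the PR's code (MetricsSystem itself casts the result to `Sink` or `MetricsServlet`).

```scala
import java.util.Properties

import com.codahale.metrics.MetricRegistry

import org.apache.spark.SecurityManager

// Try the legacy (Properties, MetricRegistry, SecurityManager) constructor
// first; on NoSuchMethodException, fall back to the assumed new
// (Properties, MetricRegistry) signature.
def instantiateSink(
    className: String,
    properties: Properties,
    registry: MetricRegistry,
    securityMgr: SecurityManager): Any = {
  val clazz = Class.forName(className)
  try {
    clazz.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
      .newInstance(properties, registry, securityMgr)
  } catch {
    case _: NoSuchMethodException =>
      clazz.getConstructor(classOf[Properties], classOf[MetricRegistry])
        .newInstance(properties, registry)
  }
}
```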





[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153097850
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala ---
@@ -25,27 +25,29 @@ import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
 import org.apache.spark.SecurityManager
 import org.apache.spark.metrics.MetricsSystem
 
-private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
-securityMgr: SecurityManager) extends Sink {
+private[spark] class ConsoleSink(
--- End diff --

nit: add a comment for each param here.





[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153097471
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala ---
@@ -195,18 +196,26 @@ private[spark] class MetricsSystem private (
   val classPath = kv._2.getProperty("class")
   if (null != classPath) {
 try {
-  val sink = Utils.classForName(classPath)
-.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-.newInstance(kv._2, registry, securityMgr)
+  val sink = Utils.classForName(classPath).getConstructor(
+classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
--- End diff --

Why make this format change?





[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153098545
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala ---
@@ -17,8 +17,37 @@
 
 package org.apache.spark.metrics.sink
 
-private[spark] trait Sink {
+import java.util.Properties
+
+import com.codahale.metrics.MetricRegistry
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * The abstract class of metrics Sink. By implementing its methods and registering it through
+ * metrics.properties, users can plug a custom metrics Sink into the MetricsSystem.
+ *
+ * @param properties Properties related to this specific Sink, read from the configuration
+ *   file; users can define their own configurations and read them from this parameter.
+ * @param metricRegistry The MetricRegistry holding the collected metrics for this Sink to report.
+ */
+@DeveloperApi
+abstract class Sink(properties: Properties, metricRegistry: MetricRegistry) {
+
+  /**
+   * Start this metrics Sink, this will be called by MetricsSystem
--- End diff --

Do we have to define the behavior when `start` fails?





[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-24 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153024408
  
--- Diff: project/MimaExcludes.scala ---
@@ -76,7 +76,10 @@ object MimaExcludes {
 
 // [SPARK-14280] Support Scala 2.12
 
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.FutureAction.transformWith"),
-ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.FutureAction.transform")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.FutureAction.transform"),
+
+// [SPARK-14151] Expose metrics Source and Sink interface
+  ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.metrics.sink.Sink")
--- End diff --

nit: spacing





[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-24 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153024461
  
--- Diff: external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala ---
@@ -59,22 +61,22 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
 throw new Exception("Ganglia sink requires 'port' property.")
   }
 
-  val host = propertyToOption(GANGLIA_KEY_HOST).get
-  val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
-  val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
-  val dmax = propertyToOption(GANGLIA_KEY_DMAX).map(_.toInt).getOrElse(GANGLIA_DEFAULT_DMAX)
-  val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE)
-.map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
-  val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
+  private val host = propertyToOption(GANGLIA_KEY_HOST).get
+  private val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
+  private val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
+  private val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE)
--- End diff --

Was `val dmax = propertyToOption(GANGLIA_KEY_DMAX).map(_.toInt).getOrElse(GANGLIA_DEFAULT_DMAX)` removed?





[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-07-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r126284098
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala ---
@@ -50,30 +52,30 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric
 throw new Exception("Graphite sink requires 'port' property.")
   }
 
-  val host = propertyToOption(GRAPHITE_KEY_HOST).get
-  val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt
+  private val host = propertyToOption(GRAPHITE_KEY_HOST).get
+  private val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt
 
-  val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match {
+  private val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match {
 case Some(s) => s.toInt
 case None => GRAPHITE_DEFAULT_PERIOD
   }
 
-  val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
-case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
+  private val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
+case Some(s) => TimeUnit.valueOf(s.toUpperCase())
--- End diff --

Sorry about that. It is just a rebase issue; I will revert it.





[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-07-08 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r126277653
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala ---
@@ -50,30 +52,30 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric
 throw new Exception("Graphite sink requires 'port' property.")
   }
 
-  val host = propertyToOption(GRAPHITE_KEY_HOST).get
-  val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt
+  private val host = propertyToOption(GRAPHITE_KEY_HOST).get
+  private val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt
 
-  val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match {
+  private val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match {
 case Some(s) => s.toInt
 case None => GRAPHITE_DEFAULT_PERIOD
   }
 
-  val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
-case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
+  private val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
+case Some(s) => TimeUnit.valueOf(s.toUpperCase())
 case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT)
   }
 
-  val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX)
+  private val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX)
 
   MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
 
-  val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase(Locale.ROOT)) match {
+ private val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase) match {
--- End diff --

Here, too.





[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-07-08 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r126277662
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala ---
@@ -26,29 +26,29 @@ import org.apache.spark.SecurityManager
 import org.apache.spark.metrics.MetricsSystem
 
 private[spark] class Slf4jSink(
-val property: Properties,
-val registry: MetricRegistry,
+property: Properties,
+registry: MetricRegistry,
 securityMgr: SecurityManager)
-  extends Sink {
+  extends Sink(property, registry) {
   val SLF4J_DEFAULT_PERIOD = 10
   val SLF4J_DEFAULT_UNIT = "SECONDS"
 
   val SLF4J_KEY_PERIOD = "period"
   val SLF4J_KEY_UNIT = "unit"
 
-  val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
+  private val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
 case Some(s) => s.toInt
 case None => SLF4J_DEFAULT_PERIOD
   }
 
-  val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
-case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
+  private val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
+case Some(s) => TimeUnit.valueOf(s.toUpperCase())
--- End diff --

Here, too.





[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-07-08 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r126277651
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala ---
@@ -50,30 +52,30 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric
 throw new Exception("Graphite sink requires 'port' property.")
   }
 
-  val host = propertyToOption(GRAPHITE_KEY_HOST).get
-  val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt
+  private val host = propertyToOption(GRAPHITE_KEY_HOST).get
+  private val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt
 
-  val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match {
+  private val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match {
 case Some(s) => s.toInt
 case None => GRAPHITE_DEFAULT_PERIOD
   }
 
-  val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
-case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
+  private val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
+case Some(s) => TimeUnit.valueOf(s.toUpperCase())
--- End diff --

Hi, @jerryshao.
Is there any reason to remove `Locale.ROOT`?
It was added by SPARK-20156 to fix a locale bug.
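For anyone following along, the locale bug in question is the classic Turkish-locale casing trap, which is why SPARK-20156 pinned these conversions to `Locale.ROOT`. A minimal sketch:

```scala
import java.util.Locale
import java.util.concurrent.TimeUnit

// Under a Turkish default locale, 'i' upper-cases to the dotted capital 'İ'.
Locale.setDefault(new Locale("tr", "TR"))

"minutes".toUpperCase               // "MİNUTES" -- TimeUnit.valueOf would throw
"minutes".toUpperCase(Locale.ROOT)  // "MINUTES" -- locale-independent

TimeUnit.valueOf("minutes".toUpperCase(Locale.ROOT))  // TimeUnit.MINUTES
```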

