[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18083


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r121031573
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
@@ -138,6 +162,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(drained)
   }
 
+  test("metrics for dropped listener events") {
+    val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1))
+
+    val listenerStarted = new Semaphore(0)
+    val listenerWait = new Semaphore(0)
+
+    bus.addListener(new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        listenerStarted.release()
+        listenerWait.acquire()
+      }
+    })
+
+    bus.start(mockSparkContext, mockMetricsSystem)
+
+    // Post a message to the listener bus and wait for processing to begin:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    listenerStarted.acquire()
--- End diff --

This should be fine:

- If this code runs before `listenerStarted.release()` then it will block 
until `listenerStarted.release()` is hit.
- The listener will block in `listenerWait.acquire()` until we call 
`listenerWait.release()` further down in this method.

This synchronization pattern is already used elsewhere in this suite in 
https://github.com/JoshRosen/spark/blob/76b669ca6eb35a0cce4291702baa5d1f60adb467/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala#L113
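A minimal, self-contained sketch of the same two-semaphore handshake, with a plain `Thread` standing in for the listener bus's dispatch thread (the object and method names are hypothetical, not Spark code):

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Semaphore}

// Sketch: the worker signals that it has started, then parks until the
// main thread releases it, mirroring listenerStarted / listenerWait.
object SemaphoreHandshake {
  def run(): List[String] = {
    val log = new ConcurrentLinkedQueue[String]()
    val started = new Semaphore(0) // released by the worker once it is running
    val proceed = new Semaphore(0) // released by the main thread to unblock the worker

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        log.add("worker started")
        started.release() // like listenerStarted.release() in onJobEnd
        proceed.acquire() // like listenerWait.acquire(): park until told to go on
        log.add("worker finished")
      }
    })
    worker.start()

    started.acquire() // cannot return before the worker's release()
    log.add("main saw start")
    proceed.release() // like listenerWait.release() later in the test
    worker.join()
    log.toArray(Array.empty[String]).toList
  }
}
```

Because `started.acquire()` can only return after the worker's `release()`, the log order is fixed no matter how the two threads are scheduled.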






[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r121031515
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
@@ -138,6 +162,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(drained)
   }
 
+  test("metrics for dropped listener events") {
+    val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1))
+
+    val listenerStarted = new Semaphore(0)
+    val listenerWait = new Semaphore(0)
+
+    bus.addListener(new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        listenerStarted.release()
+        listenerWait.acquire()
+      }
+    })
+
+    bus.start(mockSparkContext, mockMetricsSystem)
+
+    // Post a message to the listener bus and wait for processing to begin:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    listenerStarted.acquire()
--- End diff --

Actually, the order doesn't matter: if `release` is called first, `acquire`
won't block.
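To illustrate the point: `java.util.concurrent.Semaphore` counts permits, so a `release()` that happens before the matching `acquire()` is not lost. A tiny sketch (the object name is hypothetical):

```scala
import java.util.concurrent.Semaphore

// Sketch: permits are counted, so release() before acquire() is remembered.
object ReleaseBeforeAcquire {
  def demo(): (Int, Int) = {
    val s = new Semaphore(0)
    s.release() // no thread is waiting yet; permits go from 0 to 1
    val afterRelease = s.availablePermits()
    s.acquire() // returns immediately by consuming that permit
    val afterAcquire = s.availablePermits()
    (afterRelease, afterAcquire)
  }
}
```

`demo()` returns `(1, 0)`: the early release leaves one permit, and the later acquire consumes it without blocking.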





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r121021258
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
@@ -138,6 +162,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(drained)
   }
 
+  test("metrics for dropped listener events") {
+    val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1))
+
+    val listenerStarted = new Semaphore(0)
+    val listenerWait = new Semaphore(0)
+
+    bus.addListener(new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        listenerStarted.release()
+        listenerWait.acquire()
+      }
+    })
+
+    bus.start(mockSparkContext, mockMetricsSystem)
+
+    // Post a message to the listener bus and wait for processing to begin:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    listenerStarted.acquire()
--- End diff --

Is it deterministic that this line will run before
`listenerStarted.release()` in `onJobEnd`?
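For context, the quoted test bounds the queue at one event (`LISTENER_BUS_EVENT_QUEUE_CAPACITY` set to 1) and then checks the dropped-event metrics. The sketch below is a simplified, hypothetical model of that accounting, not Spark's `LiveListenerBus`: it assumes a non-blocking `offer()` onto a bounded `LinkedBlockingQueue`, with `AtomicLong` counters standing in for the Codahale `Counter`s and a method standing in for the `queueSize` gauge.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified model of the dropped-event accounting; not Spark code.
class BoundedBusModel[E](capacity: Int) {
  private val queue = new LinkedBlockingQueue[E](capacity)
  val numEventsPosted = new AtomicLong(0)  // every post, accepted or not
  val numDroppedEvents = new AtomicLong(0) // posts rejected because the queue was full

  def post(event: E): Boolean = {
    numEventsPosted.incrementAndGet()
    val accepted = queue.offer(event) // non-blocking: false when the queue is full
    if (!accepted) numDroppedEvents.incrementAndGet()
    accepted
  }

  // Plays the role of the queueSize gauge: events still waiting to be processed.
  def queueSize: Int = queue.size()
}
```

With capacity 1 and nothing draining the queue, the first post is accepted and the next two are counted as dropped, so posted is 3, dropped is 2, and the queue size stays at 1.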





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r121003615
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -217,3 +243,61 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark]
+class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This is a count of the total number
+   * of events which have been produced by the application and sent to the listener bus, NOT a
+   * count of the number of events which have been processed and delivered to listeners (or dropped
+   * without being delivered).
+   */
+  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
+    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
+      override def getValue: Int = queue.size()
+    })
+  }
+
+  // Guarded by synchronization.
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
+
+  /**
+   * Returns a timer tracking the processing time of the given listener class.
+   * events processed by that listener. This method is thread-safe.
+   */
+  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
+    synchronized {
+      val className = cls.getName
+      val maxTimed = 128
--- End diff --

*Shrug*. Maybe, but note that this would be 128 _separate listener 
classes_. Let me put in an undocumented configuration.
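One way to make the 128-class cap tunable is to pass it in rather than hard-coding it. A sketch under stated assumptions: `SimpleTimer` is a hypothetical stand-in for the Codahale `Timer` (so the example has no dependencies), the cap arrives as a constructor parameter named after the diff's `maxTimed` local, and the cache is keyed by class-name string. How the value would be read from `SparkConf` is not shown.

```scala
import java.util.concurrent.atomic.LongAdder
import scala.collection.mutable

// Hypothetical stand-in for a Codahale Timer: records a count and total nanos.
final class SimpleTimer {
  private val count = new LongAdder
  private val totalNanos = new LongAdder
  def update(nanos: Long): Unit = { count.increment(); totalNanos.add(nanos) }
  def getCount: Long = count.sum()
}

// Sketch of a per-listener-class timer cache capped at maxTimed classes.
// Classes seen after the cap is reached get None, i.e. they are not timed.
class PerClassTimers(maxTimed: Int) {
  // Guarded by synchronization on `this`.
  private val timers = mutable.Map[String, SimpleTimer]()

  def timerFor(className: String): Option[SimpleTimer] = synchronized {
    timers.get(className) match {
      case existing @ Some(_) => existing
      case None if timers.size >= maxTimed => None
      case None =>
        val timer = new SimpleTimer
        timers(className) = timer
        Some(timer)
    }
  }
}
```

Repeated lookups for the same class return the same timer, so every instance of a listener class shares one metric; only the number of distinct classes is bounded.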





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-08 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r120999648
  
--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
+import com.codahale.metrics.Timer
+
 import org.apache.spark.internal.Logging
 
 /**
  * An event bus which posts events to its listeners.
  */
 private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
 
+  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
+
   // Marked `private[spark]` for access in tests.
-  private[spark] val listeners = new CopyOnWriteArrayList[L]
+  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
+
+  /**
+   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
+   * This method is intended to be overridden by subclasses.
+   */
+  protected def getTimer(listener: L): Option[Timer] = None
 
   /**
    * Add a listener to listen events. This method is thread-safe and can be called in any thread.
    */
   final def addListener(listener: L): Unit = {
-    listeners.add(listener)
+    listenersPlusTimers.add((listener, getTimer(listener).orNull))
   }
 
   /**
    * Remove a listener and it won't receive any events. This method is thread-safe and can be called
    * in any thread.
    */
   final def removeListener(listener: L): Unit = {
-    listeners.remove(listener)
+    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
+      listenersPlusTimers.remove(listenerAndTimer)
--- End diff --

I think the only reason that `CopyOnWriteArrayList` was used was for 
thread-safety and fast performance for readers interleaved with very rare 
mutations / writes. If we were to replace the array list then we'd need to add 
a `synchronized` to guard the `listenersPlusTimers` field itself.

Given the workload and access patterns here, I'm not sure that it's worth 
it to attempt to optimize this `removeListener()` method any further.
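To make the trade-off concrete, here is a minimal sketch of what replacing `CopyOnWriteArrayList` would involve, assuming an immutable `Vector` snapshot held in a `@volatile` field. The class and method names are hypothetical, not Spark code.

```scala
// Sketch (hypothetical, not Spark code): copy-on-write done by hand. Writers
// serialize on `this` and publish a new immutable Vector; readers just read
// the @volatile field and iterate a stable snapshot without locking.
class ListenerRegistry[L <: AnyRef, T] {
  @volatile private var entries: Vector[(L, T)] = Vector.empty

  def add(listener: L, timer: T): Unit = synchronized {
    entries = entries :+ ((listener, timer))
  }

  // Removes the first entry whose listener is the same instance (eq).
  def remove(listener: L): Unit = synchronized {
    val idx = entries.indexWhere(_._1 eq listener)
    if (idx >= 0) entries = entries.take(idx) ++ entries.drop(idx + 1)
  }

  def snapshot: Vector[(L, T)] = entries
}
```

Reads stay cheap and snapshots taken before a removal are unaffected, but the synchronization that `CopyOnWriteArrayList` provides internally now has to be written and maintained by hand.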





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r120973638
  
--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
+import com.codahale.metrics.Timer
+
 import org.apache.spark.internal.Logging
 
 /**
  * An event bus which posts events to its listeners.
  */
 private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
 
+  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
+
   // Marked `private[spark]` for access in tests.
-  private[spark] val listeners = new CopyOnWriteArrayList[L]
+  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
+
+  /**
+   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
+   * This method is intended to be overridden by subclasses.
+   */
+  protected def getTimer(listener: L): Option[Timer] = None
 
   /**
    * Add a listener to listen events. This method is thread-safe and can be called in any thread.
    */
   final def addListener(listener: L): Unit = {
-    listeners.add(listener)
+    listenersPlusTimers.add((listener, getTimer(listener).orNull))
   }
 
   /**
    * Remove a listener and it won't receive any events. This method is thread-safe and can be called
    * in any thread.
    */
   final def removeListener(listener: L): Unit = {
-    listeners.remove(listener)
+    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
+      listenersPlusTimers.remove(listenerAndTimer)
--- End diff --

Since this is a `CopyOnWriteArrayList`, shall we just do a filter and
create a new array?
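A sketch of the filter-style removal suggested here, assuming Java 8+ where `CopyOnWriteArrayList.removeIf` is available; it rebuilds the backing array in one pass under the list's write lock instead of doing a separate `find` and `remove`. Unlike the quoted `find(...).foreach(remove)`, it removes every entry for that listener instance, not just the first. The object name is hypothetical.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import java.util.function.Predicate

// Sketch: filter-style removal of (listener, timer) pairs by listener identity.
object FilterRemoval {
  def removeListener[L <: AnyRef, T](list: CopyOnWriteArrayList[(L, T)], listener: L): Boolean =
    list.removeIf(new Predicate[(L, T)] {
      override def test(entry: (L, T)): Boolean = entry._1 eq listener
    })
}
```

`removeIf` returns `true` only if something was removed, so a second call for the same listener returns `false`.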





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r120972760
  
--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
+import com.codahale.metrics.Timer
+
 import org.apache.spark.internal.Logging
 
 /**
  * An event bus which posts events to its listeners.
  */
 private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
 
+  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
--- End diff --

Shall we use `Option[Timer]` as the value type?





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r120972548
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -217,3 +243,61 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark]
+class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This is a count of the total number
+   * of events which have been produced by the application and sent to the listener bus, NOT a
+   * count of the number of events which have been processed and delivered to listeners (or dropped
+   * without being delivered).
+   */
+  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
+    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
+      override def getValue: Int = queue.size()
+    })
+  }
+
+  // Guarded by synchronization.
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
+
+  /**
+   * Returns a timer tracking the processing time of the given listener class.
+   * events processed by that listener. This method is thread-safe.
+   */
+  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
--- End diff --

Why not just pass the class name as a parameter?





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r120972483
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -217,3 +243,61 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark]
+class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This is a count of the total number
+   * of events which have been produced by the application and sent to the listener bus, NOT a
+   * count of the number of events which have been processed and delivered to listeners (or dropped
+   * without being delivered).
+   */
+  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
+    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
+      override def getValue: Int = queue.size()
+    })
+  }
+
+  // Guarded by synchronization.
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
+
+  /**
+   * Returns a timer tracking the processing time of the given listener class.
+   * events processed by that listener. This method is thread-safe.
+   */
+  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
+    synchronized {
+      val className = cls.getName
+      val maxTimed = 128
--- End diff --

Should this be configurable?





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-07 Thread bOOm-X
Github user bOOm-X commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r120619672
  
--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we use
     // Java Iterator directly.
-    val iter = listeners.iterator
+    val iter = listenersPlusTimers.iterator
     while (iter.hasNext) {
-      val listener = iter.next()
+      val listenerAndMaybeTimer = iter.next()
+      val listener = listenerAndMaybeTimer._1
+      val maybeTimer = listenerAndMaybeTimer._2
+      var maybeTimerContext = if (maybeTimer != null) {
--- End diff --

Indeed! But you can put the Option in the `listenersPlusTimers` collection
(instead of calling `orNull` when you create the timer), so you can use it
without having to create a new one each time in the `postToAll` method.
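A sketch of this suggestion, with hypothetical `DemoTimer`/`DemoTimerContext` classes standing in for Codahale's `Timer` and `Timer.Context` so it runs without dependencies. The `Option` is created once in `addListener`; in the loop, `isDefined`/`get` is used instead of `map(_.time())`, because `map` would allocate a fresh `Some` on every event, which is the per-post allocation concern JoshRosen raises in this thread.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal

// Hypothetical stand-ins for Codahale's Timer and Timer.Context.
final class DemoTimerContext(stops: AtomicInteger) {
  def stop(): Unit = { stops.incrementAndGet(); () }
}
final class DemoTimer {
  private val stops = new AtomicInteger(0)
  def time(): DemoTimerContext = new DemoTimerContext(stops)
  def stopCount: Int = stops.get()
}

// Sketch: the Option[DemoTimer] is built once, in addListener, and reused.
class OptionTimedBus[L <: AnyRef, E](deliver: (L, E) => Unit) {
  private val entries = new CopyOnWriteArrayList[(L, Option[DemoTimer])]()

  def addListener(listener: L, timer: Option[DemoTimer]): Unit = {
    entries.add((listener, timer))
    ()
  }

  def postToAll(event: E): Unit = {
    val iter = entries.iterator
    while (iter.hasNext) {
      val (listener, maybeTimer) = iter.next()
      // isDefined/get rather than map(_.time()), which would allocate a Some per event.
      val ctx = if (maybeTimer.isDefined) maybeTimer.get.time() else null
      try {
        deliver(listener, event)
      } catch {
        case NonFatal(_) => () // a real bus would log the listener's exception here
      } finally {
        if (ctx != null) ctx.stop()
      }
    }
  }
}
```

Both timed deliveries are stopped, including one whose listener throws, because `stop()` sits in `finally`; an untimed listener pays only the `isDefined` check.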





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-05 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r120197916
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -111,6 +112,15 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
     }
   }
 
+  override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = {
+    if (listener.getClass.getName.startsWith("org.apache.spark")) {
--- End diff --

This is accounted for in a later commit. All listeners are now captured.





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-05 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r120197363
  
--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we use
     // Java Iterator directly.
-    val iter = listeners.iterator
+    val iter = listenersPlusTimers.iterator
     while (iter.hasNext) {
-      val listener = iter.next()
+      val listenerAndMaybeTimer = iter.next()
+      val listener = listenerAndMaybeTimer._1
+      val maybeTimer = listenerAndMaybeTimer._2
+      var maybeTimerContext = if (maybeTimer != null) {
--- End diff --

Actually, there is a cost here: allocating a new Option on every 
`postToAll` is going to create more allocations and method calls. Thus I'm 
going to leave this unchanged.





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-05 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r120194864
  
--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we use
     // Java Iterator directly.
-    val iter = listeners.iterator
+    val iter = listenersPlusTimers.iterator
     while (iter.hasNext) {
-      val listener = iter.next()
+      val listenerAndMaybeTimer = iter.next()
+      val listener = listenerAndMaybeTimer._1
+      val maybeTimer = listenerAndMaybeTimer._2
+      var maybeTimerContext = if (maybeTimer != null) {
--- End diff --

Yeah, this is just premature optimization. I'll undo.





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-01 Thread bOOm-X
Github user bOOm-X commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r119585097
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -111,6 +112,15 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
     }
   }
 
+  override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = {
+    if (listener.getClass.getName.startsWith("org.apache.spark")) {
--- End diff --

Why create timers only for Spark's own listeners? We may want timings even
for third-party listeners. In my view it is even more important for those
listeners, because they may be much less optimized and so carry a huge
performance penalty.





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-01 Thread bOOm-X
Github user bOOm-X commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r119583821
  
--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
+import com.codahale.metrics.Timer
+
 import org.apache.spark.internal.Logging
 
 /**
  * An event bus which posts events to its listeners.
  */
 private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
 
+  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
+
   // Marked `private[spark]` for access in tests.
-  private[spark] val listeners = new CopyOnWriteArrayList[L]
+  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
+
+  /**
+   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
+   * This method is intended to be overridden by subclasses.
+   */
+  protected def createTimer(listener: L): Option[Timer] = None
 
   /**
    * Add a listener to listen events. This method is thread-safe and can be called in any thread.
    */
   final def addListener(listener: L): Unit = {
-    listeners.add(listener)
+    listenersPlusTimers.add((listener, createTimer(listener).orNull))
--- End diff --

Why not keep the Option in the collection instead of storing null?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-01 Thread bOOm-X
Github user bOOm-X commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r119584011
  
--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we use
     // Java Iterator directly.
-    val iter = listeners.iterator
+    val iter = listenersPlusTimers.iterator
     while (iter.hasNext) {
-      val listener = iter.next()
+      val listenerAndMaybeTimer = iter.next()
+      val listener = listenerAndMaybeTimer._1
+      val maybeTimer = listenerAndMaybeTimer._2
+      var maybeTimerContext = if (maybeTimer != null) {
+        maybeTimer.time()
+      } else {
+        null
+      }
       try {
         doPostEvent(listener, event)
       } catch {
         case NonFatal(e) =>
           logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
+      } finally {
+        if (maybeTimerContext != null) {
--- End diff --

Same here: simpler with an Option.





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-06-01 Thread bOOm-X
Github user bOOm-X commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r119583956
  
--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we use
     // Java Iterator directly.
-    val iter = listeners.iterator
+    val iter = listenersPlusTimers.iterator
     while (iter.hasNext) {
-      val listener = iter.next()
+      val listenerAndMaybeTimer = iter.next()
+      val listener = listenerAndMaybeTimer._1
+      val maybeTimer = listenerAndMaybeTimer._2
+      var maybeTimerContext = if (maybeTimer != null) {
--- End diff --

With an Option (instead of a null value) it would be much simpler.





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-25 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118629022
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -226,3 +246,61 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark]
+class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This is a count of the total number
+   * of events which have been produced by the application and sent to the listener bus, NOT a
+   * count of the number of events which have been processed and delivered to listeners (or dropped
+   * without being delivered).
+   */
+  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
+metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
+  override def getValue: Int = queue.size()
+})
+  }
+
+  // Guarded by synchronization.
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
+
+  /**
+   * Returns a timer tracking the processing time of the given listener class.
+   * events processed by that listener. This method is thread-safe.
+   */
+  def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = {
+synchronized {
+  val className = listener.getClass.getName
--- End diff --

I'll update the PR description to discuss this per-listener metric.





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-25 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118628985
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -226,3 +246,61 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark]
+class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This is a count of the total number
+   * of events which have been produced by the application and sent to the listener bus, NOT a
+   * count of the number of events which have been processed and delivered to listeners (or dropped
+   * without being delivered).
+   */
+  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
+metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
+  override def getValue: Int = queue.size()
+})
+  }
+
+  // Guarded by synchronization.
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
+
+  /**
+   * Returns a timer tracking the processing time of the given listener class.
+   * events processed by that listener. This method is thread-safe.
+   */
+  def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = {
+synchronized {
+  val className = listener.getClass.getName
--- End diff --

Yes, but my goal with these metrics is to be able to identify which 
listeners are causing performance problems, and for that purpose it's more 
useful to group listeners by class than to instrument individual 
listeners. Most (all?) of Spark's internal listeners have one instance per 
driver / SparkContext, so in practice tracking stats per instance rather 
than per class wouldn't make a meaningful difference in typical cases.
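The following is a minimal sketch of per-class grouping as described above. `CountingTimer` and `PerClassTimers` are hypothetical stand-ins, not the PR's classes. Timers are keyed by `getClass.getName` and created lazily under a lock, so two instances of the same listener class share one timer. That is also the behavior behind the concern about registering the same listener twice.

```scala
import scala.collection.mutable

// Hypothetical minimal timer: only counts recorded samples.
final class CountingTimer {
  private var n = 0L
  def record(): Unit = synchronized { n += 1 }
  def count: Long = synchronized(n)
}

// One timer per listener *class*, created on first use; guarded by this object's lock.
final class PerClassTimers {
  private val timers = mutable.Map[String, CountingTimer]()

  def timerFor(listener: AnyRef): CountingTimer = synchronized {
    timers.getOrElseUpdate(listener.getClass.getName, new CountingTimer)
  }

  def classNames: Set[String] = synchronized(timers.keySet.toSet)
}

// Hypothetical listener classes used only for illustration.
class MyListener
class OtherListener
```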





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118623202
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -226,3 +246,61 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark]
+class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This is a count of the total number
+   * of events which have been produced by the application and sent to the listener bus, NOT a
+   * count of the number of events which have been processed and delivered to listeners (or dropped
+   * without being delivered).
+   */
+  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
+metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
+  override def getValue: Int = queue.size()
+})
+  }
+
+  // Guarded by synchronization.
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
+
+  /**
+   * Returns a timer tracking the processing time of the given listener class.
+   * events processed by that listener. This method is thread-safe.
+   */
+  def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = {
+synchronized {
+  val className = listener.getClass.getName
--- End diff --

Is it possible that users register the same listener twice? Then the class 
name may not be a good identifier for listeners. I think this is the main 
problem with per-listener metrics: how do we identify each listener?





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118434907
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This counts the number of times
+   * that `post()` is called, which might be less than the total number of events processed in
+   * case events are dropped.
+   */
+  val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
--- End diff --

Ah, I see.





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-25 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118432570
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
   logError(s"$name has already stopped! Dropping event $event")
   return
 }
+metrics.numEventsReceived.inc()
 val eventAdded = eventQueue.offer(event)
 if (eventAdded) {
   eventLock.release()
 } else {
   onDropEvent(event)
+  metrics.numDroppedEvents.inc()
--- End diff --

Sure, will do.





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-25 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118432135
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This counts the number of times
+   * that `post()` is called, which might be less than the total number of events processed in
+   * case events are dropped.
+   */
+  val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
--- End diff --

It's the queue's _capacity_ that's fixed. In the `LinkedBlockingQueue` Javadoc 
(https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html), 
size refers to the number of items currently in the queue, whereas capacity 
refers to the maximum number of items that the queue can hold. I think the 
`spark.scheduler.listenerbus.eventqueue.size` configuration is confusingly 
named.
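The following is a small illustration of the distinction, using `java.util.concurrent.LinkedBlockingQueue` directly. The capacity passed to the constructor never changes. `size()` moves as events are offered and polled, and `offer()` returns `false` once the queue is full. The `queueSize` function is a hypothetical stand-in for a gauge that is evaluated each time it is read. The values here are illustrative only.

```scala
import java.util.concurrent.LinkedBlockingQueue

// A bounded queue: capacity is fixed at construction, size changes as items come and go.
val capacity = 3
val queue = new LinkedBlockingQueue[String](capacity)

// Gauge-style reading: evaluated on every call, so it reflects the current size.
val queueSize: () => Int = () => queue.size()

queue.offer("e1")
queue.offer("e2")
val sizeAfterTwo = queueSize()                      // items currently waiting
val remainingAfterTwo = queue.remainingCapacity()   // free slots left

queue.offer("e3")
val acceptedFourth = queue.offer("e4")              // false: the queue is full
queue.poll()
val sizeAfterPoll = queueSize()
```

When no other thread touches the queue, `size() + remainingCapacity()` equals the fixed capacity.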





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-25 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118431897
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This counts the number of times
+   * that `post()` is called, which might be less than the total number of events processed in
+   * case events are dropped.
--- End diff --

Yes. Perhaps I wasn't explicit enough in the comment, so I'll reword it 
or just drop the confusing second half.

Is it clearer if I say

> This counts the number of times that `post()` has been called, not the total number of events that have completed processing.





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-25 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118431688
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
   logError(s"$name has already stopped! Dropping event $event")
   return
 }
+metrics.numEventsReceived.inc()
--- End diff --

Yes. My idea was to have a counter which is incremented whenever an event 
is received, regardless of how it ends up being processed.
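The following is a stripped-down sketch of that counting scheme. `CountingBus` is hypothetical: it uses `AtomicLong` counters instead of Coda Hale `Counter`s and omits the real bus's dispatch thread, stopped flag and semaphore. The received counter is incremented before `offer()`, so it also includes events that are then dropped. With no consumer draining the queue, received = queued + dropped.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified bus: only the counting logic around post() is sketched.
final class CountingBus[E](capacity: Int) {
  require(capacity > 0, "capacity must be > 0")
  private val queue = new LinkedBlockingQueue[E](capacity)
  val numEventsReceived = new AtomicLong(0)
  val numDroppedEvents = new AtomicLong(0)

  def post(event: E): Unit = {
    // Counted on every call, before we know whether the event fits in the queue.
    numEventsReceived.incrementAndGet()
    if (!queue.offer(event)) {
      numDroppedEvents.incrementAndGet()
    }
  }

  def queued: Int = queue.size()
}

val countingBus = new CountingBus[String](capacity = 2)
Seq("a", "b", "c", "d", "e").foreach(countingBus.post)
```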





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-25 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118431597
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -33,25 +37,24 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. This listener bus
  * is stopped when `stop()` is called, and it will drop further events after stopping.
  */
-private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
+private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
 
   self =>
 
   import LiveListenerBus._
 
+  private var sparkContext: SparkContext = _
+
   // Cap the capacity of the event queue so we get an explicit error (rather than
   // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
-  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
-
-  private def validateAndGetQueueSize(): Int = {
-val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
-if (queueSize <= 0) {
-  throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
-}
-queueSize
+  private val eventQueue = {
+val capacity = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
+require(capacity > 0, s"${LISTENER_BUS_EVENT_QUEUE_SIZE.key} must be > 0!")
--- End diff --

Nice, I didn't know about that. I'll move it in my next update.





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118413353
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
   logError(s"$name has already stopped! Dropping event $event")
   return
 }
+metrics.numEventsReceived.inc()
 val eventAdded = eventQueue.offer(event)
 if (eventAdded) {
   eventLock.release()
 } else {
   onDropEvent(event)
+  metrics.numDroppedEvents.inc()
--- End diff --

Would it be better to move this into `onDropEvent`?
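The following sketch shows what that refactoring could look like on a hypothetical `DropAwareBus`, not the real `LiveListenerBus`. The counter update lives in the hook, so `post()` only has to call it. Logging only the first drop is an illustrative choice here, not something stated in this thread.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Hypothetical sketch: the drop counter lives in onDropEvent, so every drop
// path updates it and post() only needs to invoke the hook.
class DropAwareBus[E](capacity: Int) {
  private val queue = new LinkedBlockingQueue[E](capacity)
  private val loggedDrop = new AtomicBoolean(false)
  val numDroppedEvents = new AtomicLong(0)

  def post(event: E): Unit = {
    if (!queue.offer(event)) {
      onDropEvent(event)
    }
  }

  /** Called once per dropped event. Overrides should call super to keep the count. */
  protected def onDropEvent(event: E): Unit = {
    numDroppedEvents.incrementAndGet()
    // Report only the first drop, so a sustained overload does not flood the log.
    if (loggedDrop.compareAndSet(false, true)) {
      println(s"Dropping event $event: queue is full")
    }
  }
}

val dropBus = new DropAwareBus[Int](capacity = 1)
(1 to 4).foreach(dropBus.post)
```

With the counter in the hook, a subclass that overrides `onDropEvent` must call `super.onDropEvent` to keep the metric accurate.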





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118413314
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
   logError(s"$name has already stopped! Dropping event $event")
   return
 }
+metrics.numEventsReceived.inc()
--- End diff --

So this also counts dropped events?





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118413299
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This counts the number of times
+   * that `post()` is called, which might be less than the total number of events processed in
+   * case events are dropped.
--- End diff --

According to the code, we also count dropped events, don't we?





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118413115
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This counts the number of times
+   * that `post()` is called, which might be less than the total number of events processed in
+   * case events are dropped.
+   */
+  val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
--- End diff --

Do we need this metric? Users can easily get it by looking at the 
`spark.scheduler.listenerbus.eventqueue.size` config.





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118413024
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }
 
+private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This counts the number of times
+   * that `post()` is called, which might be less than the total number of events processed in
+   * case events are dropped.
+   */
+  val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of of messages waiting in the queue.
--- End diff --

nit: double `of` here





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18083#discussion_r118412901
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -33,25 +37,24 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. This listener bus
  * is stopped when `stop()` is called, and it will drop further events after stopping.
  */
-private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
+private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
 
   self =>
 
   import LiveListenerBus._
 
+  private var sparkContext: SparkContext = _
+
   // Cap the capacity of the event queue so we get an explicit error (rather than
   // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
-  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
-
-  private def validateAndGetQueueSize(): Int = {
-val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
-if (queueSize <= 0) {
-  throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
-}
-queueSize
+  private val eventQueue = {
+val capacity = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
+require(capacity > 0, s"${LISTENER_BUS_EVENT_QUEUE_SIZE.key} must be > 0!")
--- End diff --

This constraint can be put in `LISTENER_BUS_EVENT_QUEUE_SIZE` with 
`TypedConfigBuilder.checkValue`.
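Spark's `TypedConfigBuilder.checkValue` cannot be exercised without Spark on the classpath. This sketch therefore uses a hypothetical, dependency-free `IntEntry` (not Spark's API) to show the idea. The `> 0` rule is attached where the entry is defined, so every read validates it, and use sites such as the `eventQueue` initializer would no longer need their own `require`. The default of 10000 is illustrative only.

```scala
// Hypothetical stand-in for a typed config entry: the validation rule is attached
// at definition time instead of being repeated at each use site.
final case class IntEntry(
    key: String,
    default: Int,
    validator: Int => Boolean = _ => true,
    errorMsg: String = "") {

  def checkValue(v: Int => Boolean, msg: String): IntEntry =
    copy(validator = v, errorMsg = msg)

  // Reads the value from a plain map of settings and validates it on every read.
  def readFrom(settings: Map[String, String]): Int = {
    val value = settings.get(key).map(_.trim.toInt).getOrElse(default)
    require(validator(value), s"$key $errorMsg (got $value)")
    value
  }
}

val QueueCapacity = IntEntry("spark.scheduler.listenerbus.eventqueue.size", default = 10000)
  .checkValue(_ > 0, "must be > 0")
```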





[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...

2017-05-23 Thread JoshRosen
GitHub user JoshRosen opened a pull request:

https://github.com/apache/spark/pull/18083

[SPARK-20863] Add metrics/instrumentation to LiveListenerBus

## What changes were proposed in this pull request?

This patch adds Coda Hale metrics for instrumenting the `LiveListenerBus` 
in order to track the number of events received, dropped, and processed, as 
well as a timer to track the processing time per event. See the new 
`LiveListenerBusMetrics` for a complete description of the new metrics.

## How was this patch tested?

New tests in SparkListenerSuite, including a test to ensure proper counting 
of dropped listener events.
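The following sketch shows how such a test can make drops deterministic. It borrows the two-semaphore handshake (`listenerStarted`, `listenerWait`) from the `SparkListenerSuite` test discussed in this thread. It runs against a hypothetical `TinyBus` with a capacity-1 queue rather than the real `LiveListenerBus`. Once the consumer is parked inside the listener, the first extra event fills the queue and the second is dropped.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical single-consumer bus used only to illustrate the test technique.
final class TinyBus(capacity: Int, listener: String => Unit) {
  private val queue = new LinkedBlockingQueue[String](capacity)
  val numDropped = new AtomicLong(0)

  private val consumer = new Thread(() => {
    try {
      while (true) listener(queue.take())
    } catch {
      case _: InterruptedException => () // stop() interrupts the consumer
    }
  })
  consumer.setDaemon(true)

  def start(): Unit = consumer.start()
  def stop(): Unit = { consumer.interrupt(); consumer.join() }
  def post(e: String): Unit = if (!queue.offer(e)) numDropped.incrementAndGet()
}

val listenerStarted = new Semaphore(0)
val listenerWait = new Semaphore(0)
val tinyBus = new TinyBus(1, _ => { listenerStarted.release(); listenerWait.acquire() })
tinyBus.start()

tinyBus.post("job-end-0")   // taken by the consumer, which then blocks in the listener
listenerStarted.acquire()   // wait until the consumer is definitely inside the listener
tinyBus.post("job-end-1")   // fills the single queue slot
tinyBus.post("job-end-2")   // queue full: dropped
val droppedWhileBlocked = tinyBus.numDropped.get()

listenerWait.release(2)     // let the listener finish the two delivered events
tinyBus.stop()
```

Without the `listenerStarted.acquire()` handshake, the consumer might drain the queue before the extra posts arrive, and the drop count would be racy.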

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JoshRosen/spark listener-bus-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18083


commit a1fb5a8f2e58fe774aabc76e9d1a6859cfa99370
Author: Josh Rosen 
Date:   2017-05-23T21:49:35Z

WIP



