[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18083 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r121031573

--- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
@@ -138,6 +162,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(drained)
   }

+  test("metrics for dropped listener events") {
+    val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1))
+
+    val listenerStarted = new Semaphore(0)
+    val listenerWait = new Semaphore(0)
+
+    bus.addListener(new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        listenerStarted.release()
+        listenerWait.acquire()
+      }
+    })
+
+    bus.start(mockSparkContext, mockMetricsSystem)
+
+    // Post a message to the listener bus and wait for processing to begin:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    listenerStarted.acquire()
--- End diff --

This should be fine:
- If this code runs before `listenerStarted.release()`, it will block until `listenerStarted.release()` is hit.
- The listener will block in `listenerWait.acquire()` until we call `listenerWait.release()` further down in this method.

This synchronization pattern is already used elsewhere in this suite: https://github.com/JoshRosen/spark/blob/76b669ca6eb35a0cce4291702baa5d1f60adb467/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala#L113
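The following is a minimal stand-alone sketch of the same two-semaphore handshake. It uses only `java.util.concurrent`. The worker thread plays the role of the listener's `onJobEnd`, and the test thread is the code that posts the event. `SemaphoreHandshake` is an illustrative name and is not part of Spark.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Semaphore}

// Illustrative only: mirrors the listenerStarted / listenerWait handshake from the test.
object SemaphoreHandshake {
  def run(): List[String] = {
    val log = new ConcurrentLinkedQueue[String]()
    val listenerStarted = new Semaphore(0) // zero permits: acquire() blocks until a release()
    val listenerWait = new Semaphore(0)

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        log.add("listener started")
        listenerStarted.release() // signal that processing has begun
        listenerWait.acquire()    // park until the test thread releases us
        log.add("listener finished")
      }
    })
    worker.start()

    listenerStarted.acquire()     // returns only after the worker's release()
    log.add("test observed start")
    listenerWait.release()        // let the worker finish
    worker.join()

    log.toArray(new Array[String](0)).toList
  }
}
```

The order of the three log entries does not depend on thread scheduling. Each `acquire()` can only return after the matching `release()`.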
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r121031515

--- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
@@ -138,6 +162,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(drained)
   }

+  test("metrics for dropped listener events") {
+    val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1))
+
+    val listenerStarted = new Semaphore(0)
+    val listenerWait = new Semaphore(0)
+
+    bus.addListener(new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        listenerStarted.release()
+        listenerWait.acquire()
+      }
+    })
+
+    bus.start(mockSparkContext, mockMetricsSystem)
+
+    // Post a message to the listener bus and wait for processing to begin:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    listenerStarted.acquire()
--- End diff --

Actually, the order doesn't matter: if `release` is called first, `acquire` won't block.
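The ordering point holds because a `Semaphore` counts permits rather than tracking waiting threads. A `release()` that happens before the matching `acquire()` is therefore not lost. The small sketch below shows this. It uses `tryAcquire()` so that a missing permit shows up as `false` instead of a hang. `PermitOrdering` is a made-up name.

```scala
import java.util.concurrent.Semaphore

// Illustrative only: permits are counted, so release-before-acquire is safe.
object PermitOrdering {
  def releaseThenAcquire(): Boolean = {
    val s = new Semaphore(0)
    s.release()      // permits: 0 -> 1, even though nobody is waiting yet
    s.tryAcquire()   // a permit is available, so this succeeds immediately
  }

  def acquireWithNoRelease(): Boolean = {
    val s = new Semaphore(0)
    s.tryAcquire()   // no permit: returns false rather than blocking forever
  }
}
```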
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r121021258

--- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
@@ -138,6 +162,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(drained)
   }

+  test("metrics for dropped listener events") {
+    val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1))
+
+    val listenerStarted = new Semaphore(0)
+    val listenerWait = new Semaphore(0)
+
+    bus.addListener(new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        listenerStarted.release()
+        listenerWait.acquire()
+      }
+    })
+
+    bus.start(mockSparkContext, mockMetricsSystem)
+
+    // Post a message to the listener bus and wait for processing to begin:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    listenerStarted.acquire()
--- End diff --

Is it deterministic that this line will run before `listenerStarted.release()` in `onJobEnd`?
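The test sets `LISTENER_BUS_EVENT_QUEUE_CAPACITY` to 1. Once the listener thread is parked, any further posts overflow the queue. The sketch below is a rough model of what the `numEventsPosted` and `numEventsDropped` counters are meant to record. It uses a bounded `LinkedBlockingQueue`, whose `offer` returns `false` when the queue is full. `BoundedBusModel` is hypothetical and has no consumer thread, so it is not Spark's `LiveListenerBus`. It only mirrors the counting semantics described in the metric doc comments: posted counts every event, and dropped counts events that never reach listeners.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical model, not Spark's LiveListenerBus: a bounded queue with no consumer thread.
class BoundedBusModel[E](capacity: Int) {
  private val queue = new LinkedBlockingQueue[E](capacity)
  val numEventsPosted = new AtomicLong(0)
  val numEventsDropped = new AtomicLong(0)

  def post(event: E): Unit = {
    numEventsPosted.incrementAndGet()     // every post counts, whether delivered or not
    if (!queue.offer(event)) {            // offer() returns false instead of blocking when full
      numEventsDropped.incrementAndGet()
    }
  }

  def queueSize: Int = queue.size()
}
```

With capacity 1 and nothing draining the queue, the first post fills the queue and every later post is dropped.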
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r121003615

--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -217,3 +243,61 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }

+private[spark]
+class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This is a count of the total number
+   * of events which have been produced by the application and sent to the listener bus, NOT a
+   * count of the number of events which have been processed and delivered to listeners (or dropped
+   * without being delivered).
+   */
+  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
+    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
+      override def getValue: Int = queue.size()
+    })
+  }
+
+  // Guarded by synchronization.
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
+
+  /**
+   * Returns a timer tracking the processing time of the given listener class.
+   * events processed by that listener. This method is thread-safe.
+   */
+  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
+    synchronized {
+      val className = cls.getName
+      val maxTimed = 128
--- End diff --

*Shrug*. Maybe, but note that this would be 128 _separate listener classes_. Let me put in an undocumented configuration.
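The sketch below shows a per-listener-class timer cache keyed by class name, with a hard cap on how many timers it creates. `PerClassTimers`, `NamedTimer` and the `listenerProcessingTime.` name prefix are assumptions for illustration. In the PR the cap is the hard-coded `maxTimed = 128`, which the reply proposes to turn into an undocumented configuration. Here it is modeled as a constructor parameter. The quoted diff does not show what the real method does once the cap is reached; this sketch simply returns `None`.

```scala
import scala.collection.mutable

// Stand-in for a metrics Timer; only its name is tracked here.
final class NamedTimer(val name: String)

// Hypothetical per-listener-class timer cache with a hard cap on the number of timers.
class PerClassTimers(maxTimed: Int) {
  // Guarded by `this`.
  private val timers = mutable.Map[String, NamedTimer]()

  def timerFor(className: String): Option[NamedTimer] = synchronized {
    timers.get(className) match {
      case existing @ Some(_) =>
        existing                                  // reuse the timer created earlier
      case None if timers.size < maxTimed =>
        val timer = new NamedTimer(s"listenerProcessingTime.$className")
        timers(className) = timer
        Some(timer)
      case None =>
        None                                      // cap reached: this class is not timed
    }
  }
}
```

Because the cache is keyed by class name, many instances of one listener class share a single timer. Only distinct classes count toward the cap.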
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r120999648

--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal

+import com.codahale.metrics.Timer
+
 import org.apache.spark.internal.Logging

 /**
  * An event bus which posts events to its listeners.
  */
 private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

+  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
+
   // Marked `private[spark]` for access in tests.
-  private[spark] val listeners = new CopyOnWriteArrayList[L]
+  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
+
+  /**
+   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
+   * This method is intended to be overridden by subclasses.
+   */
+  protected def getTimer(listener: L): Option[Timer] = None

   /**
    * Add a listener to listen events. This method is thread-safe and can be called in any thread.
    */
   final def addListener(listener: L): Unit = {
-    listeners.add(listener)
+    listenersPlusTimers.add((listener, getTimer(listener).orNull))
   }

   /**
    * Remove a listener and it won't receive any events. This method is thread-safe and can be called
    * in any thread.
    */
   final def removeListener(listener: L): Unit = {
-    listeners.remove(listener)
+    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
+      listenersPlusTimers.remove(listenerAndTimer)
--- End diff --

I think the only reason `CopyOnWriteArrayList` was used was thread safety plus fast reads interleaved with very rare mutations/writes. If we were to replace the array list, we'd need to add a `synchronized` to guard the `listenersPlusTimers` field itself.

Given the workload and access patterns here, I'm not sure it's worth trying to optimize this `removeListener()` method any further.
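The sketch below is a simplified model of the data structure under discussion. It stores `(listener, timer)` pairs in a `CopyOnWriteArrayList`. Reads iterate over a snapshot without locking. `remove` first locates the pair by reference equality (`eq`) and then removes that pair. `PairedListeners` and the generic timer type `T` are placeholders, not Spark's API. As in the diff, `null` stands for a listener that is not timed.

```scala
import java.util.concurrent.CopyOnWriteArrayList

// Simplified sketch: T is a placeholder timer type; null means "not timed".
class PairedListeners[L <: AnyRef, T <: AnyRef] {
  private val listenersPlusTimers = new CopyOnWriteArrayList[(L, T)]()

  def add(listener: L, timerOrNull: T): Unit = {
    listenersPlusTimers.add((listener, timerOrNull))
  }

  def remove(listener: L): Unit = {
    // Iterate over a snapshot (no locking) to find the pair for this exact instance.
    val it = listenersPlusTimers.iterator()
    var found: (L, T) = null
    while (found == null && it.hasNext) {
      val pair = it.next()
      if (pair._1 eq listener) found = pair
    }
    // remove(Object) matches with equals() and copies the backing array, as in the diff.
    if (found != null) listenersPlusTimers.remove(found)
  }

  def listeners: List[L] = {
    val out = List.newBuilder[L]
    val it = listenersPlusTimers.iterator()
    while (it.hasNext) out += it.next()._1
    out.result()
  }
}
```

Each write copies the whole array. That cost is acceptable when listeners are added or removed rarely and events are posted constantly.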
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r120973638

--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal

+import com.codahale.metrics.Timer
+
 import org.apache.spark.internal.Logging

 /**
  * An event bus which posts events to its listeners.
  */
 private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

+  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
+
   // Marked `private[spark]` for access in tests.
-  private[spark] val listeners = new CopyOnWriteArrayList[L]
+  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
+
+  /**
+   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
+   * This method is intended to be overridden by subclasses.
+   */
+  protected def getTimer(listener: L): Option[Timer] = None

   /**
    * Add a listener to listen events. This method is thread-safe and can be called in any thread.
    */
   final def addListener(listener: L): Unit = {
-    listeners.add(listener)
+    listenersPlusTimers.add((listener, getTimer(listener).orNull))
   }

   /**
    * Remove a listener and it won't receive any events. This method is thread-safe and can be called
    * in any thread.
    */
   final def removeListener(listener: L): Unit = {
-    listeners.remove(listener)
+    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
+      listenersPlusTimers.remove(listenerAndTimer)
--- End diff --

Since this is a `CopyOnWriteArrayList`, shall we just do a filter and create a new array?
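For comparison, the sketch below implements the filter-and-create-a-new-array alternative. It keeps an immutable `Vector` and publishes it through a `@volatile` field. Readers stay lock-free. Writers, however, now need `synchronized`; without it, two concurrent `add`/`remove` calls could overwrite each other's update. `SnapshotListeners` is a hypothetical name.

```scala
// Hypothetical alternative: an immutable snapshot published via a @volatile field.
class SnapshotListeners[L <: AnyRef] {
  @volatile private var snapshot: Vector[L] = Vector.empty

  // Writers serialize on `this` so no update is lost between read and write of `snapshot`.
  def add(listener: L): Unit = synchronized {
    snapshot = snapshot :+ listener
  }

  // "Filter and create a new array": drops every occurrence of this exact instance.
  def remove(listener: L): Unit = synchronized {
    snapshot = snapshot.filterNot(_ eq listener)
  }

  // Lock-free read, as postToAll would use.
  def current: Vector[L] = snapshot
}
```

Unlike `CopyOnWriteArrayList.remove`, which removes only the first match, `filterNot` removes every occurrence of the instance.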
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r120972760

--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal

+import com.codahale.metrics.Timer
+
 import org.apache.spark.internal.Logging

 /**
  * An event bus which posts events to its listeners.
  */
 private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

+  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
--- End diff --

Shall we use `Option[Timer]` as the value type?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r120972548

--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -217,3 +243,61 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }

+private[spark]
+class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This is a count of the total number
+   * of events which have been produced by the application and sent to the listener bus, NOT a
+   * count of the number of events which have been processed and delivered to listeners (or dropped
+   * without being delivered).
+   */
+  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
+    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
+      override def getValue: Int = queue.size()
+    })
+  }
+
+  // Guarded by synchronization.
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
+
+  /**
+   * Returns a timer tracking the processing time of the given listener class.
+   * events processed by that listener. This method is thread-safe.
+   */
+  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
--- End diff --

Why not just pass the class name as a parameter?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r120972483

--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -217,3 +243,61 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }

+private[spark]
+class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This is a count of the total number
+   * of events which have been produced by the application and sent to the listener bus, NOT a
+   * count of the number of events which have been processed and delivered to listeners (or dropped
+   * without being delivered).
+   */
+  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
+    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
+      override def getValue: Int = queue.size()
+    })
+  }
+
+  // Guarded by synchronization.
+  private val perListenerClassTimers = mutable.Map[String, Timer]()
+
+  /**
+   * Returns a timer tracking the processing time of the given listener class.
+   * events processed by that listener. This method is thread-safe.
+   */
+  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
+    synchronized {
+      val className = cls.getName
+      val maxTimed = 128
--- End diff --

Should this be configurable?
Github user bOOm-X commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r120619672

--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we use
     // Java Iterator directly.
-    val iter = listeners.iterator
+    val iter = listenersPlusTimers.iterator
     while (iter.hasNext) {
-      val listener = iter.next()
+      val listenerAndMaybeTimer = iter.next()
+      val listener = listenerAndMaybeTimer._1
+      val maybeTimer = listenerAndMaybeTimer._2
+      var maybeTimerContext = if (maybeTimer != null) {
--- End diff --

Indeed! But you can put the Option in the `listenersPlusTimers` collection (instead of calling `orNull` when you create the timer), so you can use it without having to recreate one each time in the `postToAll` method.
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r120197916

--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -111,6 +112,15 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
     }
   }

+  override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = {
+    if (listener.getClass.getName.startsWith("org.apache.spark")) {
--- End diff --

This is accounted for in a later commit. All listeners are now captured.
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r120197363

--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we use
     // Java Iterator directly.
-    val iter = listeners.iterator
+    val iter = listenersPlusTimers.iterator
     while (iter.hasNext) {
-      val listener = iter.next()
+      val listenerAndMaybeTimer = iter.next()
+      val listener = listenerAndMaybeTimer._1
+      val maybeTimer = listenerAndMaybeTimer._2
+      var maybeTimerContext = if (maybeTimer != null) {
--- End diff --

Actually, there is a cost here: allocating a new Option on every `postToAll` is going to create more allocations and method calls. Thus I'm going to leave this unchanged.
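The allocation concern takes only a few lines to demonstrate. Even if the `Option` is stored once per listener, starting the timer through `maybeTimer.map(...)` wraps each result in a fresh `Some`. The null-check version adds no wrapper. In the sketch below, a plain function stands in for a timer, and `Ctx` and `OptionAllocation` are made-up names. The sketch does not measure whether the JIT's escape analysis removes these short-lived wrappers in practice.

```scala
// Stand-in for a Timer.Context; only object identity matters here.
final class Ctx

object OptionAllocation {
  // Option stored once, as it would be when the listener is added.
  val maybeTimer: Option[() => Ctx] = Some(() => new Ctx)

  // Option style: map() allocates a new Some around each context, on every event.
  def startWithOption(): Option[Ctx] = maybeTimer.map(start => start())

  // Null style: the stored value is the timer itself, so there is only a null check.
  val timerOrNull: () => Ctx = maybeTimer.orNull
  def startWithNull(): Ctx = if (timerOrNull != null) timerOrNull() else null
}
```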
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r120194864

--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we use
     // Java Iterator directly.
-    val iter = listeners.iterator
+    val iter = listenersPlusTimers.iterator
     while (iter.hasNext) {
-      val listener = iter.next()
+      val listenerAndMaybeTimer = iter.next()
+      val listener = listenerAndMaybeTimer._1
+      val maybeTimer = listenerAndMaybeTimer._2
+      var maybeTimerContext = if (maybeTimer != null) {
--- End diff --

Yeah, this is just premature optimization. I'll undo.
Github user bOOm-X commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r119585097

--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -111,6 +112,15 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
     }
   }

+  override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = {
+    if (listener.getClass.getName.startsWith("org.apache.spark")) {
--- End diff --

Why create timers only for "spark" listeners? We may want timings for "third-party" listeners too. In my mind this is even more important for those listeners, because they can be much less optimized and so bring a huge performance penalty.
Github user bOOm-X commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r119583821

--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -23,29 +23,41 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal

+import com.codahale.metrics.Timer
+
 import org.apache.spark.internal.Logging

 /**
  * An event bus which posts events to its listeners.
  */
 private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

+  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Timer)]
+
   // Marked `private[spark]` for access in tests.
-  private[spark] val listeners = new CopyOnWriteArrayList[L]
+  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
+
+  /**
+   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
+   * This method is intended to be overridden by subclasses.
+   */
+  protected def createTimer(listener: L): Option[Timer] = None

   /**
    * Add a listener to listen events. This method is thread-safe and can be called in any thread.
    */
   final def addListener(listener: L): Unit = {
-    listeners.add(listener)
+    listenersPlusTimers.add((listener, createTimer(listener).orNull))
--- End diff --

Why not keep the Option in the collection instead of storing null?
Github user bOOm-X commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r119584011

--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we use
     // Java Iterator directly.
-    val iter = listeners.iterator
+    val iter = listenersPlusTimers.iterator
     while (iter.hasNext) {
-      val listener = iter.next()
+      val listenerAndMaybeTimer = iter.next()
+      val listener = listenerAndMaybeTimer._1
+      val maybeTimer = listenerAndMaybeTimer._2
+      var maybeTimerContext = if (maybeTimer != null) {
+        maybeTimer.time()
+      } else {
+        null
+      }
       try {
         doPostEvent(listener, event)
       } catch {
         case NonFatal(e) =>
           logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
+      } finally {
+        if (maybeTimerContext != null) {
--- End diff --

Same here: this would be simpler with an Option.
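The sketch below is a self-contained version of the null-based loop in this diff. It starts a timer context only when the listener has a timer, and it stops the context in `finally`, so the timing is recorded even when the listener throws. `CountingTimer`, `TimerContext` and `PostLoop` are simple stand-ins for a metrics timer, its context and the bus. They are not Spark's `ListenerBus` or the Codahale API.

```scala
import scala.util.control.NonFatal

// Stand-in for a metrics timer: counts how many timings were started and stopped.
final class CountingTimer {
  private var started = 0
  private var stopped = 0
  def time(): TimerContext = synchronized { started += 1; new TimerContext(this) }
  def recordStop(): Unit = synchronized { stopped += 1 }
  def counts: (Int, Int) = synchronized { (started, stopped) }
}

final class TimerContext(timer: CountingTimer) {
  def stop(): Unit = timer.recordStop()
}

object PostLoop {
  // Each entry pairs a listener with its timer, or null if the listener is not timed.
  def postToAll[E](listeners: Seq[(E => Unit, CountingTimer)], event: E): Unit = {
    val iter = listeners.iterator
    while (iter.hasNext) {
      val (listener, maybeTimer) = iter.next()
      val maybeTimerContext = if (maybeTimer != null) maybeTimer.time() else null
      try {
        listener(event)
      } catch {
        case NonFatal(e) => Console.err.println(s"Listener threw an exception: $e")
      } finally {
        // Runs whether or not the listener threw, so every started timing is stopped.
        if (maybeTimerContext != null) maybeTimerContext.stop()
      }
    }
  }
}
```

For example, suppose two timed listeners run and one of them throws, and a third untimed listener also runs. The shared timer then records two starts and two stops.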
Github user bOOm-X commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r119583956

--- Diff: core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---
@@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here we use
     // Java Iterator directly.
-    val iter = listeners.iterator
+    val iter = listenersPlusTimers.iterator
     while (iter.hasNext) {
-      val listener = iter.next()
+      val listenerAndMaybeTimer = iter.next()
+      val listener = listenerAndMaybeTimer._1
+      val maybeTimer = listenerAndMaybeTimer._2
+      var maybeTimerContext = if (maybeTimer != null) {
--- End diff --

With an Option (instead of a null value) this would be much simpler.
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118629022 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -226,3 +246,61 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } +private[spark] +class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging { + override val sourceName: String = "LiveListenerBus" + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * The total number of events posted to the LiveListenerBus. This is a count of the total number + * of events which have been produced by the application and sent to the listener bus, NOT a + * count of the number of events which have been processed and delivered to listeners (or dropped + * without being delivered). + */ + val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted")) + + /** + * The total number of events that were dropped without being delivered to listeners. + */ + val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) + + /** + * The amount of time taken to post a single event to all listeners. + */ + val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime")) + + /** + * The number of messages waiting in the queue. + */ + val queueSize: Gauge[Int] = { +metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{ + override def getValue: Int = queue.size() +}) + } + + // Guarded by synchronization. + private val perListenerClassTimers = mutable.Map[String, Timer]() + + /** + * Returns a timer tracking the processing time of the given listener class. + * events processed by that listener. This method is thread-safe. 
+ */ + def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = { +synchronized { + val className = listener.getClass.getName --- End diff -- I'll update the PR description to discuss this per-listener metric.
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118628985 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -226,3 +246,61 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } +private[spark] +class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging { + override val sourceName: String = "LiveListenerBus" + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * The total number of events posted to the LiveListenerBus. This is a count of the total number + * of events which have been produced by the application and sent to the listener bus, NOT a + * count of the number of events which have been processed and delivered to listeners (or dropped + * without being delivered). + */ + val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted")) + + /** + * The total number of events that were dropped without being delivered to listeners. + */ + val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) + + /** + * The amount of time taken to post a single event to all listeners. + */ + val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime")) + + /** + * The number of messages waiting in the queue. + */ + val queueSize: Gauge[Int] = { +metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{ + override def getValue: Int = queue.size() +}) + } + + // Guarded by synchronization. + private val perListenerClassTimers = mutable.Map[String, Timer]() + + /** + * Returns a timer tracking the processing time of the given listener class. + * events processed by that listener. This method is thread-safe. 
+ */ + def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = { +synchronized { + val className = listener.getClass.getName --- End diff -- Yes, but my goal with these metrics is to identify which listeners are causing performance problems, and for that purpose it's more useful to group listeners by class than to instrument individual listener instances. Most (all?) of Spark's internal listeners have one instance per driver / SparkContext, so in practice tracking stats per instance wouldn't make a meaningful difference in typical cases.
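A minimal sketch of grouping metrics by listener class, assuming a hypothetical `EventCounter` in place of a real `Timer` and a hypothetical `PerListenerClassMetrics` registry. As with the `synchronized` map in the diff, all instances of one class share a single entry, which is exactly the trade-off between class-name and per-instance keys:

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical stand-in for a per-listener Timer: it only counts events.
final class EventCounter {
  private val n = new AtomicLong(0L)
  def inc(): Unit = { n.incrementAndGet(); () }
  def count: Long = n.get()
}

// Hypothetical registry that keys metrics by the listener's class name.
final class PerListenerClassMetrics {
  // Guarded by synchronization on `this`.
  private val perListenerClass = mutable.Map.empty[String, EventCounter]

  // Returns the shared metric for the listener's class, creating it on first use.
  def metricFor(listener: AnyRef): EventCounter = synchronized {
    perListenerClass.getOrElseUpdate(listener.getClass.getName, new EventCounter)
  }

  def trackedClasses: Set[String] = synchronized(perListenerClass.keySet.toSet)
}
```

Two instances of the same listener class therefore report into one counter, while a listener of a different class gets its own.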
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118623202 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -226,3 +246,61 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } +private[spark] +class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging { + override val sourceName: String = "LiveListenerBus" + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * The total number of events posted to the LiveListenerBus. This is a count of the total number + * of events which have been produced by the application and sent to the listener bus, NOT a + * count of the number of events which have been processed and delivered to listeners (or dropped + * without being delivered). + */ + val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted")) + + /** + * The total number of events that were dropped without being delivered to listeners. + */ + val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) + + /** + * The amount of time taken to post a single event to all listeners. + */ + val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime")) + + /** + * The number of messages waiting in the queue. + */ + val queueSize: Gauge[Int] = { +metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{ + override def getValue: Int = queue.size() +}) + } + + // Guarded by synchronization. + private val perListenerClassTimers = mutable.Map[String, Timer]() + + /** + * Returns a timer tracking the processing time of the given listener class. + * events processed by that listener. This method is thread-safe. 
+ */ + def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = { +synchronized { + val className = listener.getClass.getName --- End diff -- Is it possible that users register the same listener twice? Then the class name may not be a good identifier for listeners. I think this is the main problem with per-listener metrics: how do we identify each listener?
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118434907 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source { + override val sourceName: String = "LiveListenerBus" + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * The total number of events posted to the LiveListenerBus. This counts the number of times + * that `post()` is called, which might be less than the total number of events processed in + * case events are dropped. + */ + val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived")) + + /** + * The total number of events that were dropped without being delivered to listeners. + */ + val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) + + /** + * The amount of time taken to post a single event to all listeners. + */ + val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime")) + + /** + * The number of of messages waiting in the queue. + */ + val queueSize: Gauge[Int] = { --- End diff -- ah i see
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118432570 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa logError(s"$name has already stopped! Dropping event $event") return } +metrics.numEventsReceived.inc() val eventAdded = eventQueue.offer(event) if (eventAdded) { eventLock.release() } else { onDropEvent(event) + metrics.numDroppedEvents.inc() --- End diff -- Sure, will do.
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118432135 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source { + override val sourceName: String = "LiveListenerBus" + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * The total number of events posted to the LiveListenerBus. This counts the number of times + * that `post()` is called, which might be less than the total number of events processed in + * case events are dropped. + */ + val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived")) + + /** + * The total number of events that were dropped without being delivered to listeners. + */ + val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) + + /** + * The amount of time taken to post a single event to all listeners. + */ + val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime")) + + /** + * The number of of messages waiting in the queue. + */ + val queueSize: Gauge[Int] = { --- End diff -- It's the queue's _capacity_ that's fixed. In https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html size refers to the number of items currently in the queue, whereas capacity refers to the maximum number of items that the queue can hold. I think the `spark.scheduler.listenerbus.eventqueue.size` configuration is confusingly named.
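A small sketch of the size/capacity distinction using `java.util.concurrent.LinkedBlockingQueue` directly. The gauge is modeled as a hypothetical `() => Int` closure re-evaluated on every read, roughly how the diff's `Gauge[Int]` reads `queue.size()`; `QueueSizeVsCapacity` is likewise a hypothetical name. The gauge value moves as events are enqueued and drained, while `size() + remainingCapacity()` stays at the capacity fixed at construction:

```scala
import java.util.concurrent.LinkedBlockingQueue

object QueueSizeVsCapacity {
  // Hypothetical gauge: a function re-evaluated on every read, standing in
  // for a Dropwizard Gauge[Int] whose getValue returns queue.size().
  type IntGauge = () => Int

  def queueSizeGauge(queue: LinkedBlockingQueue[_]): IntGauge = () => queue.size()

  // In single-threaded use, size() + remainingCapacity() equals the
  // capacity passed to the constructor.
  def capacityOf(queue: LinkedBlockingQueue[_]): Int =
    queue.size() + queue.remainingCapacity()
}
```

Note that `offer` returns `false` on a full queue instead of blocking; that is the branch where the diff calls `onDropEvent`.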
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118431897 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source { + override val sourceName: String = "LiveListenerBus" + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * The total number of events posted to the LiveListenerBus. This counts the number of times + * that `post()` is called, which might be less than the total number of events processed in + * case events are dropped. --- End diff -- Yes. Perhaps I wasn't explicit enough in the comment, so I'll reword it or just drop the confusing second half. Is it clearer if I say > This counts the number of times that `post()` has been called, not the total number of events that have completed processing.
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118431688 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa logError(s"$name has already stopped! Dropping event $event") return } +metrics.numEventsReceived.inc() --- End diff -- Yes. My idea was to have a counter which is incremented whenever an event is received, regardless of how it ends up being processed.
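A simplified sketch of these counting semantics, assuming a hypothetical single-threaded `CountingBus` with no consumer thread and plain `AtomicLong`s in place of Dropwizard counters. Every `post()` increments the posted counter; a failed `offer` additionally increments the dropped counter, which here lives in `onDropEvent` (the placement the reviewer asked about) so that every drop path is counted in one place:

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified producer side of a bounded event bus (no consumer thread).
final class CountingBus[E](capacity: Int) {
  require(capacity > 0, "capacity must be > 0")

  private val queue = new LinkedBlockingQueue[E](capacity)

  // Incremented on every post(), whether or not the event is accepted.
  val numEventsPosted = new AtomicLong(0L)
  // Incremented only when the bounded queue rejects an event.
  val numDroppedEvents = new AtomicLong(0L)

  def post(event: E): Unit = {
    numEventsPosted.incrementAndGet()
    if (!queue.offer(event)) {
      onDropEvent(event)
    }
  }

  // Counting here keeps every drop path counted in one place; a real
  // implementation might also log the drop.
  private def onDropEvent(event: E): Unit = {
    numDroppedEvents.incrementAndGet()
    ()
  }

  def queuedEvents: Int = queue.size()
}
```

With nothing draining the queue, posted = queued + dropped. In a real bus a background thread drains the queue, so the queued term is only a point-in-time snapshot.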
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118431597 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -33,25 +37,24 @@ import org.apache.spark.util.Utils * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when `stop()` is called, and it will drop further events after stopping. */ -private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus { +private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { self => import LiveListenerBus._ + private var sparkContext: SparkContext = _ + // Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize() - private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) - - private def validateAndGetQueueSize(): Int = { -val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) -if (queueSize <= 0) { - throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!") -} -queueSize + private val eventQueue = { +val capacity = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) +require(capacity > 0, s"${LISTENER_BUS_EVENT_QUEUE_SIZE.key} must be > 0!") --- End diff -- Nice, I didn't know about that. I'll move it in my next update.
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118413353 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa logError(s"$name has already stopped! Dropping event $event") return } +metrics.numEventsReceived.inc() val eventAdded = eventQueue.offer(event) if (eventAdded) { eventLock.release() } else { onDropEvent(event) + metrics.numDroppedEvents.inc() --- End diff -- Would it be better to move this into `onDropEvent`?
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118413314 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa logError(s"$name has already stopped! Dropping event $event") return } +metrics.numEventsReceived.inc() --- End diff -- Do we also count dropped events here?
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118413299 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source { + override val sourceName: String = "LiveListenerBus" + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * The total number of events posted to the LiveListenerBus. This counts the number of times + * that `post()` is called, which might be less than the total number of events processed in + * case events are dropped. --- End diff -- According to the code, we also count dropped events, don't we?
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118413115 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source { + override val sourceName: String = "LiveListenerBus" + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * The total number of events posted to the LiveListenerBus. This counts the number of times + * that `post()` is called, which might be less than the total number of events processed in + * case events are dropped. + */ + val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived")) + + /** + * The total number of events that were dropped without being delivered to listeners. + */ + val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) + + /** + * The amount of time taken to post a single event to all listeners. + */ + val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime")) + + /** + * The number of of messages waiting in the queue. + */ + val queueSize: Gauge[Int] = { --- End diff -- do we need this metric? Users can easily get it by looking at the `spark.scheduler.listenerbus.eventqueue.size` config.
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118413024 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -226,3 +240,34 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } +private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source { + override val sourceName: String = "LiveListenerBus" + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * The total number of events posted to the LiveListenerBus. This counts the number of times + * that `post()` is called, which might be less than the total number of events processed in + * case events are dropped. + */ + val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived")) + + /** + * The total number of events that were dropped without being delivered to listeners. + */ + val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) + + /** + * The amount of time taken to post a single event to all listeners. + */ + val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime")) + + /** + * The number of of messages waiting in the queue. --- End diff -- nit: double `of` here
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18083#discussion_r118412901 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -33,25 +37,24 @@ import org.apache.spark.util.Utils * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when `stop()` is called, and it will drop further events after stopping. */ -private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus { +private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { self => import LiveListenerBus._ + private var sparkContext: SparkContext = _ + // Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize() - private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) - - private def validateAndGetQueueSize(): Int = { -val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) -if (queueSize <= 0) { - throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!") -} -queueSize + private val eventQueue = { +val capacity = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) +require(capacity > 0, s"${LISTENER_BUS_EVENT_QUEUE_SIZE.key} must be > 0!") --- End diff -- this constraint can be put in `LISTENER_BUS_EVENT_QUEUE_SIZE` with `TypedConfigBuilder.checkValue`
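A minimal sketch of the idea behind this suggestion: the validator travels with the config definition, so every reader of the entry gets the check instead of each call site repeating a `require`. `IntConfigEntry`, `readFrom`, and `ListenerBusConfig` are hypothetical names only loosely modeled on `checkValue`; this is not Spark's `ConfigBuilder` API, and the default of 10000 is illustrative:

```scala
// Hypothetical config entry that carries its own validator. Loosely modeled
// on the checkValue idea; this is NOT Spark's ConfigBuilder API.
final case class IntConfigEntry(
    key: String,
    default: Int,
    validator: Int => Boolean,
    errorMsg: String) {

  // Reads the value (or the default) and validates it in one place.
  def readFrom(settings: Map[String, String]): Int = {
    val value = settings.get(key).map(_.trim.toInt).getOrElse(default)
    require(validator(value), s"$key $errorMsg (got $value)")
    value
  }
}

object ListenerBusConfig {
  // The default of 10000 is illustrative only.
  val EventQueueCapacity: IntConfigEntry = IntConfigEntry(
    key = "spark.scheduler.listenerbus.eventqueue.size",
    default = 10000,
    validator = (v: Int) => v > 0,
    errorMsg = "must be > 0")
}
```

An invalid value then fails with an `IllegalArgumentException` wherever the entry is read, rather than only where someone remembered to check it.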
[GitHub] spark pull request #18083: [SPARK-20863] Add metrics/instrumentation to Live...
GitHub user JoshRosen opened a pull request: https://github.com/apache/spark/pull/18083 [SPARK-20863] Add metrics/instrumentation to LiveListenerBus ## What changes were proposed in this pull request? This patch adds Coda Hale metrics for instrumenting the `LiveListenerBus` in order to track the number of events received, dropped, and processed, as well as a timer to track the processing time per event. See the new `LiveListenerBusMetrics` for a complete description of the new metrics. ## How was this patch tested? New tests in SparkListenerSuite, including a test to ensure proper counting of dropped listener events. You can merge this pull request into a Git repository by running: $ git pull https://github.com/JoshRosen/spark listener-bus-metrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18083.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18083 commit a1fb5a8f2e58fe774aabc76e9d1a6859cfa99370 Author: Josh Rosen Date: 2017-05-23T21:49:35Z WIP