[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu closed the pull request at: https://github.com/apache/spark/pull/20940 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r181943630 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- Thanks for your thoughts on this. The concerns are both the size of the message and the amount of logging, but it is only an extra few longs per heartbeat, and similarly only a few extra longs per logged event. Task end would help with minimizing communication for longer running tasks. The heartbeats are only every 10 seconds, so perhaps not so bad.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r181896603 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- Is your concern the size of the message from the executors to the driver? That certainly is valid, but I wonder if we should think a bit harder about this if that is going to be a common concern, as I think we'll want to add more metrics. One possibility is for the executor to do the peak calculation itself, and then only send an update for the metrics with a new peak. That would also let us just send the peak on task end events. I'm just brainstorming at the moment, not saying it should be changed one way or the other; I need to think about it more.
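The executor-side idea brainstormed above (the executor keeps its own high-water marks and reports only the metrics that set a new peak) could be sketched roughly as below. This is a standalone illustration with invented names (PeakTracker, update), not Spark's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of executor-side peak tracking: keep a high-water mark
// per metric and report only the indices that set a new peak, so a heartbeat
// (or task-end event) can carry just the changed values.
public class PeakTracker {
    private final long[] peaks;

    public PeakTracker(int numMetrics) {
        peaks = new long[numMetrics];  // all zero initially
    }

    /** Fold in a new sample; return the indices of metrics with a new peak. */
    public List<Integer> update(long[] sample) {
        List<Integer> newPeaks = new ArrayList<>();
        for (int i = 0; i < sample.length; i++) {
            if (sample[i] > peaks[i]) {
                peaks[i] = sample[i];
                newPeaks.add(i);
            }
        }
        return newPeaks;
    }

    public long peak(int i) { return peaks[i]; }

    public static void main(String[] args) {
        PeakTracker tracker = new PeakTracker(3);
        // first sample: every metric is a new peak
        System.out.println(tracker.update(new long[] {100, 50, 10}));  // [0, 1, 2]
        // only metric 1 rises; metrics 0 and 2 stay at or below their peaks
        System.out.println(tracker.update(new long[] {90, 60, 10}));   // [1]
    }
}
```

An update with an empty result list would need no message at all, which is where the bandwidth saving would come from.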
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r181611071 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- ExecutorMetrics right now has: jvmUsedHeapMemory, jvmUsedNonHeapMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and offHeapStorageMemory. For logging at stage end, we can log the peak for each of these, but unified memory is more problematic. We could add new fields for on heap/off heap unified memory, but I'm inclined to remove unified memory (from all the places it is currently used), rather than add more fields. Users can still sum peak execution and peak storage values, which may be larger than the actual peak unified memory if they are not at peak values at the same time, but should still be a reasonable estimate for sizing.
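The estimate described here (summing the per-metric peaks) can only over-report the true unified peak, and only when execution and storage memory don't peak at the same moment. A toy illustration, with invented numbers:

```java
// Toy timeline of (executionUsed, storageUsed) samples for one executor,
// showing that sum-of-peaks is an upper bound on peak-of-sums.
public class PeakSumEstimate {
    // invented sample values, not real measurements
    static final long[][] SAMPLES = { {50, 20}, {30, 40}, {10, 10} };

    /** What a user would compute from separately logged peaks. */
    static long sumOfPeaks(long[][] samples) {
        long peakExec = 0, peakStorage = 0;
        for (long[] s : samples) {
            peakExec = Math.max(peakExec, s[0]);
            peakStorage = Math.max(peakStorage, s[1]);
        }
        return peakExec + peakStorage;
    }

    /** The true peak unified (execution + storage) memory over the timeline. */
    static long peakOfSums(long[][] samples) {
        long peak = 0;
        for (long[] s : samples) {
            peak = Math.max(peak, s[0] + s[1]);
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("sum of peaks: " + sumOfPeaks(SAMPLES));  // 50 + 40 = 90
        System.out.println("peak of sums: " + peakOfSums(SAMPLES));  // 70
    }
}
```

Here execution peaks at 50 in the first sample and storage peaks at 40 in the second, so the summed estimate (90) overshoots the real unified peak (70) but still gives a safe number for sizing.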
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180446530 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- I will make the change to log at stage end, and will update the design doc.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180426692 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- Yeah, logging an event per executor at stage end seems good to me. It would be great if we could see how much that version affects log size as well, if you can get those metrics. Also, these tradeoffs should go into the design doc; it's harder to find comments from a PR after this feature has been merged. For now, it would also be nice if you could post a version that everyone can comment on, e.g. a google doc.
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180287725 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- The original analysis was focused on some test applications and larger applications. Looking at a broader sample of our applications, the logging overhead is higher, averaging about 14% per application, and 4% overall (sum of logging for ExecutorMetricsUpdate / sum of Spark history logs). The overhead for larger Spark history logs is mostly pretty small, but increases significantly for smaller ones (there's one at 49%). There's often some logging before the first stage starts, which doesn't contain useful information and is extra overhead, especially for smaller applications/history logs. The overhead can also be high when a stage takes a long time to run and memory increases gradually rather than reaching the peak quickly -- logging at stage end would work better for this case. I should also note that these numbers are for the case where only 4 longs are recorded; with more metrics, the overhead would be higher, both in the size of each logged event and in the number of potential peaks, since a new peak for any metric would be logged. Since there will be more metrics added, and the cost is higher than originally estimated, logging at stage end could be a better choice. 
We would lose some information about overlapping stages, but this information wouldn't be visible anyway with the currently planned REST APIs or web UI, which just show the peaks for stages and executors. For logging at stage end, we can log an ExecutorMetricsUpdate event for each executor that has sent a heartbeat for the stage, just before logging the stage end -- this would have the peak value for each metric. This should be the minimum amount of logging needed to have information about peak values per stage per executor. Alternatively, the information could be added to the StageCompleted event for more compaction, but the code would be more complicated, with 2 paths for reading in values. Logging an event per executor at stage end seems like a reasonable choice, without too much extra logging or too much extra complexity. What are your thoughts?
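The stage-end option described here (accumulate per-executor peaks while the stage runs, then emit one record per executor just before the stage-end event) could be sketched as follows. The class and method names are invented for illustration, not Spark's.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of stage-end peak logging: fold every heartbeat into a
// running per-executor peak, then flush one record per executor at stage end
// and reset for the next stage.
public class StageEndPeakLogger {
    private final Map<String, long[]> peaksByExecutor = new HashMap<>();

    /** Fold a heartbeat's metric values into the running peaks for that executor. */
    public void onHeartbeat(String executorId, long[] metrics) {
        long[] peaks = peaksByExecutor.computeIfAbsent(
            executorId, id -> new long[metrics.length]);
        for (int i = 0; i < metrics.length; i++) {
            peaks[i] = Math.max(peaks[i], metrics[i]);
        }
    }

    /** At stage end, produce one record per executor and reset for the next stage. */
    public List<String> stageEndRecords(int stageId) {
        List<String> records = new ArrayList<>();
        peaksByExecutor.forEach((id, peaks) ->
            records.add("stage " + stageId + " executor " + id
                + " peaks=" + java.util.Arrays.toString(peaks)));
        peaksByExecutor.clear();
        return records;
    }

    public static void main(String[] args) {
        StageEndPeakLogger logger = new StageEndPeakLogger();
        logger.onHeartbeat("1", new long[] {5000, 50});
        logger.onHeartbeat("1", new long[] {4000, 80});  // second metric sets a new peak
        logger.stageEndRecords(0).forEach(System.out::println);
        // prints: stage 0 executor 1 peaks=[5000, 80]
    }
}
```

Per the discussion above, this emits exactly one record per executor per stage, at the cost of losing the in-stage timeline.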
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180204845 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -772,6 +772,12 @@ private[spark] class Executor( val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]() val curGCTime = computeTotalGcTime() +// get executor level memory metrics +val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(), + ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(), --- End diff -- Thanks, I will play around with it a bit. If it seems more complicated or expensive, I'll file a separate subtask.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180199472 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -772,6 +772,12 @@ private[spark] class Executor( val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]() val curGCTime = computeTotalGcTime() +// get executor level memory metrics +val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(), + ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(), --- End diff -- To be honest, I haven't tried this out myself; I only knew this was possible. The version I was familiar with is what's used by the dropwizard metrics library: https://github.com/dropwizard/metrics/blob/4.1-development/metrics-jvm/src/main/java/com/codahale/metrics/jvm/BufferPoolMetricSet.java Sorry, you'll need to experiment with it a bit. (Again, not a blocker for this.)
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180197576 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- OK, that makes sense to me. If the logging overhead is small, logging on every new peak certainly seems simpler.
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180180795 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -772,6 +772,12 @@ private[spark] class Executor( val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]() val curGCTime = computeTotalGcTime() +// get executor level memory metrics +val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(), + ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(), --- End diff -- We could add ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed(), for total non-heap memory used by the JVM. For direct and memory mapped usage, would collecting these be similar to https://gist.github.com/t3rmin4t0r/1a753ccdcfa8d111f07c ?
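For reference, the approach in the gist linked above and in dropwizard's BufferPoolMetricSet boils down to querying the platform BufferPoolMXBeans. A minimal standalone probe, alongside the non-heap reading mentioned here, might look like this (the printed values will vary by JVM and workload):

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

// Minimal probe for the JVM memory readings discussed in this thread:
// total non-heap usage from the MemoryMXBean, plus direct and mapped buffer
// usage from the platform BufferPoolMXBeans (the same beans that dropwizard's
// BufferPoolMetricSet wraps).
public class OffHeapProbe {
    public static void main(String[] args) {
        long nonHeapUsed =
            ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage().getUsed();
        System.out.println("non-heap used: " + nonHeapUsed + " bytes");

        List<BufferPoolMXBean> pools =
            ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            // pool names on a typical JVM include "direct" and "mapped"
            System.out.println(pool.getName() + " used: "
                + pool.getMemoryUsed() + " bytes");
        }
    }
}
```

Sampling these beans in the heartbeat would give the direct and memory-mapped numbers asked about above, at the cost of a few extra longs per update.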
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180179243 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- We're not constructing a timeline currently, and yes, with only peak values, it could be rather sparse. We would get new values with new stages, but would not see decreases in memory during a stage. The situation where there is a stage 1 with peak X, and then stage 2 starts with peak Y > X is interesting though, and it would be useful to have this information, since we would then know to focus on stage 2 for memory consumption, even though both 1 and 2 could have the same peak values. Logging at stage start would double the amount of logging, and would be trickier, so I'd prefer either keeping the current approach or only logging at stage end. The higher logging was for smaller applications (and smaller logs).
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180117149 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- Also on log size -- was there anything special about the 8% case? E.g. was it a tiny application running on a ton of executors, so the logs were small to begin with? If so, then it's probably fine.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180114405 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- You could tell which stages are active from the other events. Do you think that timeline would be useful? Since it's only keeping the peak, I'm not sure how interesting that graph is. With overlapping stages, you might lose some information by only logging at stage end -- e.g. if stage 1 is running for a while, with peak X, and then stage 2 starts up with peak Y > X, you would only ever log peak Y for stage 1. You could address this by also logging at stage start, but then you're back to more logging (and it's really tricky to figure out a minimal set of things to log when stage 2 starts, as you don't know which executor it's going to run on). But I'm still not sure what we'd do with those extra values, and whether there is any value in capturing them. (Some of this design discussion probably belongs on jira -- I'll also go reread the design doc now.)
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180110683 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -772,6 +772,12 @@ private[spark] class Executor( val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]() val curGCTime = computeTotalGcTime() +// get executor level memory metrics +val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(), + ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(), --- End diff -- I also think it's totally fine to not have every metric possible now, but if this one is easy to add here, it would be nice. In particular, I'm thinking we'd also like to capture the memory associated with python if it's a pyspark app, though that is significantly more complicated, so we don't need to do that now.
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r179978448 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -772,6 +772,12 @@ private[spark] class Executor( val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]() val curGCTime = computeTotalGcTime() +// get executor level memory metrics +val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(), + ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(), --- End diff -- It would be useful to have more information about offheap memory usage.
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r179978186 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r179978222 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r179978192
--- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
    }
  }

+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+    val conf = getLoggingConf(testDirPath, None)
+    val logName = "executorMetricsUpdated-test"
+    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
+    val listenerBus = new LiveListenerBus(conf)
+
+    // list of events and if they should be logged
+    val events = Array(
+      (SparkListenerApplicationStart("executionMetrics", None,
+        1L, "update", None), true),
+      (createExecutorAddedEvent(1), true),
+      (createExecutorAddedEvent(2), true),
+      (createStageSubmittedEvent(0), true),
+      (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), true), // new stage
+      (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), true), // new stage
+      (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), false),
+      (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), true), // onheap storage
+      (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), true), // JVM used
+      (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), true), // onheap unified
+      (createStageSubmittedEvent(1), true),
+      (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), true), // new stage
+      (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), true), // new stage
+      (createStageCompletedEvent(0), true),
+      (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), true), // onheap execution
+      (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), false),
+      (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), true), // offheap execution
+      (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), true), // offheap storage
+      (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), false),
+      (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), false),
+      (createStageCompletedEvent(1), true),
+      (SparkListenerApplicationEnd(1000L), true))
+
+    // play the events for the event logger
+    eventLogger.start()
+    listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
+    listenerBus.addToEventLogQueue(eventLogger)
+    for ((event, included) <- events) {
+      listenerBus.post(event)
+    }
+    listenerBus.stop()
+    eventLogger.stop()
+
+    // Verify the log file contains the expected events
+    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
+    try {
+      val lines = readLines(logData)
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      assert(lines.size === 19)
+      assert(lines(0).contains("SparkListenerLogStart"))
+      assert(lines(1).contains("SparkListenerApplicationStart"))
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+      var i = 1
+      for ((event, included) <- events) {
+        if (included) {
+          checkEvent(lines(i), event)
+          i += 1
+        }
+      }
+    } finally {
+      logData.close()
+    }
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
--- End diff --
Done.
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20940#discussion_r179978182

--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -268,6 +268,9 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
 
   def hasMemoryInfo: Boolean = totalOnHeap >= 0L
 
+  // peak values for executor level metrics
+  var peakExecutorMetrics = new PeakExecutorMetrics
--- End diff --

Yes, thanks for catching this, it should be `val`. This is more properly part of SPARK-23431, and I can remove it for now.
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20940#discussion_r179978102

--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
     }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for the given executor.
+   * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

For a longer running stage, once it ramps up, hopefully there wouldn't be a lot of new peak values. Looking at a subset of our applications, the extra logging overhead has mostly been between 0.25% and 1%, but it can be as high as 8%.

By logging each peak value at the time it occurs (and reinitializing when a stage starts), it's possible to tell which stages are active at the time, and it would potentially be possible to graph these changes on a timeline. This information wouldn't be available if the metrics are only logged at stage end, since the times would be lost.

Logging at stage end would limit the amount of extra logging. If we add more metrics (such as for offheap), then there could be more new peaks and more extra logging with the current approach. Excess logging is a concern, and I can move to stage end if the overhead is too much.
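The per-executor, per-stage peak tracking discussed in this message can be sketched as follows. `PeakTracker` and its single `jvmUsedMemory` metric are illustrative simplifications for this thread, not the PR's actual classes:

```scala
import scala.collection.mutable

// Hypothetical sketch: track the highest value seen per executor, and log
// (return true) only when a heartbeat sets a new peak.
class PeakTracker {
  // highest JVM used memory seen so far in the current stage, per executor id
  private val peaks = mutable.HashMap[String, Long]()

  // Returns true if this heartbeat sets a new peak and should be logged.
  def update(executorId: String, jvmUsedMemory: Long): Boolean = {
    if (jvmUsedMemory > peaks.getOrElse(executorId, Long.MinValue)) {
      peaks(executorId) = jvmUsedMemory
      true
    } else {
      false
    }
  }

  // Called when a new stage starts (onStageSubmitted), so peaks are per stage.
  def reset(): Unit = peaks.clear()
}
```

After the first heartbeat of a stage every executor logs once; later heartbeats log only on growth, which is why a ramped-up long stage should produce few extra events.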
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20940#discussion_r179802392

--- Diff: core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala ---
@@ -654,6 +681,25 @@ private[spark] object JsonProtocolSuite extends Assertions {
     assert(metrics1.bytesRead === metrics2.bytesRead)
   }
 
+  private def assertEquals(metrics1: Option[ExecutorMetrics], metrics2: Option[ExecutorMetrics]) {
+    metrics1 match {
+      case Some(m1) =>
+        metrics2 match {
+          case Some(m2) =>
+            assert(m1.timestamp === m2.timestamp)
+            assert(m1.jvmUsedMemory === m2.jvmUsedMemory)
+            assert(m1.onHeapExecutionMemory === m2.onHeapExecutionMemory)
+            assert(m1.offHeapExecutionMemory === m2.offHeapExecutionMemory)
+            assert(m1.onHeapStorageMemory === m2.onHeapStorageMemory)
+            assert(m1.offHeapStorageMemory === m2.offHeapStorageMemory)
+          case None =>
+            assert(false)
+        }
+      case None =>
+        assert(metrics2.isEmpty)
--- End diff --

This version looks correct, but I think the matching I mentioned above is a little cleaner. And then you should be able to have `EventLoggingListenerSuite` just use this method rather than repeating it.
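The tuple-match style the reviewer refers to might look like this sketch; the two-field `ExecutorMetrics` here is a simplified stand-in for the real class:

```scala
// Simplified stand-in for the PR's ExecutorMetrics, for illustration only.
case class ExecutorMetrics(timestamp: Long, jvmUsedMemory: Long)

// Matching on the pair of Options replaces the nested match with one level.
def assertEquals(metrics1: Option[ExecutorMetrics], metrics2: Option[ExecutorMetrics]): Unit = {
  (metrics1, metrics2) match {
    case (Some(m1), Some(m2)) =>
      // the real suite would compare every metric field here
      assert(m1.timestamp == m2.timestamp)
      assert(m1.jvmUsedMemory == m2.jvmUsedMemory)
    case (None, None) =>
      () // both absent: equal
    case _ =>
      assert(false, "one side is defined and the other is empty")
  }
}
```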
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20940#discussion_r179795171

--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
(quotes the same onExecutorMetricsUpdate diff shown above)
--- End diff --

I wouldn't think you'd want to log for every new peak, as I'd expect it would be natural for the peak to keep growing, so you'd just end up with a lot of logs. I'd expect you'd just log the peak when the stage ended, or when the executor died. The downside of that approach is that you never log a peak if the driver dies ... but then you've got to figure out the driver issue anyway.
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20940#discussion_r179797803

--- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
(quotes the same executor-metrics test diff shown above, ending at:)

+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
--- End diff --

nit: multiline methods should be enclosed in `{}`, here and elsewhere, even if the body is only one line.
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20940#discussion_r179796802

--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -268,6 +268,9 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
 
   def hasMemoryInfo: Boolean = totalOnHeap >= 0L
 
+  // peak values for executor level metrics
+  var peakExecutorMetrics = new PeakExecutorMetrics
--- End diff --

I think this can be a `val` -- it's mutated, not replaced.

Also, I don't see any tests that this is updated correctly, both in a live app and when replayed from the limited logs?

OTOH, maybe you should just remove this from this change and wait until SPARK-23431, as there you'll need to add something else to expose this for each executor / stage, which may be more important than just the overall total for the executor.
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20940#discussion_r179796239

--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -93,6 +94,9 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
 
+  // Peak metric values for each executor
+  private var peakExecutorMetrics = new mutable.HashMap[String, PeakExecutorMetrics]()
--- End diff --

You need to handle overlapping stages. I think you actually need to key on both executor and stage, and on stage end you only clear the metrics for that stage.

EDIT: ok, after I went through everything, I think I see how this works -- since you log on every new peak, you'll also get a logged message for the earlier update. But as I mention below, this strategy seems like it'll result in a lot of extra logging. Maybe I'm wrong, though; it would be great to see how much the logs grow this way.
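Keying on both executor and stage, and clearing only the finished stage's entries, could look like this sketch; `StagePeaks` and its method names are hypothetical, not code from the PR:

```scala
import scala.collection.mutable

// Hypothetical sketch: peaks keyed by (executorId, stageId) so that
// overlapping stages are tracked independently.
class StagePeaks {
  private val peaks = mutable.HashMap[(String, Int), Long]()

  // Returns true if this update is a new peak for that executor and stage.
  def update(executorId: String, stageId: Int, jvmUsed: Long): Boolean = {
    val key = (executorId, stageId)
    if (jvmUsed > peaks.getOrElse(key, Long.MinValue)) {
      peaks(key) = jvmUsed
      true
    } else {
      false
    }
  }

  // On stage end, remove only that stage's entries; other running stages
  // keep their peaks.
  def stageEnded(stageId: Int): Unit = {
    val stale = peaks.keys.filter(_._2 == stageId).toList
    stale.foreach(peaks.remove)
  }

  // Stages that still have tracked peaks (for illustration/testing).
  def trackedStages: Set[Int] = peaks.keys.map(_._2).toSet
}
```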
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20940#discussion_r179803848

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -772,6 +772,12 @@ private[spark] class Executor(
     val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
     val curGCTime = computeTotalGcTime()
 
+    // get executor level memory metrics
+    val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
+      ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
--- End diff --

What about including the JVM's direct and memory-mapped buffer usage as well? See https://issues.apache.org/jira/browse/SPARK-22483
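The direct and memory-mapped buffer usage mentioned here is exposed by the standard `java.lang.management.BufferPoolMXBean`; this standalone sketch of reading it alongside heap usage is an illustration, not the PR's code:

```scala
import java.lang.management.{BufferPoolMXBean, ManagementFactory}

// Illustrative snapshot of JVM memory: heap used plus each buffer pool's
// used bytes. On HotSpot JVMs the pool names are "direct" and "mapped".
def currentMemorySnapshot(): Map[String, Long] = {
  val heapUsed = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed
  val pools = ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean])
  var snapshot = Map("heap" -> heapUsed)
  pools.forEach { p =>
    snapshot += (p.getName -> p.getMemoryUsed)
  }
  snapshot
}
```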
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20940#discussion_r179797706

--- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
(quotes the same executor-metrics test diff shown above, through:)

+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorAddedEvent(executorId: Int) =
+    SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInfo("host1", 1, Map.empty))
+
+  /** Create an executor metrics update
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20940#discussion_r179801207

--- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
(quotes the same executor-metrics test diff shown above, through:)

+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorAddedEvent(executorId: Int) =
+    SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInfo("host1", 1, Map.empty))
+
+  /** Create an executor metrics update
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
GitHub user edwinalu opened a pull request:

    https://github.com/apache/spark/pull/20940

[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API

Add new executor-level memory metrics (JVM used memory, on/off heap execution memory, on/off heap storage memory), and expose them via the executors REST API. This information will help provide insight into how executor and driver JVM memory is used, and for the different memory regions. It can be used to help determine good values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.

## What changes were proposed in this pull request?

An ExecutorMetrics class is added, with jvmUsedMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and offHeapStorageMemory. The new ExecutorMetrics is sent by executors to the driver as part of Heartbeat. A heartbeat is added for the driver as well, to collect these metrics for the driver.

The EventLoggingListener logs ExecutorMetricsUpdate events if there is a new peak value for any of the memory metrics for an executor and stage. Only the ExecutorMetrics are logged, and not the TaskMetrics, to minimize additional logging.

The AppStatusListener records the peak values for each memory metric. The new memory metrics are added to the executors REST API.

## How was this patch tested?

New unit tests have been added. This was also tested on our cluster.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/edwinalu/spark SPARK-23429

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20940.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20940

commit 48eab1d23fcebb315bbe787bed8e468c7fc69233
Author: Edwina Lu
Date: 2018-03-09T23:39:36Z

    SPARK-23429: Add executor memory metrics to heartbeat and expose in executors REST API

    Add new executor level memory metrics (JVM used memory, on/off heap execution memory, on/off heap storage memory), and expose via the executors REST API. This information will help provide insight into how executor and driver JVM memory is used, and for the different memory regions. It can be used to help determine good values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.

    Add an ExecutorMetrics class, with jvmUsedMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and offHeapStorageMemory. The new ExecutorMetrics will be sent by executors to the driver as part of Heartbeat. A heartbeat will be added for the driver as well, to collect these metrics for the driver.

    Modify the EventLoggingListener to log ExecutorMetricsUpdate events if there is a new peak value for any of the memory metrics for an executor and stage. Only the ExecutorMetrics will be logged, and not the TaskMetrics, to minimize additional logging.

    Modify the AppStatusListener to record the peak values for each memory metric.

    Add the new memory metrics to the executors REST API.
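A minimal sketch of the metrics payload the description lists; the field names follow the PR text, but this simplified case class (and its unified-memory helpers) is illustrative only, not the PR's actual class:

```scala
// Illustrative payload sent with each heartbeat; fields mirror the metric
// names in the PR description.
case class ExecutorMetrics(
    timestamp: Long,
    jvmUsedMemory: Long,
    onHeapExecutionMemory: Long,
    offHeapExecutionMemory: Long,
    onHeapStorageMemory: Long,
    offHeapStorageMemory: Long) {
  // Unified memory per region = execution + storage; a useful view when
  // tuning spark.memory.fraction and spark.memory.storageFraction.
  def onHeapUnified: Long = onHeapExecutionMemory + onHeapStorageMemory
  def offHeapUnified: Long = offHeapExecutionMemory + offHeapStorageMemory
}
```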