[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-03 Thread edwinalu
Github user edwinalu closed the pull request at:

https://github.com/apache/spark/pull/20940


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-16 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r181943630
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

Thanks for your thoughts on this. The concerns are the size of the message and also the logging, but it is only an extra few longs per heartbeat, and similarly for the logging. Task end would help with minimizing communication for longer running tasks. The heartbeats are only every 10 seconds, so perhaps it's not so bad.


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r181896603
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

is your concern the size of the message from the executors to the driver?

that certainly is valid, but I wonder if we should think a bit harder about 
this if that is going to be a common concern, as I think we'll want to add more 
metrics.

One possibility is for the executor to do the peak calculation itself, and 
then only send an update for the metrics with a new peak.  Also that would let 
us just send the peak on task end events.

I'm just brainstorming at the moment, not saying it should be changed one 
way or the other ... need to think about it more
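
A minimal sketch of that executor-side idea (all names here are hypothetical, not from this patch): the executor keeps the running peak per metric, and the heartbeat (or task end event) carries only the metrics that advanced.

    // Hypothetical executor-side peak tracker: report only new peaks.
    class PeakMetricTracker(numMetrics: Int) {
      private val peaks = Array.fill(numMetrics)(Long.MinValue)

      /** Record current values; return (index -> value) pairs that set a new peak. */
      def newPeaks(current: Array[Long]): Map[Int, Long] = synchronized {
        (0 until numMetrics).flatMap { i =>
          if (current(i) > peaks(i)) {
            peaks(i) = current(i)
            Some(i -> current(i))
          } else {
            None
          }
        }.toMap
      }
    }

The tradeoff is that the driver no longer sees every sample, only the monotone sequence of peaks.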


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-15 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r181611071
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

ExecutorMetrics right now has: jvmUsedHeapMemory, jvmUsedNonHeapMemory, 
onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and 
offHeapStorageMemory. For logging at stage end, we can log the peak for each of 
these, but unified memory is more problematic. We could add new fields for on 
heap/off heap unified memory, but I'm inclined to remove unified memory (from 
all the places it is currently used), rather than add more fields. Users can 
still sum peak execution and peak storage values, which may be larger than the 
actual peak unified memory if they are not at peak values at the same time, but 
should still be a reasonable estimate for sizing.
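
As a hypothetical illustration of that sizing estimate (variable names assumed, not from the patch):

    // The sum of per-metric peaks is an upper bound on the true unified peak,
    // since execution and storage need not peak at the same moment.
    val estPeakOnHeapUnified = peakOnHeapExecutionMemory + peakOnHeapStorageMemory
    val estPeakOffHeapUnified = peakOffHeapExecutionMemory + peakOffHeapStorageMemory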


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-10 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180446530
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

I will make the change to log at stage end, and will update the design doc.


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-10 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180426692
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

yeah logging an event per executor at stage end seems good to me.  It would 
be great if we could see how much that version affects log size as well, if you 
can get those metrics.

also these tradeoffs should go into the design doc, since it's harder to find comments from a PR after this feature has been merged.  For now, it would also be nice if you could post a version that everyone can comment on, e.g. a Google doc.


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180287725
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

The original analysis was focused on some test applications and larger 
applications. Looking at a broader sample of our applications, the logging 
overhead is higher, averaging about 14% per application, and 4% overall (sum of 
logging for ExecutorMetricsUpdate / sum of Spark history logs). The overhead 
for larger Spark history logs is mostly pretty small, but increases 
significantly for smaller ones (there's one at 49%).

There's often some logging before the first stage starts that doesn't contain useful information; this is extra overhead, especially for smaller applications/history logs. The overhead can also be high when a stage takes a long time to run and memory increases gradually rather than reaching the peak quickly -- logging at stage end would work better for this case.

I should also note that these numbers are for the case where only 4 longs 
are recorded, and with more metrics, the overhead would be higher, both in the 
size of each logged event, and the number of potential peaks, since a new peak 
for any metric would be logged.

Since there will be more metrics added, and the cost is higher than originally estimated, logging at stage end could be a better choice. We would lose some information about overlapping stages, but this information wouldn't be visible anyway with the currently planned REST APIs or web UI, which just show the peaks for stages and executors.

For logging at stage end, we can log an ExecutorMetricsUpdate event for each executor that has sent a heartbeat for the stage, just before logging the stage end -- this would have the peak value for each metric. This should be the minimum amount of logging needed to have information about peak values per stage per executor. Alternatively, the information could be added to the StageCompleted event for more compactness, but the code would be more complicated, with 2 paths for reading in values. Logging an event per executor at stage end (sketched below) seems like a reasonable choice, with not too much extra logging or too much extra complexity.
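
Roughly, a sketch of that stage-end option (the executorUpdates argument on SparkListenerExecutorMetricsUpdate and the liveStageExecutorMetrics map are assumptions for illustration, not the final patch):

    override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
      // Log one ExecutorMetricsUpdate per executor that sent a heartbeat
      // during this stage, carrying the peak value seen for each metric.
      liveStageExecutorMetrics.remove(event.stageInfo.stageId).foreach { peaks =>
        peaks.foreach { case (executorId, peakMetrics) =>
          logEvent(SparkListenerExecutorMetricsUpdate(
            executorId, Seq.empty, Some(peakMetrics)))
        }
      }
      logEvent(event, flushLogger = true)
    }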

What are your thoughts?


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180204845
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -772,6 +772,12 @@ private[spark] class Executor(
 val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
 val curGCTime = computeTotalGcTime()
 
+// get executor level memory metrics
+val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
+  ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
--- End diff --

Thanks, I will play around with it a bit. If it seems more complicated or 
expensive, I'll file a separate subtask.


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180199472
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -772,6 +772,12 @@ private[spark] class Executor(
 val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
 val curGCTime = computeTotalGcTime()
 
+// get executor level memory metrics
+val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
+  ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
--- End diff --

to be honest I haven't tried this out myself, I only knew this was possible.  The version I was familiar with is what's used by the dropwizard metrics:


https://github.com/dropwizard/metrics/blob/4.1-development/metrics-jvm/src/main/java/com/codahale/metrics/jvm/BufferPoolMetricSet.java

sorry you'll need to experiment with it a bit.  (again, not a blocker for 
this)




---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180197576
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

ok that makes sense to me.  If the logging overhead is small, logging on 
every new peak certainly seems simpler.


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180180795
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -772,6 +772,12 @@ private[spark] class Executor(
 val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
 val curGCTime = computeTotalGcTime()
 
+// get executor level memory metrics
+val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
+  ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
--- End diff --

We could add 
ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed(), for total 
non-heap memory used by the JVM.

For direct and memory mapped usage, would collecting these be similar to 
https://gist.github.com/t3rmin4t0r/1a753ccdcfa8d111f07c ? 
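
For illustration, reading those beans looks roughly like this (a sketch; "direct" and "mapped" are the standard JMX buffer pool names the JVM registers, which is also what dropwizard's BufferPoolMetricSet relies on):

    import java.lang.management.{BufferPoolMXBean, ManagementFactory}
    import scala.collection.JavaConverters._

    // One BufferPoolMXBean per pool; HotSpot registers "direct" and "mapped".
    val pools = ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala
    val directUsed = pools.find(_.getName == "direct").map(_.getMemoryUsed).getOrElse(0L)
    val mappedUsed = pools.find(_.getName == "mapped").map(_.getMemoryUsed).getOrElse(0L)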


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180179243
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

We're not constructing a timeline currently, and yes, with only peak 
values, it could be rather sparse. We would get new values with new stages, but 
would not see decreases in memory during a stage.

The situation where there is a stage 1 with peak X, and then stage 2 starts 
with peak Y > X is interesting though, and it would be useful to have this 
information, since we would then know to focus on stage 2 for memory 
consumption, even though both 1 and 2 could have the same peak values. Logging 
at stage start would double the amount of logging, and would be trickier, so 
I'd prefer either keeping the current approach or only logging at stage end.

The higher logging overhead was for smaller applications (and smaller logs).



---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180117149
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

Also on log size -- was there anything special about the 8% case?  E.g. was it a tiny application running on a ton of executors, so the logs were small to begin with?  If so, then it's probably fine.


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180114405
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

You could tell which stages are active from the other events.  Do you think that timeline would be useful?  Since it's only keeping the peak, I'm not sure how interesting that graph is.

With overlapping stages, you might lose some information by only logging at stage end -- e.g. if stage 1 is running for a while with peak X, and then stage 2 starts up with peak Y > X, you would only ever log peak Y for stage 1.  You could address this by also logging at stage start, but then you're back to more logging (and it's really tricky to figure out a minimal set of things to log when stage 2 starts, as you don't know which executor it's going to run on).

But I'm still not sure what we'd do with those extra values, and if there is any value in capturing them.

(some of this design discussion probably belongs on jira -- I'll also go reread the design doc now)


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180110683
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -772,6 +772,12 @@ private[spark] class Executor(
 val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
 val curGCTime = computeTotalGcTime()
 
+// get executor level memory metrics
+val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
+  ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
--- End diff --

I also think it's totally fine to not have every metric possible now, but if this one is easy to add here, it would be nice.  In particular I'm thinking we'd also like to capture the memory associated with python if it's a pyspark app, though that is significantly more complicated so we don't need to do that now.


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179978448
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -772,6 +772,12 @@ private[spark] class Executor(
 val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
 val curGCTime = computeTotalGcTime()
 
+// get executor level memory metrics
+val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
+  ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
--- End diff --

It would be useful to have more information about offheap memory usage. 


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179978186
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// list of events and if they should be logged
+val events = Array(
+  (SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None), true),
+  (createExecutorAddedEvent(1), true),
+  (createExecutorAddedEvent(2), true),
+  (createStageSubmittedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), 
true), // onheap storage
+  (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), 
true), // JVM used
+  (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), 
true), // onheap unified
+  (createStageSubmittedEvent(1), true),
+  (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createStageCompletedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), 
true), // onheap execution
+  (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), 
true), // offheap execution
+  (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), 
true), // offheap storage
+  (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), 
false),
+  (createStageCompletedEvent(1), true),
+  (SparkListenerApplicationEnd(1000L), true))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+for ((event, included) <- events) {
+  listenerBus.post(event)
+}
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events
+val logData = EventLoggingListener.openEventLog(new 
Path(eventLogger.logPath), fileSystem)
+try {
+  val lines = readLines(logData)
+  val logStart = SparkListenerLogStart(SPARK_VERSION)
+  assert(lines.size === 19)
+  assert(lines(0).contains("SparkListenerLogStart"))
+  assert(lines(1).contains("SparkListenerApplicationStart"))
+  assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+  var i = 1
+  for ((event, included) <- events) {
+if (included) {
+  checkEvent(lines(i), event)
+  i += 1
+}
+  }
+} finally {
+  logData.close()
+}
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
+SparkListenerStageSubmitted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create a stage completed event for the specified stage Id. */
+  private def createStageCompletedEvent(stageId: Int) =
+SparkListenerStageCompleted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorAddedEvent(executorId: Int) =
+SparkListenerExecutorAdded(0L, executorId.toString, new 
ExecutorInfo("host1", 1, Map.empty))
+
+  /** Create an executor metrics update 

[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179978222
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// list of events and if they should be logged
+val events = Array(
+  (SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None), true),
+  (createExecutorAddedEvent(1), true),
+  (createExecutorAddedEvent(2), true),
+  (createStageSubmittedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), 
true), // onheap storage
+  (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), 
true), // JVM used
+  (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), 
true), // onheap unified
+  (createStageSubmittedEvent(1), true),
+  (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createStageCompletedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), 
true), // onheap execution
+  (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), 
true), // offheap execution
+  (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), 
true), // offheap storage
+  (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), 
false),
+  (createStageCompletedEvent(1), true),
+  (SparkListenerApplicationEnd(1000L), true))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+for ((event, included) <- events) {
+  listenerBus.post(event)
+}
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events
+val logData = EventLoggingListener.openEventLog(new 
Path(eventLogger.logPath), fileSystem)
+try {
+  val lines = readLines(logData)
+  val logStart = SparkListenerLogStart(SPARK_VERSION)
+  assert(lines.size === 19)
+  assert(lines(0).contains("SparkListenerLogStart"))
+  assert(lines(1).contains("SparkListenerApplicationStart"))
+  assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+  var i = 1
+  for ((event, included) <- events) {
+if (included) {
+  checkEvent(lines(i), event)
+  i += 1
+}
+  }
+} finally {
+  logData.close()
+}
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
+SparkListenerStageSubmitted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create a stage completed event for the specified stage Id. */
+  private def createStageCompletedEvent(stageId: Int) =
+SparkListenerStageCompleted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorAddedEvent(executorId: Int) =
+SparkListenerExecutorAdded(0L, executorId.toString, new 
ExecutorInfo("host1", 1, Map.empty))
+
+  /** Create an executor metrics update 

[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179978192
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// list of events and if they should be logged
+val events = Array(
+  (SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None), true),
+  (createExecutorAddedEvent(1), true),
+  (createExecutorAddedEvent(2), true),
+  (createStageSubmittedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), 
true), // onheap storage
+  (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), 
true), // JVM used
+  (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), 
true), // onheap unified
+  (createStageSubmittedEvent(1), true),
+  (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createStageCompletedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), 
true), // onheap execution
+  (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), 
true), // offheap execution
+  (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), 
true), // offheap storage
+  (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), 
false),
+  (createStageCompletedEvent(1), true),
+  (SparkListenerApplicationEnd(1000L), true))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+for ((event, included) <- events) {
+  listenerBus.post(event)
+}
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events
+val logData = EventLoggingListener.openEventLog(new 
Path(eventLogger.logPath), fileSystem)
+try {
+  val lines = readLines(logData)
+  val logStart = SparkListenerLogStart(SPARK_VERSION)
+  assert(lines.size === 19)
+  assert(lines(0).contains("SparkListenerLogStart"))
+  assert(lines(1).contains("SparkListenerApplicationStart"))
+  assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+  var i = 1
+  for ((event, included) <- events) {
+if (included) {
+  checkEvent(lines(i), event)
+  i += 1
+}
+  }
+} finally {
+  logData.close()
+}
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
--- End diff --

Done.


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179978182
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -268,6 +268,9 @@ private class LiveExecutor(val executorId: String, 
_addTime: Long) extends LiveE
 
   def hasMemoryInfo: Boolean = totalOnHeap >= 0L
 
+  // peak values for executor level metrics
+  var peakExecutorMetrics = new PeakExecutorMetrics
--- End diff --

Yes, thanks for catching this, it should be a val.

This is more properly part of SPARK-23431, and I can remove it for now.


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179978102
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

For a longer running stage, once it ramps up, hopefully there wouldn't be a lot of new peak values. Looking at a subset of our applications, the extra logging overhead has mostly been between 0.25% and 1%, but it can reach 8%.

By logging each peak value at the time it occurs (and reinitializing when a stage starts), it's possible to tell which stages are active at the time, and it would potentially be possible to graph these changes on a timeline -- this information wouldn't be available if the metrics are only logged at stage end, since the times would be lost.

Logging at stage end would limit the amount of extra logging. If we add more metrics (such as for offheap), then there could be more new peaks and more extra logging with the current approach. Excess logging is a concern, and I can move to stage end if the overhead is too much.


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179802392
  
--- Diff: core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
---
@@ -654,6 +681,25 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
 assert(metrics1.bytesRead === metrics2.bytesRead)
   }
 
+  private def assertEquals(metrics1: Option[ExecutorMetrics], metrics2: 
Option[ExecutorMetrics]) {
+metrics1 match {
+  case Some(m1) =>
+metrics2 match {
+  case Some(m2) =>
+assert(m1.timestamp === m2.timestamp)
+assert(m1.jvmUsedMemory === m2.jvmUsedMemory)
+assert(m1.onHeapExecutionMemory === m2.onHeapExecutionMemory)
+assert(m1.offHeapExecutionMemory === m2.offHeapExecutionMemory)
+assert(m1.onHeapStorageMemory === m2.onHeapStorageMemory)
+assert(m1.offHeapStorageMemory === m2.offHeapStorageMemory)
+  case None =>
+assert(false)
+}
+  case None =>
+assert(metrics2.isEmpty)
--- End diff --

this version looks correct, but I think the matching I mentioned above is a little cleaner.  And then you should be able to have `EventLoggingListenerSuite` just use this method rather than repeating it.
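
For illustration, the match-based version might look like this (a sketch, assuming the ExecutorMetrics fields from this patch):

    private def assertEquals(metrics1: Option[ExecutorMetrics], metrics2: Option[ExecutorMetrics]) {
      (metrics1, metrics2) match {
        case (Some(m1), Some(m2)) =>
          assert(m1.timestamp === m2.timestamp)
          assert(m1.jvmUsedMemory === m2.jvmUsedMemory)
          assert(m1.onHeapExecutionMemory === m2.onHeapExecutionMemory)
          assert(m1.offHeapExecutionMemory === m2.offHeapExecutionMemory)
          assert(m1.onHeapStorageMemory === m2.onHeapStorageMemory)
          assert(m1.offHeapStorageMemory === m2.offHeapStorageMemory)
        case (None, None) => // both empty: equal
        case _ => assert(false, "one side is missing ExecutorMetrics")
      }
    }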


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179795171
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

I wouldn't think you'd want to log for every new peak, as I'd expect it 
would be natural for the peak to keep growing, so you'd just end up with a lot 
of logs.  I'd expect you'd just log the peak when the stage ended, or when the 
executor died.

the downside of that approach is that you never log a peak if the driver 
dies ... but then you've got to figure out the driver issue anyway.


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179797803
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// list of events and if they should be logged
+val events = Array(
+  (SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None), true),
+  (createExecutorAddedEvent(1), true),
+  (createExecutorAddedEvent(2), true),
+  (createStageSubmittedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), 
true), // onheap storage
+  (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), 
true), // JVM used
+  (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), 
true), // onheap unified
+  (createStageSubmittedEvent(1), true),
+  (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createStageCompletedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), 
true), // onheap execution
+  (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), 
true), // offheap execution
+  (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), 
true), // offheap storage
+  (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), 
false),
+  (createStageCompletedEvent(1), true),
+  (SparkListenerApplicationEnd(1000L), true))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+for ((event, included) <- events) {
+  listenerBus.post(event)
+}
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events
+val logData = EventLoggingListener.openEventLog(new 
Path(eventLogger.logPath), fileSystem)
+try {
+  val lines = readLines(logData)
+  val logStart = SparkListenerLogStart(SPARK_VERSION)
+  assert(lines.size === 19)
+  assert(lines(0).contains("SparkListenerLogStart"))
+  assert(lines(1).contains("SparkListenerApplicationStart"))
+  assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+  var i = 1
+  for ((event, included) <- events) {
+if (included) {
+  checkEvent(lines(i), event)
+  i += 1
+}
+  }
+} finally {
+  logData.close()
+}
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
--- End diff --

nit: multiline methods should be enclosed in `{}`, here and elsewhere, even if the body is only one line


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179796802
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -268,6 +268,9 @@ private class LiveExecutor(val executorId: String, 
_addTime: Long) extends LiveE
 
   def hasMemoryInfo: Boolean = totalOnHeap >= 0L
 
+  // peak values for executor level metrics
+  var peakExecutorMetrics = new PeakExecutorMetrics
--- End diff --

I think this can be a `val` -- it's mutated, not replaced.

Also, I don't see any tests that this is updated correctly, both in a live app and when replayed from the limited logs.  OTOH, maybe you should just remove this from this change and wait till SPARK-23431, as there you'll need to add something else to expose this for each executor / stage, which may be more important than just the overall total for the executor.


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179796239
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -93,6 +94,9 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, 
appAttemptId, compressionCodecName)
 
+  // Peak metric values for each executor
+  private var peakExecutorMetrics = new mutable.HashMap[String, 
PeakExecutorMetrics]()
--- End diff --

you need to handle overlapping stages.  I think you actually need to key on 
both executor and stage, and on stage end, you only clear the metrics for that 
stage.

EDIT: ok after I went through everything, I think I see how this works -- 
since you log on every new peak, you'll also get a logged message for the 
earlier update.  But as I mention below, this strategy seems like it'll result 
in a lot of extra logging.  Maybe I'm wrong, though -- it would be great to see how much the logs grow this way.
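
For illustration, keying on both might look like this (PeakExecutorMetrics is from the patch; the stage-end cleanup is a sketch, not the final code):

    // One entry per (executorId, stageId): overlapping stages keep
    // independent peaks, and stage end clears only that stage's entries.
    private val peakExecutorMetrics =
      new mutable.HashMap[(String, Int), PeakExecutorMetrics]()

    override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
      peakExecutorMetrics.retain { case ((_, stageId), _) =>
        stageId != event.stageInfo.stageId
      }
      logEvent(event, flushLogger = true)
    }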


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179803848
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -772,6 +772,12 @@ private[spark] class Executor(
 val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
 val curGCTime = computeTotalGcTime()
 
+// get executor level memory metrics
+val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
+  ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
--- End diff --

What about including the JVM's direct & memory mapped usage as well? See https://issues.apache.org/jira/browse/SPARK-22483


---




[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179797706
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// list of events and if they should be logged
+val events = Array(
+  (SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None), true),
+  (createExecutorAddedEvent(1), true),
+  (createExecutorAddedEvent(2), true),
+  (createStageSubmittedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), 
true), // onheap storage
+  (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), 
true), // JVM used
+  (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), 
true), // onheap unified
+  (createStageSubmittedEvent(1), true),
+  (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createStageCompletedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), 
true), // onheap execution
+  (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), 
true), // offheap execution
+  (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), 
true), // offheap storage
+  (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), 
false),
+  (createStageCompletedEvent(1), true),
+  (SparkListenerApplicationEnd(1000L), true))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+for ((event, included) <- events) {
+  listenerBus.post(event)
+}
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events
+val logData = EventLoggingListener.openEventLog(new 
Path(eventLogger.logPath), fileSystem)
+try {
+  val lines = readLines(logData)
+  val logStart = SparkListenerLogStart(SPARK_VERSION)
+  assert(lines.size === 19)
+  assert(lines(0).contains("SparkListenerLogStart"))
+  assert(lines(1).contains("SparkListenerApplicationStart"))
+  assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+  var i = 1
+  for ((event, included) <- events) {
+if (included) {
+  checkEvent(lines(i), event)
+  i += 1
+}
+  }
+} finally {
+  logData.close()
+}
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
+SparkListenerStageSubmitted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create a stage completed event for the specified stage Id. */
+  private def createStageCompletedEvent(stageId: Int) =
+SparkListenerStageCompleted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorAddedEvent(executorId: Int) =
+SparkListenerExecutorAdded(0L, executorId.toString, new 
ExecutorInfo("host1", 1, Map.empty))
+
+  /** Create an executor metrics update 

[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179801207
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// list of events and if they should be logged
+val events = Array(
+  (SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None), true),
+  (createExecutorAddedEvent(1), true),
+  (createExecutorAddedEvent(2), true),
+  (createStageSubmittedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), 
true), // onheap storage
+  (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), 
true), // JVM used
+  (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), 
true), // onheap unified
+  (createStageSubmittedEvent(1), true),
+  (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createStageCompletedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), 
true), // onheap execution
+  (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), 
true), // offheap execution
+  (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), 
true), // offheap storage
+  (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), 
false),
+  (createStageCompletedEvent(1), true),
+  (SparkListenerApplicationEnd(1000L), true))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+for ((event, included) <- events) {
+  listenerBus.post(event)
+}
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events
+val logData = EventLoggingListener.openEventLog(new 
Path(eventLogger.logPath), fileSystem)
+try {
+  val lines = readLines(logData)
+  val logStart = SparkListenerLogStart(SPARK_VERSION)
+  assert(lines.size === 19)
+  assert(lines(0).contains("SparkListenerLogStart"))
+  assert(lines(1).contains("SparkListenerApplicationStart"))
+  assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+  var i = 1
+  for ((event, included) <- events) {
+if (included) {
+  checkEvent(lines(i), event)
+  i += 1
+}
+  }
+} finally {
+  logData.close()
+}
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
+SparkListenerStageSubmitted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create a stage completed event for the specified stage Id. */
+  private def createStageCompletedEvent(stageId: Int) =
+SparkListenerStageCompleted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorAddedEvent(executorId: Int) =
+SparkListenerExecutorAdded(0L, executorId.toString, new 
ExecutorInfo("host1", 1, Map.empty))
+
+  /** Create an executor metrics update 

[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-03-29 Thread edwinalu
GitHub user edwinalu opened a pull request:

https://github.com/apache/spark/pull/20940

[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in 
executors REST API

Add new executor level memory metrics (JVM used memory, on/off heap 
execution memory, on/off heap storage memory), and expose via the executors 
REST API. This information will help provide insight into how executor and 
driver JVM memory is used, and for the different memory regions. It can be used 
to help determine good values for spark.executor.memory, spark.driver.memory, 
spark.memory.fraction, and spark.memory.storageFraction.

## What changes were proposed in this pull request?

An ExecutorMetrics class is added, with jvmUsedMemory, 
onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and 
offHeapStorageMemory. The new ExecutorMetrics is sent by executors to the 
driver as part of Heartbeat. A heartbeat is added for the driver as well, to 
collect these metrics for the driver.

The EventLoggingListener logs ExecutorMetricsUpdate events if there is a 
new peak value for any of the memory metrics for an executor and stage. Only 
the ExecutorMetrics will be logged, and not the TaskMetrics, to minimize 
additional logging.

The AppStatusListener records the peak values for each memory metric.

The new memory metrics are added to the executors REST API.

## How was this patch tested?

New unit tests have been added. This was also tested on our cluster.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/edwinalu/spark SPARK-23429

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20940.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20940


commit 48eab1d23fcebb315bbe787bed8e468c7fc69233
Author: Edwina Lu 
Date:   2018-03-09T23:39:36Z

SPARK-23429: Add executor memory metrics to heartbeat and expose in 
executors REST API

Add new executor level memory metrics (JVM used memory, on/off heap 
execution memory, on/off heap storage
memory), and expose via the executors REST API. This information will help 
provide insight into how executor
and driver JVM memory is used, and for the different memory regions. It can 
be used to help determine good
values for spark.executor.memory, spark.driver.memory, 
spark.memory.fraction, and spark.memory.storageFraction.

Add an ExecutorMetrics class, with jvmUsedMemory, onHeapExecutionMemory, 
offHeapExecutionMemory,
onHeapStorageMemory, and offHeapStorageMemory. The new ExecutorMetrics will 
be sent by executors to the
driver as part of Heartbeat. A heartbeat will be added for the driver as 
well, to collect these metrics
for the driver.

Modify the EventLoggingListener to log ExecutorMetricsUpdate events if 
there is a new peak value for any
of the memory metrics for an executor and stage. Only the ExecutorMetrics 
will be logged, and not the
TaskMetrics, to minimize additional logging.

Modify the AppStatusListener to record the peak values for each memory 
metric.

Add the new memory metrics to the executors REST API.




---
