[GitHub] spark issue #22884: [SPARK-23429][CORE][FOLLOWUP] MetricGetter should rename...

2018-10-30 Thread edwinalu
Github user edwinalu commented on the issue:

https://github.com/apache/spark/pull/22884
  
Thanks, lgtm.


---




[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-09 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r223905848
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -28,35 +30,63 @@ import org.apache.spark.metrics.ExecutorMetricType
 @DeveloperApi
 class ExecutorMetrics private[spark] extends Serializable {
 
-  // Metrics are indexed by MetricGetter.values
-  private val metrics = new Array[Long](ExecutorMetricType.values.length)
+  private var metrics = mutable.Map.empty[String, Long]
--- End diff --

There's a fair amount of overhead in storing values in a Map -- both the data
structure itself and the String keys. All of this would then be sent in the heartbeat,
so it would add overhead to the heartbeats. The array representation is a lot
more compact. Unfortunately there's no EnumMap in Scala. I'd prefer to keep the
array storage, and use a map only for returning and passing in values.
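
To make the trade-off concrete, here is a minimal sketch (illustrative only, not the
PR's code) of array-backed storage where a name-keyed map is built only at the API
boundary; MetricType and metricTypes below are hypothetical stand-ins for
ExecutorMetricType and ExecutorMetricType.values:

object MetricStorageSketch {
  case class MetricType(name: String)

  // stand-in for ExecutorMetricType.values
  val metricTypes: IndexedSeq[MetricType] =
    IndexedSeq(MetricType("JVMHeapMemory"), MetricType("JVMOffHeapMemory"))
  private val indexOf: Map[MetricType, Int] = metricTypes.zipWithIndex.toMap

  class Metrics {
    // compact representation sent in heartbeats: one Long per metric
    private val values = new Array[Long](metricTypes.length)

    def get(t: MetricType): Long = values(indexOf(t))

    // map views are built only when callers read or supply values by name
    def toMap: Map[String, Long] =
      metricTypes.map(t => t.name -> values(indexOf(t))).toMap

    def setFromMap(m: Map[String, Long]): Unit =
      metricTypes.foreach(t => values(indexOf(t)) = m.getOrElse(t.name, 0L))
  }
}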


---




[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-09 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r223766597
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+    jvmVmemTotal: Long,
+    jvmRSSTotal: Long,
+    pythonVmemTotal: Long,
+    pythonRSSTotal: Long,
+    otherVmemTotal: Long,
+    otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+  private val ptree = mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+  private var latestJVMVmemTotal = 0L
+  private var latestJVMRSSTotal = 0L
+  private var latestPythonVmemTotal = 0L
+  private var latestPythonRSSTotal = 0L
+  private var latestOtherVmemTotal = 0L
+  private var latestOtherRSSTotal = 0L
+
+  computeProcessTree()
+
+  private def isProcfsAvailable: Boolean = {
+    if (testing) {
+      return true
+    }
+    try {
+      if (!Files.exists(Paths.get(procfsDir))) {
+        return false
+      }
+    }
+    catch {
+      case f: FileNotFoundException => return false
+    }
+    val shouldLogStageExecutorMetrics =
+      SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+    val shouldLogStageExecutorProcessTreeMetrics =
+      SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+    shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics
+  }
+
+  private def computePid(): Int = {
+    if (!isAvailable || testing) {
+      return -1;
+    }
+    try {
+      // This can be simplified in java9:
+      // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+      val cmd = Array("bash", "-c", "echo $PPID")
+      val length = 10
+      val out2 = Utils.executeAndGetOutput(cmd)
+      val pid = Integer.parseInt(out2.split("\n")(0))
+      return pid;
+    }
+    catch {
+      case e: SparkException => logDebug("IO Exception when trying to compute process tree." +
+        " As a result reporting of ProcessTree metrics is stopped", e)
+        isAvailable = false
+        return -1
+    }
+  }
+
+  private def computePageSize(): Long = {
+    if (testing) {
+      return 0;
+    }
+    val cmd = Array("getconf", "PAGESIZE")
+    val out2 = Utils.executeAndGetOutput(cmd)
+    return Integer.parseInt(out2.split("\n")(0))
+  }
+
+  private def computeProcessTree(): Unit = {
+    if (!isAvailable || testing) {
+      return
+    }
+    val queue = mutable.Queue.empty[Int]
+    queue += pid
+    while( !queue.isEmpty ) {
+      val p = queue.dequeue()
+      val c = getChildPids(p)
+      if(!c.isEmpty) {
+        queue ++= c
+        ptree += (p -> c.toSet)
 

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r223467439
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+    jvmVmemTotal: Long,
+    jvmRSSTotal: Long,
+    pythonVmemTotal: Long,
+    pythonRSSTotal: Long,
+    otherVmemTotal: Long,
+    otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+  private val ptree = mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+  private var latestJVMVmemTotal = 0L
+  private var latestJVMRSSTotal = 0L
+  private var latestPythonVmemTotal = 0L
+  private var latestPythonRSSTotal = 0L
+  private var latestOtherVmemTotal = 0L
+  private var latestOtherRSSTotal = 0L
+
+  computeProcessTree()
+
+  private def isProcfsAvailable: Boolean = {
+    if (testing) {
+      return true
+    }
+    try {
+      if (!Files.exists(Paths.get(procfsDir))) {
+        return false
+      }
+    }
+    catch {
+      case f: FileNotFoundException => return false
+    }
+    val shouldLogStageExecutorMetrics =
+      SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+    val shouldLogStageExecutorProcessTreeMetrics =
+      SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+    shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics
+  }
+
+  private def computePid(): Int = {
+    if (!isAvailable || testing) {
+      return -1;
+    }
+    try {
+      // This can be simplified in java9:
+      // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+      val cmd = Array("bash", "-c", "echo $PPID")
+      val length = 10
+      val out2 = Utils.executeAndGetOutput(cmd)
+      val pid = Integer.parseInt(out2.split("\n")(0))
+      return pid;
+    }
+    catch {
+      case e: SparkException => logDebug("IO Exception when trying to compute process tree." +
+        " As a result reporting of ProcessTree metrics is stopped", e)
+        isAvailable = false
+        return -1
+    }
+  }
+
+  private def computePageSize(): Long = {
+    if (testing) {
+      return 0;
+    }
+    val cmd = Array("getconf", "PAGESIZE")
+    val out2 = Utils.executeAndGetOutput(cmd)
+    return Integer.parseInt(out2.split("\n")(0))
+  }
+
+  private def computeProcessTree(): Unit = {
+    if (!isAvailable || testing) {
+      return
+    }
+    val queue = mutable.Queue.empty[Int]
+    queue += pid
+    while( !queue.isEmpty ) {
+      val p = queue.dequeue()
+      val c = getChildPids(p)
+      if(!c.isEmpty) {
+        queue ++= c
+        ptree += (p -> c.toSet)
 

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r223466260
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -59,6 +60,43 @@ case object JVMOffHeapMemory extends ExecutorMetricType {
   }
 }
 
+case object ProcessTreeJVMRSSMemory extends ExecutorMetricType {
+  override private[spark] def getMetricValue(memoryManager: 
MemoryManager): Long = {
+ExecutorMetricType.pTreeInfo.updateAllMetrics()
--- End diff --

Sorry for the delayed response. Thanks for adding the total memory metrics
-- these will be very useful. Agreed that doing the work once is better, but
having ProcessTreeJVMRSSMemory.getMetricValue() update all the metrics is
confusing, especially if a user at some point wants to call getMetricValue()
for one of the other metrics rather than ProcessTreeJVMRSSMemory.

@squito 's #1 is probably the easiest change to make with the existing code.
However, #2 with @mccheah's suggestion to return a Map sounds best/cleanest as
an API, together with @dhruve 's suggestion to consolidate into
ProcessTreeMemory -- I prefer this approach as well.

Right now getMetricValue is called in Heartbeater.getCurrentMetrics(), which
maps ExecutorMetricType.values to the array of actual values. Translating the
returned maps to an array (with the index mapping to a name rather than an
ExecutorMetricType) will involve some more code. In retrospect, getting the
current metrics is probably better done by ExecutorMetrics itself, rather than
having Heartbeater exposed to the implementation details -- would you be able
to move the logic there?
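
Roughly, the direction being suggested (a hedged sketch under assumed names, not
the final implementation): a single consolidated process-tree getter does the
/proc walk once and returns all of its values as a name-keyed Map, and
ExecutorMetrics -- not Heartbeater -- flattens the maps into the array
representation:

object MetricGroupSketch {
  trait MetricGroup {
    def names: Seq[String]
    def compute(): Map[String, Long]
  }

  // one consolidated process-tree getter instead of one getter per value
  object ProcessTreeMemory extends MetricGroup {
    val names = Seq("ProcessTreeJVMVMemory", "ProcessTreeJVMRSSMemory",
      "ProcessTreePythonVMemory", "ProcessTreePythonRSSMemory")
    def compute(): Map[String, Long] = {
      // a single /proc traversal would populate these; zeros are placeholders
      names.map(_ -> 0L).toMap
    }
  }

  // assembling the heartbeat array from the returned maps, done by
  // ExecutorMetrics itself rather than by Heartbeater
  def currentMetricValues(groups: Seq[MetricGroup], order: Seq[String]): Array[Long] = {
    val all = groups.flatMap(_.compute()).toMap
    order.map(name => all.getOrElse(name, 0L)).toArray
  }
}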


---




[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-09-07 Thread edwinalu
Github user edwinalu commented on the issue:

https://github.com/apache/spark/pull/21221
  
Thanks!


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-16 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r210691276
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -69,6 +69,11 @@ package object config {
 .bytesConf(ByteUnit.KiB)
 .createWithDefaultString("100k")
 
+  private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
+ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

That would be safer. I'll change it to false, and we can change it to true
after people have had a chance to test it out.
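
For reference, a sketch of the config entry with the safer default (the flag name is
taken from the diff above; the final wiring may differ slightly):

  private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
    ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
      .booleanConf
      .createWithDefault(false)  // opt-in until the feature has seen wider testing

Users who want the stage-level executor metrics in the event log would then opt in
with --conf spark.eventLog.logStageExecutorMetrics.enabled=true.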


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-16 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r210690505
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -216,8 +217,7 @@ private[spark] class Executor(
 
   def stop(): Unit = {
 env.metricsSystem.report()
-heartbeater.shutdown()
-heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+heartbeater.stop()
--- End diff --

Added.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209772320
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
--- End diff --

The rest of the file is using System.nanoTime() -- it seems more consistent
to keep it the same. Clock has getTimeMillis(), although we could always
multiply by 1,000,000 to convert to nanoseconds; not sure if the precision would matter.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209771443
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
--- End diff --

No, these don't need to be explicitly defined.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209770605
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
+  private[spark] val name = 
getClass().getName().stripSuffix("$").split("""\.""").last
+}
+
+private[spark] abstract class MemoryManagerExecutorMetricType(
+f: MemoryManager => Long) extends ExecutorMetricType {
+  override private[spark] def getMetricValue(memoryManager: 
MemoryManager): Long = {
+f(memoryManager)
+  }
+}
+
+private[spark]abstract class MBeanExecutorMetricType(mBeanName: String)
--- End diff --

Added.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209770476
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
+
+  // the first element is initialized to -1, indicating that the values 
for the array
+  // haven't been set yet.
+  metrics(0) = -1
+
+  /** Returns the value for the specified metricType. */
+  def getMetricValue(metricType: ExecutorMetricType): Long = {
+metrics(ExecutorMetricType.metricIdxMap(metricType))
+  }
+
+  /** Returns true if the values for the metrics have been set, false 
otherwise. */
+  def isSet(): Boolean = metrics(0) > -1
+
+  private[spark] def this(metrics: Array[Long]) {
+this()
+Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, 
this.metrics.size))
+  }
+
+  /**
+   * Constructor: create the ExecutorMetrics with the values specified.
+   *
+   * @param executorMetrics map of executor metric name to value
+   */
+  private[spark] def this(executorMetrics: Map[String, Long]) {
+this()
+(0 until ExecutorMetricType.values.length).foreach { idx =>
+  metrics(idx) = 
executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+}
+  }
+
+  /**
+   * Compare the specified executor metrics values with the current 
executor metric values,
+   * and update the value for any metrics where the new value for the 
metric is larger.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  private[spark] def compareAndUpdatePeakValues(executorMetrics: 
ExecutorMetrics): Boolean = {
+var updated: Boolean = false
--- End diff --

Removed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209770404
  
--- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
---
@@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
 onHeapStorageMemoryPool.memoryUsed + 
offHeapStorageMemoryPool.memoryUsed
   }
 
+  /**
+   *  On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = 
onHeapExecutionMemoryPool.memoryUsed
--- End diff --

That's true -- I'll change the methods to synchronized.
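
A hedged sketch of that change (the on-heap pool name comes from the diff above; the
off-heap counterpart is assumed by analogy, and this is illustrative rather than the
merged code):

abstract class MemoryManagerSketch {
  protected trait MemoryPool { def memoryUsed: Long }
  protected def onHeapExecutionMemoryPool: MemoryPool
  protected def offHeapExecutionMemoryPool: MemoryPool

  /** On heap execution memory currently in use, in bytes. */
  final def onHeapExecutionMemoryUsed: Long = synchronized {
    onHeapExecutionMemoryPool.memoryUsed
  }

  /** Off heap execution memory currently in use, in bytes. */
  final def offHeapExecutionMemoryUsed: Long = synchronized {
    offHeapExecutionMemoryPool.memoryUsed
  }
}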


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209770440
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
+
+  // the first element is initialized to -1, indicating that the values 
for the array
+  // haven't been set yet.
+  metrics(0) = -1
+
+  /** Returns the value for the specified metricType. */
+  def getMetricValue(metricType: ExecutorMetricType): Long = {
+metrics(ExecutorMetricType.metricIdxMap(metricType))
+  }
+
+  /** Returns true if the values for the metrics have been set, false 
otherwise. */
+  def isSet(): Boolean = metrics(0) > -1
+
+  private[spark] def this(metrics: Array[Long]) {
+this()
+Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, 
this.metrics.size))
+  }
+
+  /**
+   * Constructor: create the ExecutorMetrics with the values specified.
+   *
+   * @param executorMetrics map of executor metric name to value
+   */
+  private[spark] def this(executorMetrics: Map[String, Long]) {
+this()
+(0 until ExecutorMetricType.values.length).foreach { idx =>
+  metrics(idx) = 
executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+}
+  }
+
+  /**
+   * Compare the specified executor metrics values with the current 
executor metric values,
+   * and update the value for any metrics where the new value for the 
metric is larger.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  private[spark] def compareAndUpdatePeakValues(executorMetrics: 
ExecutorMetrics): Boolean = {
+var updated: Boolean = false
+
+(0 until ExecutorMetricType.values.length).foreach { idx =>
+   if ( executorMetrics.metrics(idx) > metrics(idx)) {
--- End diff --

Fixed.
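
A minimal sketch of the loop after the fix (spacing and condition cleaned up; the real
method lives in ExecutorMetrics, so this just shows the corrected shape):

  private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = {
    var updated = false
    (0 until ExecutorMetricType.values.length).foreach { idx =>
      if (executorMetrics.metrics(idx) > metrics(idx)) {
        updated = true
        metrics(idx) = executorMetrics.metrics(idx)
      }
    }
    updated
  }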


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723205
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
--- End diff --

Is it likely that users would want to access the individual fields, rather
than iterating through all of them? The first option would be a bit nicer if so.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723188
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -296,7 +338,7 @@ private[spark] object EventLoggingListener extends 
Logging {
   private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
 
   // A cache for compression codecs to avoid creating the same codec many 
times
-  private val codecMap = new mutable.HashMap[String, CompressionCodec]
+  private val codecMap = new HashMap[String, CompressionCodec]
--- End diff --

Changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723177
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +103,50 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+  extends JsonDeserializer[Option[ExecutorMetrics]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): 
Option[ExecutorMetrics] = {
+val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+  new TypeReference[Option[Map[String, java.lang.Long]]] {})
+metricsMap match {
+  case Some(metrics) =>
+Some(new ExecutorMetrics(metrics))
+  case None => None
--- End diff --

Changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723173
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +103,50 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+  extends JsonDeserializer[Option[ExecutorMetrics]] {
--- End diff --

Done.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723165
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,34 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics,
+// while reading from the log. SparkListenerStageExecutorMetrics are 
only processed
+// when reading logs.
+liveExecutors.get(executorMetrics.execId)
+  .orElse(deadExecutors.get(executorMetrics.execId)) match {
+  case Some(exec) =>
+ if 
(exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics))
 {
+  maybeUpdate(exec, now)
--- End diff --

Yes, this is called on replay. I've removed the "maybeUpdate".


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723141
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,214 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test stage executor metrics logging functionality. This checks that 
peak
+   * values from SparkListenerExecutorMetricsUpdate events during a stage 
are
+   * logged in a StageExecutorMetrics event for each executor at stage 
completion.
+   */
+  private def testStageExecutorMetricsEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "stageExecutorMetrics-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected StageExecutorMetrics, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerStageExecutorMetrics] =
--- End diff --

Moved to after the replay.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723114
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala ---
@@ -217,7 +218,12 @@ class ReplayListenerSuite extends SparkFunSuite with 
BeforeAndAfter with LocalSp
 
 // Verify the same events are replayed in the same order
 assert(sc.eventLogger.isDefined)
-val originalEvents = sc.eventLogger.get.loggedEvents
+val originalEvents = sc.eventLogger.get.loggedEvents.filter { e =>
+  JsonProtocol.sparkEventFromJson(e) match {
--- End diff --

Changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723098
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -691,7 +723,19 @@ private[spark] object JsonProtocol {
 (json \ "Accumulator 
Updates").extract[List[JValue]].map(accumulableInfoFromJson)
   (taskId, stageId, stageAttemptId, updates)
 }
-SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates)
+val executorUpdates = jsonOption(json \ "Executor Metrics Updated") 
match {
--- End diff --

Changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722892
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +103,50 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+  extends JsonDeserializer[Option[ExecutorMetrics]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): 
Option[ExecutorMetrics] = {
+val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+  new TypeReference[Option[Map[String, java.lang.Long]]] {})
+metricsMap match {
+  case Some(metrics) =>
+Some(new ExecutorMetrics(metrics))
+  case None => None
+}
+  }
+}
+/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with 
metric name as key */
+private[spark] class ExecutorMetricsJsonSerializer
+  extends JsonSerializer[Option[ExecutorMetrics]] {
--- End diff --

It doesn't serialize -- nothing is added to the JSON.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722887
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +103,50 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+  extends JsonDeserializer[Option[ExecutorMetrics]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): 
Option[ExecutorMetrics] = {
+val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+  new TypeReference[Option[Map[String, java.lang.Long]]] {})
+metricsMap match {
+  case Some(metrics) =>
+Some(new ExecutorMetrics(metrics))
+  case None => None
+}
+  }
+}
+/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with 
metric name as key */
+private[spark] class ExecutorMetricsJsonSerializer
+  extends JsonSerializer[Option[ExecutorMetrics]] {
+  override def serialize(
+  metrics: Option[ExecutorMetrics],
+  jsonGenerator: JsonGenerator,
+  serializerProvider: SerializerProvider): Unit = {
+metrics match {
+  case Some(m) =>
--- End diff --

Changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722865
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -302,10 +305,10 @@ private class LiveExecutor(val executorId: String, 
_addTime: Long) extends LiveE
   Option(removeReason),
   executorLogs,
   memoryMetrics,
-  blacklistedInStages)
+  blacklistedInStages,
+  if (peakExecutorMetrics.isSet()) Some(peakExecutorMetrics) else None)
--- End diff --

Changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722773
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -155,7 +160,14 @@ private[spark] class EventLoggingListener(
   }
 
   // Events that do not trigger a flush
-  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= logEvent(event)
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
+logEvent(event)
+if (shouldLogStageExecutorMetrics) {
+  // record the peak metrics for the new stage
+  liveStageExecutorMetrics.put((event.stageInfo.stageId, 
event.stageInfo.attemptNumber()),
+new HashMap[String, ExecutorMetrics]())
--- End diff --

Modified.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722724
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -93,6 +95,9 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, 
appAttemptId, compressionCodecName)
 
+  // map of (stageId, stageAttempt), to peak executor metrics for the stage
+  private val liveStageExecutorMetrics = HashMap[(Int, Int), 
HashMap[String, ExecutorMetrics]]()
--- End diff --

Yes, mutable Maps will work, and it's better to go with the more generic version.
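
A sketch of the more generic declaration being agreed on (written as the field would
appear in EventLoggingListener; illustrative only):

  import scala.collection.mutable

  // map of (stageId, stageAttemptId) to peak executor metrics for the stage
  private val liveStageExecutorMetrics =
    mutable.Map.empty[(Int, Int), mutable.Map[String, ExecutorMetrics]]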


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722674
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
--- End diff --

Let's stick with the current version for now, and revisit if we end up 
adding more metric types.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722621
  
--- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
---
@@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
 onHeapStorageMemoryPool.memoryUsed + 
offHeapStorageMemoryPool.memoryUsed
   }
 
+  /**
+   *  On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = 
onHeapExecutionMemoryPool.memoryUsed
--- End diff --

Since it's accessing just one Long field, I don't think so. The other methods,
which are synchronized, are summing two fields.


---




[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-08-02 Thread edwinalu
Github user edwinalu commented on the issue:

https://github.com/apache/spark/pull/21221
  
@mccheah and @squito , thanks for reviewing and commenting, and sorry for 
the delay. I'll reply and update this weekend.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-25 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r205095575
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,215 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test stage executor metrics logging functionality. This checks that 
peak
+   * values from SparkListenerExecutorMetricsUpdate events during a stage 
are
+   * logged in a StageExecutorMetrics event for each executor at stage 
completion.
+   */
+  private def testStageExecutorMetricsEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "stageExecutorMetrics-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected StageExecutorMetrics, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerStageExecutorMetrics] =
+  Map(
+((0, "1"),
+  new SparkListenerStageExecutorMetrics("1", 0, 0,
+  Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))),
+((0, "2"),
+  new SparkListenerStageExecutorMetrics("2", 0, 0,
+  Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))),
+((1, "1"),
+  new SparkListenerStageExecutorMetrics("1", 1, 0,
+  Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))),
+((1, "2"),
+  new SparkListenerStageExecutorMetrics("2", 1, 0,
+  Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L
+
+// Events to post.
+val events = Array(
+  SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None),
+  createExecutorAddedEvent(1),
+  createExecutorAddedEvent(2),
+  createStageSubmittedEvent(0),
+  // receive 3 metric updates from each executor with just stage 0 
running,
+  // with different peak updates for each executor
+  createExecutorMetricsUpdateEvent(1,
+  Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)),
+  createExecutorMetricsUpdateEvent(2,
+  Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)),
+  // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6
+  createExecutorMetricsUpdateEvent(1,
+  Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)),
+  // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6
+  createExecutorMetricsUpdateEvent(2,
+  Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)),
+  // exec 1: new stage 0 peaks for metrics at indexes: 5, 7
+  createExecutorMetricsUpdateEvent(1,
+  Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)),
+  // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8
+  createExecutorMetricsUpdateEvent(2,
+  Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)),
+  // now start stage 1, one more metric update for each executor, and 
new
+  // peaks for some stage 1 metrics (as listed), initialize stage 1 
peaks
+  createStageSubmittedEvent(1),
+  // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7
--- End diff --

Stage 0 is still running, and these are new peaks for that stage. It is 
also initializing all the stage 1 metric values, since these are the first 
executor metrics seen for stage 1 (I'll add this to the comments).


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-18 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r203503691
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

This sounds like a good solution, with both a clean API, and also an 
efficient implementation that will be easier to add new metrics to. Thanks!
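
Roughly the shape being converged on (a hedged sketch, not necessarily the merged
code): the event carries an ExecutorMetrics object, and callers read values through
getMetricValue instead of indexing into a raw Array[Long]:

  @DeveloperApi
  case class SparkListenerStageExecutorMetrics(
      execId: String,
      stageId: Int,
      stageAttemptId: Int,
      executorMetrics: ExecutorMetrics)
    extends SparkListenerEvent

  // e.g. in a listener, no knowledge of the array layout is needed:
  //   val peakOffHeap = event.executorMetrics.getMetricValue(JVMOffHeapMemory)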


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-17 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r203122722
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

Adding getMetricValue() would abstract away the array implementation. 
Should MetricGetter/ExecutorMetricType be public?


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-07 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r200826235
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

We can change back to using an ExecutorMetrics class in this case.

The plan was for any new metrics to be added to the end, so that there 
wouldn't be any change in ordering, and executorMetrics could be changed to 
immutable Seq[Long], but there would still be the issue of having to reference 
MetricGetter to find out how the metrics are indexed. 


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-28 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198815695
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.status.api.v1.PeakMemoryMetrics
+
+/**
+ * Records the peak values for executor level metrics. If 
jvmUsedHeapMemory is -1, then no
+ * values have been recorded yet.
+ */
+private[spark] class PeakExecutorMetrics {
--- End diff --

With ExecutorMetrics removed, it seems useful to have a class for tracking 
and setting peak metric values, that can be used by both EventLoggingListener 
and AppStatusListener.
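
A minimal sketch of the kind of peak-tracking helper being described, shareable by
EventLoggingListener and AppStatusListener (the class and member names here are
illustrative, not the PR's code):

  private[spark] class PeakMetricsTracker(numMetrics: Int) {
    private val peaks = Array.fill[Long](numMetrics)(0L)
    private var anySet = false

    /** Record new values, keeping the per-index maximum; returns true if any peak changed. */
    def update(values: Array[Long]): Boolean = {
      var updated = false
      (0 until math.min(values.length, peaks.length)).foreach { i =>
        if (!anySet || values(i) > peaks(i)) {
          peaks(i) = values(i)
          updated = true
        }
      }
      anySet = true
      updated
    }

    /** Snapshot of the current peaks, e.g. for writing a stage-end event. */
    def current: Array[Long] = peaks.clone()
  }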


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198684121
  
--- Diff: project/MimaExcludes.scala ---
@@ -89,7 +89,13 @@ object MimaExcludes {
 
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"),
 
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.getValidationIndicatorCol"),
 
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.org$apache$spark$ml$param$shared$HasValidationIndicatorCol$_setter_$validationIndicatorCol_="),
-
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol")
+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"),
+
+// [SPARK-23429][CORE] Add executor memory metrics to heartbeat and 
expose in executors REST API
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.apply"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.this"),
+
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate$")
--- End diff --

Will move up.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198683846
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,217 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
--- End diff --

Whoops, that was left over from when it was ExecutorMetricsUpdated.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198683408
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +102,48 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert to array ordered by metric 
name */
+class PeakMemoryMetricsDeserializer private[spark] extends 
JsonDeserializer[Option[Array[Long]]] {
--- End diff --

This is odd, but I can't seem to comment on your earlier comment. Regarding 
having a serializer/deserializer, I also don't have strong feelings -- it makes 
it more readable, but also takes up more space in the history log.

Regarding this comment, thanks, I hadn't realized that the placement meant 
it marked the constructor. It's meant for the class, and I'll move it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198682917
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -264,6 +282,11 @@ private[spark] trait SparkListenerInterface {
*/
   def onExecutorMetricsUpdate(executorMetricsUpdate: 
SparkListenerExecutorMetricsUpdate): Unit
 
+  /**
+   * Called when the driver reads stage executor metrics from the history 
log.
--- End diff --

Updated.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198682980
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,29 @@ private[spark] class AppStatusListener(
 }
   }
 }
+event.executorUpdates.foreach { updates: Array[Long] =>
+  // check if there is a new peak value for any of the executor level 
memory metrics
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdate(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics
--- End diff --

Unfortunately, yes. I've added some comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198682809
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/MetricGetter.scala 
---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+sealed trait MetricGetter {
--- End diff --

Added.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198682884
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +181,28 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each live 
executor,
+  // whether or not the executor is running tasks for the stage
+  val executorMap = liveStageExecutorMetrics.remove(
+(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+  executorMap.foreach {
+   executorEntry => {
+  for ((executorId, peakExecutorMetrics) <- executorEntry) {
--- End diff --

Yes, the naming is confusing. Changed to the 1st option.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198682779
  
--- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryManager
+import org.apache.spark.metrics.MetricGetter
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Creates a heartbeat thread which will call the specified 
reportHeartbeat function at
+ * intervals of intervalMs.
+ *
+ * @param memoryManager the memory manager for execution and storage 
memory.
+ * @param reportHeartbeat the heartbeat reporting function to call.
+ * @param name the thread name for the heartbeater.
+ * @param intervalMs the interval between heartbeats.
+ */
+private[spark] class Heartbeater(
+memoryManager: MemoryManager,
+reportHeartbeat: () => Unit,
+name: String,
+intervalMs: Long) extends Logging {
+  // Executor for the heartbeat task
+  private val heartbeater = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(name)
+
+  /** Schedules a task to report a heartbeat. */
+  private[spark] def start(): Unit = {
--- End diff --

Removed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198682787
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1922,6 +1928,12 @@ class SparkContext(config: SparkConf) extends 
Logging {
 Utils.tryLogNonFatalError {
   _eventLogger.foreach(_.stop())
 }
+if(_heartbeater != null) {
--- End diff --

Added.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-18 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r196236364
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each live 
executor,
+  // whether or not the executor is running tasks for the stage
+  val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+  val executorMap = liveStageExecutorMetrics.remove(
+(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+  executorMap.foreach {
+   executorEntry => {
+  for ((executorId, peakExecutorMetrics) <- executorEntry) {
+val executorMetrics = new ExecutorMetrics(-1, 
peakExecutorMetrics.metrics)
--- End diff --

Just discussed with @squito -- thanks! Logging -1 for timestamp is 
confusing and hacky. 

Some items discussed:

For ExecutorMetrics, timestamp can be optional, or it can be removed 
completely and replaced by Array[Long], with comments explaining how the 
metrics work.

For logging, stage ID could be added as part of the executorMetrics in 
SparkListenerExecutorMetricsUpdate, but this is awkward, since that information 
isn't used as part of the Heartbeat and is only needed for logging. While the 
application is running, there could be multiple stages running when the metrics 
are gathered, so specifying a single stage ID doesn't make sense. In the log, 
however, the metrics are the peak values for a particular stage, so they are 
associated with a stage.

Another option is to add the information to SparkListenerStageCompleted, 
but this would bloat the event if there are many executors.

A third option is to create a new event, SparkListenerStageExecutorMetrics, 
which would have the executor ID, stage ID and attempt, and peak metrics. 

I'll give the 3rd option a try, and will add details to the design doc once 
this is more finalized.
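
For reference, a sketch of what the third option could look like, following 
the SparkListenerStageExecutorMetrics case class shown in the diff further up 
(the event trait is stubbed here just to keep the sketch self-contained):

    // stand-in for org.apache.spark.scheduler.SparkListenerEvent
    trait SparkListenerEvent

    case class SparkListenerStageExecutorMetrics(
        execId: String,
        stageId: Int,
        stageAttemptId: Int,
        executorMetrics: Array[Long])  // indexed by MetricGetter.values
      extends SparkListenerEvent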


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-17 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195957438
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each live 
executor,
+  // whether or not the executor is running tasks for the stage
+  val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+  val executorMap = liveStageExecutorMetrics.remove(
+(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+  executorMap.foreach {
+   executorEntry => {
+  for ((executorId, peakExecutorMetrics) <- executorEntry) {
+val executorMetrics = new ExecutorMetrics(-1, 
peakExecutorMetrics.metrics)
--- End diff --

The last timestamp seems like it wouldn't have enough information, since 
peaks for different metrics could occur at different times, and with different 
combinations of stages running. 

Only -1 would be logged. Right now it's writing out 
SparkListenerExecutorMetricsUpdate events, which contain ExecutorMetrics, which 
has timestamp. Do you think timestamp should be removed from ExecutorMetrics? 
It seems good to have the timestamp for when the metrics were gathered, but 
it's not being exposed at this point.

For both the history server and the live UI, the goal is to show the peak 
value for each metric each executor. For the executors tab, this is the peak 
value of each metric over the lifetime of the executor. For the stages tab, 
this is the peak value for each metric for that executor while the stage is 
running. The executor could be processing tasks for other stages as well, if 
there are concurrent stages, or no tasks for this stage if it isn't assigned 
any tasks, but it is the peak values between the time the stage starts and ends.

Can you describe how the stage level metrics would work with the last timestamp 
for any peak metric? Would there be a check to see if the event is being read 
from the history log?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-15 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195892287
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,222 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected ExecutorMetricsUpdate, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerExecutorMetricsUpdate] =
--- End diff --

I'll add a check.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-15 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195892271
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +101,53 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert to array ordered by metric 
name */
+class PeakMemoryMetricsDeserializer extends 
JsonDeserializer[Option[Array[Long]]] {
--- End diff --

Yes, changed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-15 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195892263
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +101,53 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert to array ordered by metric 
name */
+class PeakMemoryMetricsDeserializer extends 
JsonDeserializer[Option[Array[Long]]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): Option[Array[Long]] 
= {
+val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, 
Object]]])
--- End diff --

That works and is much cleaner, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-15 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195892278
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,222 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected ExecutorMetricsUpdate, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerExecutorMetricsUpdate] =
+  Map(
+((0, "1"),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(-1L,
+  Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 
20L,
+((0, "2"),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(-1L,
+  Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L,
+((1, "1"),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(-1L,
+  Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L,
+((1, "2"),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(-1L,
+  Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L)
+
+// Events to post.
+val events = Array(
+  SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None),
+  createExecutorAddedEvent(1),
+  createExecutorAddedEvent(2),
+  createStageSubmittedEvent(0),
+  createExecutorMetricsUpdateEvent(1,
--- End diff --

Adding comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-15 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195886625
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each live 
executor,
+  // whether or not the executor is running tasks for the stage
+  val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+  val executorMap = liveStageExecutorMetrics.remove(
+(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+  executorMap.foreach {
+   executorEntry => {
+  for ((executorId, peakExecutorMetrics) <- executorEntry) {
+val executorMetrics = new ExecutorMetrics(-1, 
peakExecutorMetrics.metrics)
--- End diff --

This is confusing, especially since the current code does not have the 
stage level metrics, just executor level. 

The -1 wouldn't be replaced. PeakExecutorMetrics only tracks the peak 
metric values for each executor (and later each executor per stage) and doesn't 
have a timestamp.

If there is a local max (500), which is the max between T3 and T5, it would 
be logged at time T5, even if it happens at T3.5.

In actual event order, what the driver sees when the application is running:
T1: start of stage 1
T2: value of 1000 for metric m1
T3: start of stage 2
T3.5: peak value of 500 for metric m1
T4: stage 1 ends
T5: stage 2 ends

Suppose that 1000 (seen at T2) is the peak value of m1 between T1 and T4, 
so it is the peak value seen while stage 1 is running. The m1=1000 value will 
be dumped as the max value in an executorMetricsUpdate event right before the 
stage 1 end event is logged. Also suppose that 500 (seen at T3.5) is the peak 
value of m1 between T3 and T5, so it is the peak value seen while stage 2 is 
running. The m1=500 value will be dumped as the max value in an 
executorMetricsUpdate right before the stage 2 end event is logged.

The generated Spark history log would have the following order of events:

Start of stage 1
Start of stage 2
executorMetricsUpdate with m1=1000
end of stage 1
executorMetricsUpdate with m1=500
end of stage 2

When the Spark history server is reading the log, it will create the 
peakExecutorMetrics for stage 2 when stage 2 starts, which is before it sees 
the executorMetricsUpdate with m1=1000, and so will store m1=1000 as the 
current peak value. When it later sees the executorMetricsUpdate with m1=500, 
it needs to overwrite the m1 value (and set to 500), not compare and update to 
the max value (which would be 1000). The -1 would indicate that the event is 
coming from the Spark history log, is a peak value for the stage just about to 
complete, and should overwrite any previous values.

If the timestamp is set to a positive value, then it will do a compare and 
update to the max value instead.
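
A compact sketch of that replay decision (illustrative names, not the actual 
code): a -1 timestamp marks a stage-end record read from the history log, so 
the stored stage peaks are overwritten wholesale; any other timestamp is a 
live update and keeps the per-metric maximum.

    object PeakReplay {
      def applyUpdate(stagePeaks: Array[Long], update: Array[Long], timestamp: Long): Unit = {
        if (timestamp == -1L) {
          // stage-end record from the history log: replace all stored values
          Array.copy(update, 0, stagePeaks, 0, stagePeaks.length)
        } else {
          // live update: keep the per-metric maximum
          for (i <- stagePeaks.indices) {
            stagePeaks(i) = math.max(stagePeaks(i), update(i))
          }
        }
      }
    }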


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195543999
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +101,53 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert to array ordered by metric 
name */
+class PeakMemoryMetricsDeserializer extends 
JsonDeserializer[Option[Array[Long]]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): Option[Array[Long]] 
= {
+val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, 
Object]]])
--- End diff --

Let me know if there's something else to try -- I'd also rather avoid the 
Map[String, Object].


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195543711
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +101,53 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert to array ordered by metric 
name */
+class PeakMemoryMetricsDeserializer extends 
JsonDeserializer[Option[Array[Long]]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): Option[Array[Long]] 
= {
+val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, 
Object]]])
--- End diff --

Nope, it's still reading it as an Int, even with java.lang.Long, otherwise 
that would have been cleaner.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195542491
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -304,6 +305,11 @@ class SparkContext(config: SparkConf) extends Logging {
 _dagScheduler = ds
   }
 
+  private[spark] def heartbeater: Heartbeater = _heartbeater
+  private[spark] def heartbeater_=(hb: Heartbeater): Unit = {
--- End diff --

These aren't used -- I'll remove.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195542018
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +101,53 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert to array ordered by metric 
name */
+class PeakMemoryMetricsDeserializer extends 
JsonDeserializer[Option[Array[Long]]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): Option[Array[Long]] 
= {
+val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, 
Object]]])
+metricsMap match {
+  case Some(metrics) =>
+Some(MetricGetter.values.map { m =>
+  metrics.getOrElse (m.name, 0L) match {
+case intVal: Int => intVal.toLong
+case longVal: Long => longVal
+  }
+}.toArray)
+  case None => None
+}
+  }
+}
+
+/** serializer for peakMemoryMetrics: convert array to map with metric 
name as key */
+class PeakMemoryMetricsSerializer extends 
JsonSerializer[Option[Array[Long]]] {
+  override def serialize(
+  metrics: Option[Array[Long]],
+  jsonGenerator: JsonGenerator,
+  serializerProvider: SerializerProvider): Unit = {
+metrics match {
+  case Some(m) =>
+val metricsMap = (0 until MetricGetter.values.length).map { idx =>
--- End diff --

It's still being used in JsonProtocol.executorMetricsToJson -- let me know 
if you'd like me to convert that to use values instead.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195541136
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +272,18 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
+if (shouldLogExecutorMetricsUpdates) {
+  // For the active stages, record any new peak values for the memory 
metrics for the executor
+  event.executorUpdates.foreach { executorUpdates =>
+liveStageExecutorMetrics.values.foreach { peakExecutorMetrics =>
+  val peakMetrics = peakExecutorMetrics.getOrElseUpdate(
+event.execId, new PeakExecutorMetrics())
+  peakMetrics.compareAndUpdate(executorUpdates)
--- End diff --

What would be the right timestamp? Peaks for different metrics could have 
different timestamps.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195539837
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
--- End diff --

Tracking task start and end would be some amount of overhead. If it's a 
relatively unlikely corner case, and unlikely to have much impact on the 
numbers, it may be better to leave as is. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195538848
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each live 
executor,
+  // whether or not the executor is running tasks for the stage
+  val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+  val executorMap = liveStageExecutorMetrics.remove(
+(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+  executorMap.foreach {
+   executorEntry => {
+  for ((executorId, peakExecutorMetrics) <- executorEntry) {
+val executorMetrics = new ExecutorMetrics(-1, 
peakExecutorMetrics.metrics)
--- End diff --

We need to pass in a value for timestamp, but there isn't really one for 
the peak metrics, since times for each peak could be different. 

When processing, -1 will help indicate that the event is coming from the 
history log, and contains the peak values for the stage that is just ending. 
When updating the stage executor peaks (peak executor values stored for each 
active stage), we can replace all of the peak executor metric values instead of 
updating with the max of current and new values for each metric. 

As an example, suppose there is the following scenario:
T1: start of stage 1
T2: peak value of 1000 for metric m1
T3: start of stage 2
T4: stage 1 ends, and peak metric values for stage 1 are logged, including 
m1=1000
T5: stage 2 ends, and peak metric values for stage 2 are logged.

If values for m1 are < 1000 between T3 (start of stage 2) and T5 (end of 
stage 2), and say that the highest value for m1 during that period is 500, then 
we want the peak value for m1 for stage 2 to show as 500.

There would be an ExecutorMetricsUpdate event logged (and then read) at T4 
(end of stage 1), with m1=1000, which is after T3 (start of stage 2). If when 
reading the history log, we set the stage 2 peakExecutorMetrics to the max of 
current or new values from ExecutorMetricsUpdate, then the value for stage 2 
would remain at 1000. However, we want it to be replaced by the value of 500 
when it gets the ExecutorMetricsUpdate logged at T5 (end of stage 2). During 
processing of ExecutorMetricsUpdate, for the stage level metrics, it will 
replace all the peakExecutorMetrics if timestamp is -1.

I can add some comments for this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195534314
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -93,6 +96,9 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, 
appAttemptId, compressionCodecName)
 
+  // map of live stages, to peak executor metrics for the stage
+  private val liveStageExecutorMetrics = HashMap[(Int, Int), 
HashMap[String, PeakExecutorMetrics]]()
--- End diff --

modified.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195533972
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1751,7 +1753,7 @@ class DAGScheduler(
 messageScheduler.shutdownNow()
 eventProcessLoop.stop()
 taskScheduler.stop()
-  }
+   }
--- End diff --

fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-06-13 Thread edwinalu
Github user edwinalu commented on the issue:

https://github.com/apache/spark/pull/21221
  
@squito , I've replaced PeakMemoryMetrics with just an Array[Long], and 
added a custom serializer and deserializer for it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-06-11 Thread edwinalu
Github user edwinalu commented on the issue:

https://github.com/apache/spark/pull/21221
  
@squito For PeakMemoryMetrics in api.scala, changing to the array gives 
REST API output of:

  "peakMemoryMetrics" : {
"metrics" : [ 755008624, 100519936, 0, 0, 47962185, 0, 47962185, 0, 
98230, 0 ]
  }

instead of:

  "peakMemoryMetrics" : {
"jvmUsedHeapMemory" : 629553808,
"jvmUsedNonHeapMemory" : 205304696,
"onHeapExecutionMemory" : 0,
"offHeapExecutionMemory" : 0,
"onHeapStorageMemory" : 905801,
"offHeapStorageMemory" : 0,
"onHeapUnifiedMemory" : 905801,
"offHeapUnifiedMemory" : 0,
"directMemory" : 397602,
"mappedMemory" : 0
  }

Would it be OK to revert to the original version of PeakMemoryMetrics, 
where each field is listed as a separate element? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-06-10 Thread edwinalu
Github user edwinalu commented on the issue:

https://github.com/apache/spark/pull/21221
  
@squito , I'm modifying ExecutorMetrics to take in the metrics array -- 
this will be easier for tests where we pass in set values, and seems fine for 
the actual code. It will check that the length of the passed in array is the 
same as MetricGetter.values.length. Let me know if you have any concerns.

@felixcheung , I'll finish the current changes, then rebase. 
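
As a small sketch of the length check described above (assumed, shortened 
names; the real class and metric list live in the Spark codebase):

    object MetricGetterSketch {
      // stand-in for MetricGetter.values; the real list is longer
      val values: IndexedSeq[String] = IndexedSeq("JVMHeapMemory", "JVMOffHeapMemory")
    }

    class ExecutorMetricsSketch(val metrics: Array[Long]) {
      require(metrics.length == MetricGetterSketch.values.length,
        s"expected ${MetricGetterSketch.values.length} metric values, got ${metrics.length}")
    }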


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-29 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r191494940
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.status.api.v1.PeakMemoryMetrics
+
+/**
+ * Records the peak values for executor level metrics. If 
jvmUsedHeapMemory is -1, then no
+ * values have been recorded yet.
+ */
+private[spark] class PeakExecutorMetrics {
+  private var _jvmUsedHeapMemory = -1L;
+  private var _jvmUsedNonHeapMemory = 0L;
+  private var _onHeapExecutionMemory = 0L
+  private var _offHeapExecutionMemory = 0L
+  private var _onHeapStorageMemory = 0L
+  private var _offHeapStorageMemory = 0L
+  private var _onHeapUnifiedMemory = 0L
+  private var _offHeapUnifiedMemory = 0L
+  private var _directMemory = 0L
+  private var _mappedMemory = 0L
+
+  def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory
+
+  def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory
+
+  def onHeapExecutionMemory: Long = _onHeapExecutionMemory
+
+  def offHeapExecutionMemory: Long = _offHeapExecutionMemory
+
+  def onHeapStorageMemory: Long = _onHeapStorageMemory
+
+  def offHeapStorageMemory: Long = _offHeapStorageMemory
+
+  def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory
+
+  def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory
+
+  def directMemory: Long = _directMemory
+
+  def mappedMemory: Long = _mappedMemory
+
+  /**
+   * Compare the specified memory values with the saved peak executor 
memory
+   * values, and update if there is a new peak value.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = {
+var updated: Boolean = false
+
+if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) {
+  _jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory
+  updated = true
+}
+if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) {
+  _jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory
+  updated = true
+}
+if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) {
+  _onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory
+  updated = true
+}
+if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) {
+  _offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory
+  updated = true
+}
+if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) {
+  _onHeapStorageMemory = executorMetrics.onHeapStorageMemory
+  updated = true
+}
+if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) {
+  _offHeapStorageMemory = executorMetrics.offHeapStorageMemory
--- End diff --

Will do. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-25 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r191012697
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.status.api.v1.PeakMemoryMetrics
+
+/**
+ * Records the peak values for executor level metrics. If 
jvmUsedHeapMemory is -1, then no
+ * values have been recorded yet.
+ */
+private[spark] class PeakExecutorMetrics {
+  private var _jvmUsedHeapMemory = -1L;
+  private var _jvmUsedNonHeapMemory = 0L;
+  private var _onHeapExecutionMemory = 0L
+  private var _offHeapExecutionMemory = 0L
+  private var _onHeapStorageMemory = 0L
+  private var _offHeapStorageMemory = 0L
+  private var _onHeapUnifiedMemory = 0L
+  private var _offHeapUnifiedMemory = 0L
+  private var _directMemory = 0L
+  private var _mappedMemory = 0L
+
+  def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory
+
+  def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory
+
+  def onHeapExecutionMemory: Long = _onHeapExecutionMemory
+
+  def offHeapExecutionMemory: Long = _offHeapExecutionMemory
+
+  def onHeapStorageMemory: Long = _onHeapStorageMemory
+
+  def offHeapStorageMemory: Long = _offHeapStorageMemory
+
+  def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory
+
+  def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory
+
+  def directMemory: Long = _directMemory
+
+  def mappedMemory: Long = _mappedMemory
+
+  /**
+   * Compare the specified memory values with the saved peak executor 
memory
+   * values, and update if there is a new peak value.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = {
+var updated: Boolean = false
+
+if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) {
+  _jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory
+  updated = true
+}
+if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) {
+  _jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory
+  updated = true
+}
+if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) {
+  _onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory
+  updated = true
+}
+if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) {
+  _offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory
+  updated = true
+}
+if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) {
+  _onHeapStorageMemory = executorMetrics.onHeapStorageMemory
+  updated = true
+}
+if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) {
+  _offHeapStorageMemory = executorMetrics.offHeapStorageMemory
--- End diff --

Thanks! This is cleaner, and will make it easier to add new metrics. It is 
very easy to have a copy/paste error. I can merge and make the test changes -- 
let me know if that sounds good, or if you'd like to make some more changes 
first. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-25 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r191012183
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected ExecutorMetricsUpdate, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerExecutorMetricsUpdate] =
+  Map(
+((0, "1"),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 
30L, 70L, 20L))),
+((0, "2"),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 
30L, 80L, 40L))),
+((1, "1"),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 
55L, 50L, 0L))),
+((1, "2"),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 
60L, 40L, 40L
+
+// Events to post.
+val events = Array(
+  SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None),
+  createExecutorAddedEvent(1),
+  createExecutorAddedEvent(2),
+  createStageSubmittedEvent(0),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 
70L, 20L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 
70L, 0L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 
70L, 20L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 
70L, 0L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 
50L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 
80L, 0L)),
+  createStageSubmittedEvent(1),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 
50L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 
10L, 40L)),
+  createStageCompletedEvent(0),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 
30L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 
40L, 20L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 
30L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 
0L, 20L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 
20L, 0L)),
+  createExecutorRemovedEvent(1),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 
0L, 0L)),
+  createStageCompletedEvent(1),
+  SparkListenerApplicationEnd(1000L))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+events.foreach(event => listenerBus.post(event))
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events.
+// Posted events should be logged, except for ExecutorMetricsUpdate 
events -- these
+// are consolidated, 

[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-25 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r191011834
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected ExecutorMetricsUpdate, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerExecutorMetricsUpdate] =
+  Map(
+((0, "1"),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 
30L, 70L, 20L))),
+((0, "2"),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 
30L, 80L, 40L))),
+((1, "1"),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 
55L, 50L, 0L))),
+((1, "2"),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 
60L, 40L, 40L
+
+// Events to post.
+val events = Array(
+  SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None),
+  createExecutorAddedEvent(1),
+  createExecutorAddedEvent(2),
+  createStageSubmittedEvent(0),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 
70L, 20L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 
70L, 0L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 
70L, 20L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 
70L, 0L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 
50L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 
80L, 0L)),
+  createStageSubmittedEvent(1),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 
50L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 
10L, 40L)),
+  createStageCompletedEvent(0),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 
30L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 
40L, 20L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 
30L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 
0L, 20L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 
20L, 0L)),
+  createExecutorRemovedEvent(1),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 
0L, 0L)),
+  createStageCompletedEvent(1),
+  SparkListenerApplicationEnd(1000L))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+events.foreach(event => listenerBus.post(event))
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events.
+// Posted events should be logged, except for ExecutorMetricsUpdate 
events -- these
+// are consolidated, 

[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-25 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r191008473
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected ExecutorMetricsUpdate, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerExecutorMetricsUpdate] =
+  Map(
+((0, "1"),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 
30L, 70L, 20L))),
+((0, "2"),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 
30L, 80L, 40L))),
+((1, "1"),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 
55L, 50L, 0L))),
+((1, "2"),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 
60L, 40L, 40L
+
+// Events to post.
+val events = Array(
+  SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None),
+  createExecutorAddedEvent(1),
+  createExecutorAddedEvent(2),
+  createStageSubmittedEvent(0),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 
70L, 20L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 
70L, 0L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 
70L, 20L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 
70L, 0L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 
50L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 
80L, 0L)),
+  createStageSubmittedEvent(1),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 
50L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 
10L, 40L)),
+  createStageCompletedEvent(0),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 
30L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 
40L, 20L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 
30L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 
0L, 20L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 
20L, 0L)),
+  createExecutorRemovedEvent(1),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 
0L, 0L)),
+  createStageCompletedEvent(1),
+  SparkListenerApplicationEnd(1000L))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+events.foreach(event => listenerBus.post(event))
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events.
+// Posted events should be logged, except for ExecutorMetricsUpdate 
events -- these
+// are consolidated, 

[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-25 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r191008115
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected ExecutorMetricsUpdate, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerExecutorMetricsUpdate] =
+  Map(
+((0, "1"),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 
30L, 70L, 20L))),
+((0, "2"),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 
30L, 80L, 40L))),
+((1, "1"),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 
55L, 50L, 0L))),
+((1, "2"),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 
60L, 40L, 40L
+
+// Events to post.
+val events = Array(
+  SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None),
+  createExecutorAddedEvent(1),
+  createExecutorAddedEvent(2),
+  createStageSubmittedEvent(0),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 
70L, 20L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 
70L, 0L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 
70L, 20L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 
70L, 0L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 
50L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 
80L, 0L)),
+  createStageSubmittedEvent(1),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 
50L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 
10L, 40L)),
+  createStageCompletedEvent(0),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 
30L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 
40L, 20L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 
30L, 0L)),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 
0L, 20L)),
+  createExecutorMetricsUpdateEvent(1,
+new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 
20L, 0L)),
+  createExecutorRemovedEvent(1),
+  createExecutorMetricsUpdateEvent(2,
+new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 
0L, 0L)),
+  createStageCompletedEvent(1),
+  SparkListenerApplicationEnd(1000L))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+events.foreach(event => listenerBus.post(event))
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events.
+// Posted events should be logged, except for ExecutorMetricsUpdate 
events -- these
+// are consolidated, 

[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-25 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r191007795
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -209,6 +210,16 @@ class DAGScheduler(
   private[scheduler] val eventProcessLoop = new 
DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
 
+  /** driver heartbeat for collecting metrics */
+  private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat, 
"driver-heartbeater",
--- End diff --

Moved.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-25 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190995781
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -800,26 +812,50 @@ private[spark] class Executor(
 }
 }
   }
-
-  /**
-   * Schedules a task to report heartbeat and partial metrics for active 
tasks to driver.
-   */
-  private def startDriverHeartbeater(): Unit = {
-val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", 
"10s")
-
-// Wait a random interval so the heartbeats don't end up in sync
-val initialDelay = intervalMs + (math.random * 
intervalMs).asInstanceOf[Int]
-
-val heartbeatTask = new Runnable() {
-  override def run(): Unit = 
Utils.logUncaughtExceptions(reportHeartBeat())
-}
-heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, 
intervalMs, TimeUnit.MILLISECONDS)
-  }
 }
 
 private[spark] object Executor {
   // This is reserved for internal use by components that need to read 
task properties before a
   // task is fully deserialized. When possible, the 
TaskContext.getLocalProperty call should be
   // used instead.
   val taskDeserializationProps: ThreadLocal[Properties] = new 
ThreadLocal[Properties]
+
+  val DIRECT_BUFFER_POOL_NAME = "direct"
+  val MAPPED_BUFFER_POOL_NAME = "mapped"
+
+  /** Get the BufferPoolMXBean for the specified buffer pool. */
+  def getBufferPool(pool: String): BufferPoolMXBean = {
+val name = new ObjectName("java.nio:type=BufferPool,name=" + pool)
+
ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer,
+  name.toString, classOf[BufferPoolMXBean])
+  }
+
+  /**
+   * Get the current executor level memory metrics.
+   *
+   * @param memoryManager the memory manager
+   * @param direct the direct memory buffer pool
+   * @param mapped the mapped memory buffer pool
+   * @return the executor memory metrics
+   */
+  def getCurrentExecutorMetrics(
+  memoryManager: MemoryManager,
+  direct: BufferPoolMXBean,
+  mapped: BufferPoolMXBean) : ExecutorMetrics = {
--- End diff --

Yes, and easier to share the code between driver and executor. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-25 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190990997
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +183,35 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each executor
--- End diff --

Yes, it's all running executors, and does not filter based on whether they have 
tasks for the stage. I've updated the comment.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-25 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190990593
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -93,6 +96,10 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, 
appAttemptId, compressionCodecName)
 
+  // map of live stages, to peak executor metrics for the stage
+  private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int),
+mutable.HashMap[String, PeakExecutorMetrics]]()
--- End diff --

Changed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-25 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190990562
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -81,7 +84,7 @@ private[spark] class EventLoggingListener(
   private val compressionCodecName = compressionCodec.map { c =>
 CompressionCodec.getShortName(c.getClass.getName)
   }
-
+logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + 
shouldLogExecutorMetricsUpdates)
--- End diff --

Removed. Thanks, I hadn't meant to push that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r188136532
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1753,9 +1766,21 @@ class DAGScheduler(
 messageScheduler.shutdownNow()
 eventProcessLoop.stop()
 taskScheduler.stop()
+heartbeater.stop()
+  }
+
+  /** Reports heartbeat metrics for the driver. */
+  private def reportHeartBeat(): Unit = {
--- End diff --

It's a bit redundant for fields that aren't used by the driver -- for the 
driver, execution memory gets set to 0.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r188136553
  
--- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Creates a heartbeat thread which will call the specified 
reportHeartbeat function at
+ * intervals of intervalMs.
+ *
+ * @param reportHeartbeat the heartbeat reporting function to call.
+ * @param intervalMs the interval between heartbeats.
+ */
+private[spark] class Heartbeater(reportHeartbeat: () => Unit, intervalMs: 
Long) {
+  // Executor for the heartbeat task
+  private val heartbeater = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
--- End diff --

Changed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-10 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187507940
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -93,6 +94,10 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, 
appAttemptId, compressionCodecName)
 
+  // map of live stages, to peak executor metrics for the stage
+  private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int),
--- End diff --

This is tracking peak metric values for executors for each stage, so that 
the peak values for the stage can be dumped at stage end. The purpose is to 
reduce the amount of logging, to at most (number of stages * number of 
executors) ExecutorMetricsUpdate events.

I originally tried logging whenever there was a new peak value, resetting when 
a new stage begins -- this is simpler, but can lead to more events being logged.

Having stage level information is useful for users trying to identify which 
stages are more memory intensive. This information could be useful when they are 
trying to reduce the amount of memory used, since they would know which stages 
(and the relevant code) to focus on.
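
To make the consolidation concrete, something along these lines (rough sketch, 
hypothetical names, not the actual PR code): peaks are kept per stage attempt 
and per executor, and flushed once when the stage completes, so at most one 
consolidated update per executor per stage ends up in the log.

    import scala.collection.mutable

    class StagePeakTracker(numMetrics: Int) {
      // (stageId, stageAttemptId) -> executorId -> peak value per metric
      private val liveStagePeaks =
        mutable.HashMap[(Int, Int), mutable.HashMap[String, Array[Long]]]()

      def onMetricsUpdate(stage: (Int, Int), executorId: String, metrics: Array[Long]): Unit = {
        val perExecutor = liveStagePeaks.getOrElseUpdate(stage, mutable.HashMap.empty)
        val peaks = perExecutor.getOrElseUpdate(executorId, Array.fill(numMetrics)(0L))
        for (i <- metrics.indices) {
          peaks(i) = math.max(peaks(i), metrics(i))
        }
      }

      // At stage end, return (and drop) the peaks so they can be written to the log.
      def onStageCompleted(stage: (Int, Int)): Map[String, Array[Long]] =
        liveStagePeaks.remove(stage).map(_.toMap).getOrElse(Map.empty)
    }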


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-10 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187507139
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.status.api.v1.PeakMemoryMetrics
+
+/**
+ * Records the peak values for executor level metrics. If 
jvmUsedHeapMemory is -1, then no
+ * values have been recorded yet.
+ */
+private[spark] class PeakExecutorMetrics {
--- End diff --

I got some errors when trying to add methods to ExecutorMetrics. I don't 
remember the details, but can try this again.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-10 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187506958
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +179,27 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+// log the peak executor metrics for the stage, for each executor
+val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+val executorMap = liveStageExecutorMetrics.remove(
--- End diff --

Yes, it's safer to clean up earlier attempts -- I can add some code to 
iterate through earlier attemptIDs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-10 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187506780
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +179,27 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+// log the peak executor metrics for the stage, for each executor
+val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+val executorMap = liveStageExecutorMetrics.remove(
+  (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+executorMap.foreach {
+  executorEntry => {
+for ((executorId, peakExecutorMetrics) <- executorEntry) {
--- End diff --

The for loop (line 187) is going through the hashmap entries of executorId 
to peakExecutorMetrics, so there are multiple values. Could you please provide 
more detail on how "case (executorId, peakExecutorMetrics) =>" would work? If 
the for loop is OK, then I can add some comments.
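
For reference, the two forms below are equivalent when iterating a Map: the 
`case` pattern simply destructures each (key, value) entry. Standalone 
illustration with made-up data, not the PR's code:

    object ForeachCaseExample {
      def main(args: Array[String]): Unit = {
        val peakByExecutor = Map("1" -> 100L, "2" -> 250L)

        // for-comprehension form
        for ((executorId, peak) <- peakByExecutor) {
          println(s"executor $executorId peak=$peak")
        }

        // equivalent pattern-matching form
        peakByExecutor.foreach { case (executorId, peak) =>
          println(s"executor $executorId peak=$peak")
        }
      }
    }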


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-10 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187501194
  
--- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Creates a heartbeat thread which will call the specified 
reportHeartbeat function at
+ * intervals of intervalMs.
+ *
+ * @param reportHeartbeat the heartbeat reporting function to call.
+ * @param intervalMs the interval between heartbeats.
+ */
+private[spark] class Heartbeater(reportHeartbeat: () => Unit, intervalMs: 
Long) {
+  // Executor for the heartbeat task
+  private val heartbeater = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
--- End diff --

Yes, this could be for the driver as well, so could be misleading. I can 
change to "heartbeater".


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-10 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187501157
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1753,9 +1766,21 @@ class DAGScheduler(
 messageScheduler.shutdownNow()
 eventProcessLoop.stop()
 taskScheduler.stop()
+heartbeater.stop()
+  }
+
+  /** Reports heartbeat metrics for the driver. */
+  private def reportHeartBeat(): Unit = {
--- End diff --

With cluster mode, including YARN, there isn't a local executor, so the 
metrics for the driver would not be collected. Perhaps this could be modified 
to skip this step for local mode.
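
One possible shape for that, as a rough sketch (the helper name is made up, 
not actual Spark code): gate the driver-side heartbeat on the master URL, 
since in local mode an in-process executor already reports metrics.

    object DriverHeartbeatGate {
      // Hypothetical helper: skip the driver heartbeat when running in local mode.
      def needsDriverHeartbeat(master: String): Boolean =
        !master.startsWith("local")

      def main(args: Array[String]): Unit = {
        Seq("local[4]", "yarn", "spark://host:7077").foreach { m =>
          println(s"$m -> needsDriverHeartbeat = ${needsDriverHeartbeat(m)}")
        }
      }
    }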


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-03 Thread edwinalu
Github user edwinalu closed the pull request at:

https://github.com/apache/spark/pull/20940


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-05-02 Thread edwinalu
Github user edwinalu commented on the issue:

https://github.com/apache/spark/pull/20940
  
@squito, it is really weird. I had some problems with the last sync with 
master, and it looks like some of the changes from master got added.

I've opened a new pull request: https://github.com/apache/spark/pull/21221


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-02 Thread edwinalu
GitHub user edwinalu opened a pull request:

https://github.com/apache/spark/pull/21221

[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in 
executors REST API

The original PR #20940 is messed up, and the diff shows changes not related 
to SPARK-23429. This is a cleaned-up version of that pull request.

Add new executor level memory metrics (JVM used memory, on/off heap 
execution memory, on/off heap storage memory, on/off heap unified memory, 
direct memory, and mapped memory), and expose via the executors REST API. This 
information will help provide insight into how executor and driver JVM memory 
is used, and for the different memory regions. It can be used to help determine 
good values for spark.executor.memory, spark.driver.memory, 
spark.memory.fraction, and spark.memory.storageFraction.

## What changes were proposed in this pull request?

An ExecutorMetrics class is added, with jvmUsedHeapMemory, 
jvmUsedNonHeapMemory, onHeapExecutionMemory, offHeapExecutionMemory, 
onHeapStorageMemory, offHeapStorageMemory, onHeapUnifiedMemory, 
offHeapUnifiedMemory, directMemory, and mappedMemory. The new ExecutorMetrics is 
sent by executors to the driver as part of the Heartbeat. A heartbeat is added 
for the driver as well, to collect these metrics for the driver.
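
As a rough sketch of the data carried per heartbeat, using the metric names 
listed above (illustrative only; the actual class layout in the PR may differ):

    // Sketch only: one executor-level memory snapshot per heartbeat.
    case class ExecutorMemorySnapshot(
        jvmUsedHeapMemory: Long,
        jvmUsedNonHeapMemory: Long,
        onHeapExecutionMemory: Long,
        offHeapExecutionMemory: Long,
        onHeapStorageMemory: Long,
        offHeapStorageMemory: Long,
        onHeapUnifiedMemory: Long,
        offHeapUnifiedMemory: Long,
        directMemory: Long,
        mappedMemory: Long)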

The EventLoggingListener stores information about the peak values for each 
metric, per active stage and executor. When a StageCompleted event is seen, an 
ExecutorMetricsUpdate event will be logged for each executor, with peak values 
for the stage. Only the ExecutorMetrics will be logged, and not the 
TaskMetrics, to minimize additional logging.

The AppStatusListener records the peak values for each memory metric.

The new memory metrics are added to the executors REST API.

## How was this patch tested?

New unit tests have been added. This was also tested on our cluster.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/edwinalu/spark SPARK-23429.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21221.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21221


commit c8e8abedbdfec6e92b0c63e90f3c2c5755fd8978
Author: Edwina Lu <edlu@...>
Date:   2018-03-09T23:39:36Z

SPARK-23429: Add executor memory metrics to heartbeat and expose in 
executors REST API

Add new executor level memory metrics (JVM used memory, on/off heap 
execution memory, on/off heap storage
memory), and expose via the executors REST API. This information will help 
provide insight into how executor
and driver JVM memory is used, and for the different memory regions. It can 
be used to help determine good
values for spark.executor.memory, spark.driver.memory, 
spark.memory.fraction, and spark.memory.storageFraction.

Add an ExecutorMetrics class, with jvmUsedMemory, onHeapExecutionMemory, 
offHeapExecutionMemory,
onHeapStorageMemory, and offHeapStorageMemory. The new ExecutorMetrics will 
be sent by executors to the
driver as part of Heartbeat. A heartbeat will be added for the driver as 
well, to collect these metrics
for the driver.

Modify the EventLoggingListener to log ExecutorMetricsUpdate events if 
there is a new peak value for any
of the memory metrics for an executor and stage. Only the ExecutorMetrics 
will be logged, and not the
TaskMetrics, to minimize additional logging.

Modify the AppStatusListener to record the peak values for each memory 
metric.

Add the new memory metrics to the executors REST API.

commit 5d6ae1c34bf6618754e4b8b2e756a9a7b4bad987
Author: Edwina Lu <edlu@...>
Date:   2018-04-02T02:13:41Z

modify MimaExcludes.scala to filter changes to 
SparkListenerExecutorMetricsUpdate

commit ad10d2814bbfbaf8c21fcbb1abe83ef7a8e9ffe7
Author: Edwina Lu <edlu@...>
Date:   2018-04-22T00:02:57Z

Address code review comments, change event logging to stage end.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-05-01 Thread edwinalu
Github user edwinalu commented on the issue:

https://github.com/apache/spark/pull/20940
  
@rezsafi, thanks for reviewing. It would be better if the heartbeat wasn't 
tied to sampling frequency. Most likely users would want to sample more 
frequently, although this would also mean more communication traffic between 
executors and driver.  I'm out of town right now, but am planning to reschedule 
the design discussion when I am back, hopefully in a couple of weeks, and will 
invite you.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-04-22 Thread edwinalu
Github user edwinalu commented on the issue:

https://github.com/apache/spark/pull/20940
  
Could a committer please request a retest? It looks like the tests passed 
(https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89685/testReport/),
 and the failure occurs after posting to github:

Attempting to post to Github...
[error] running /home/jenkins/workspace/SparkPullRequestBuilder/build/sbt 
-Phadoop-2.6 -Pkubernetes -Pflume -Phive-thriftserver -Pyarn -Pkafka-0-8 -Phive 
-Pkinesis-asl -Pmesos test ; process was terminated by signal 9
 > Post successful.
Build step 'Execute shell' marked build as failure
Archiving artifacts
Recording test results
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89685/
Test FAILed.
Finished: FAILURE


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-16 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r181943630
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

Thanks for your thoughts on this. The concerns are message size and also logging, 
but it is only an extra few longs per heartbeat, and similarly for logging. Task 
end would help with minimizing communication for longer running tasks. The 
heartbeats are only every 10 seconds, so perhaps not so bad.
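
As a rough back-of-the-envelope estimate (my own numbers, not measurements from 
the PR): ten extra long fields are 80 bytes per heartbeat before serialization 
overhead, so at the default 10 second heartbeat interval that is under 30 KB 
per executor per hour.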


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-15 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r181611071
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

ExecutorMetrics right now has: jvmUsedHeapMemory, jvmUsedNonHeapMemory, 
onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and 
offHeapStorageMemory. For logging at stage end, we can log the peak for each of 
these, but unified memory is more problematic. We could add new fields for on 
heap/off heap unified memory, but I'm inclined to remove unified memory (from 
all the places it is currently used), rather than add more fields. Users can 
still sum peak execution and peak storage values, which may be larger than the 
actual peak unified memory if they are not at peak values at the same time, but 
should still be a reasonable estimate for sizing.
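
For example (illustrative numbers): if a stage's peak on-heap execution memory 
is 2 GB and its peak on-heap storage memory is 3 GB, the 5 GB sum is an upper 
bound on the true peak on-heap unified memory, since the two individual peaks 
may not have occurred at the same time.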


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-10 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180446530
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

I will make the change to log at stage end, and will update the design doc.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180287725
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

The original analysis was focused on some test applications and larger 
applications. Looking at a broader sample of our applications, the logging 
overhead is higher, averaging about 14% per application, and 4% overall (sum of 
logging for ExecutorMetricsUpdate / sum of Spark history logs). The overhead 
for larger Spark history logs is mostly pretty small, but increases 
significantly for smaller ones (there's one at 49%).

There's often some logging before the first stage starts, which is extra 
overhead, especially for smaller applications/history logs, and doesn't contain 
useful information. The overhead can also be high in the case where the stage takes a 
long time to run and memory increases gradually rather than reaching the peak quickly 
-- logging at stage end would work better for this case. 

I should also note that these numbers are for the case where only 4 longs 
are recorded, and with more metrics, the overhead would be higher, both in the 
size of each logged event, and the number of potential peaks, since a new peak 
for any metric would be logged.

Since there will be more metrics added, and the cost is higher than 
originally estimated, logging at stage end could be a better choice. We would lose 
some information about overlapping stages, but this information wouldn't be 
visible anyway with the currently planned REST APIs or web UI, which just show 
the peaks for stages and executors. 

For logging at stage end, we can log an ExecutorMetricsUpdate event for 
each executor that has sent a heartbeat for the stage just before logging the 
stage end -- this would have the peak value for each metric. This should be the 
minimum amount of logging needed to have information about peak values per 
stage per executor. Alternatively, the information could be added to the 
StageCompleted event for more compaction, but the code would be more 
complicated, with 2 paths for reading in values. Logging an event per executor 
at stage end seems like a reasonable choice, not too much extra logging or too 
much extra complexity.

What are your thoughts?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180204845
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -772,6 +772,12 @@ private[spark] class Executor(
 val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
 val curGCTime = computeTotalGcTime()
 
+// get executor level memory metrics
+val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
+  ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
--- End diff --

Thanks, I will play around with it a bit. If it seems more complicated or 
expensive, I'll file a separate subtask.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180180795
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -772,6 +772,12 @@ private[spark] class Executor(
 val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
 val curGCTime = computeTotalGcTime()
 
+// get executor level memory metrics
+val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
+  ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
--- End diff --

We could add 
ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed(), for total 
non-heap memory used by the JVM.

For direct and memory mapped usage, would collecting these be similar to 
https://gist.github.com/t3rmin4t0r/1a753ccdcfa8d111f07c ? 
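
For illustration, a standalone sketch of reading the JVM's "direct" and 
"mapped" buffer pools via JMX (my own sketch; whether it matches the linked 
gist is an assumption):

    import java.lang.management.{BufferPoolMXBean, ManagementFactory}
    import scala.collection.JavaConverters._

    object BufferPoolUsage {
      def main(args: Array[String]): Unit = {
        // The JVM exposes the "direct" and "mapped" buffer pools as MXBeans.
        val pools = ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala
        pools.foreach { pool =>
          println(s"${pool.getName}: used=${pool.getMemoryUsed} bytes, buffers=${pool.getCount}")
        }
      }
    }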


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-09 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r180179243
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for 
the given executor.
+   * Metrics are cleared out when a new stage is started in 
onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

We're not constructing a timeline currently, and yes, with only peak 
values, it could be rather sparse. We would get new values with new stages, but 
would not see decreases in memory during a stage.

The situation where there is a stage 1 with peak X, and then stage 2 starts 
with peak Y > X is interesting though, and it would be useful to have this 
information, since we would then know to focus on stage 2 for memory 
consumption, even though both 1 and 2 could have the same peak values. Logging 
at stage start would double the amount of logging, and would be trickier, so 
I'd prefer either keeping the current approach or only logging at stage end.

The higher logging overhead was for smaller applications (and smaller logs).



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179978448
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -772,6 +772,12 @@ private[spark] class Executor(
 val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
 val curGCTime = computeTotalGcTime()
 
+// get executor level memory metrics
+val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
+  ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
--- End diff --

It would be useful to have more information about offheap memory usage. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179978186
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// list of events and if they should be logged
+val events = Array(
+  (SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None), true),
+  (createExecutorAddedEvent(1), true),
+  (createExecutorAddedEvent(2), true),
+  (createStageSubmittedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), 
true), // onheap storage
+  (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), 
true), // JVM used
+  (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), 
true), // onheap unified
+  (createStageSubmittedEvent(1), true),
+  (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createStageCompletedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), 
true), // onheap execution
+  (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), 
true), // offheap execution
+  (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), 
true), // offheap storage
+  (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), 
false),
+  (createStageCompletedEvent(1), true),
+  (SparkListenerApplicationEnd(1000L), true))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+for ((event, included) <- events) {
+  listenerBus.post(event)
+}
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events
+val logData = EventLoggingListener.openEventLog(new 
Path(eventLogger.logPath), fileSystem)
+try {
+  val lines = readLines(logData)
+  val logStart = SparkListenerLogStart(SPARK_VERSION)
+  assert(lines.size === 19)
+  assert(lines(0).contains("SparkListenerLogStart"))
+  assert(lines(1).contains("SparkListenerApplicationStart"))
+  assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+  var i = 1
+  for ((event, included) <- events) {
+if (included) {
+  checkEvent(lines(i), event)
+  i += 1
+}
+  }
+} finally {
+  logData.close()
+}
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
+SparkListenerStageSubmitted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create a stage completed event for the specified stage Id. */
+  private def createStageCompletedEvent(stageId: Int) =
+SparkListenerStageCompleted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorAddedEvent(executorId: Int) =
+SparkListenerExecutorAdded(0L, executorId.toString, new 
ExecutorInf

[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179978222
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// list of events and if they should be logged
+val events = Array(
+  (SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None), true),
+  (createExecutorAddedEvent(1), true),
+  (createExecutorAddedEvent(2), true),
+  (createStageSubmittedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), 
true), // onheap storage
+  (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), 
true), // JVM used
+  (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), 
true), // onheap unified
+  (createStageSubmittedEvent(1), true),
+  (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), 
true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), 
true), // new stage
+  (createStageCompletedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), 
true), // onheap execution
+  (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), 
true), // offheap execution
+  (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), 
true), // offheap storage
+  (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), 
false),
+  (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), 
false),
+  (createStageCompletedEvent(1), true),
+  (SparkListenerApplicationEnd(1000L), true))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+for ((event, included) <- events) {
+  listenerBus.post(event)
+}
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events
+val logData = EventLoggingListener.openEventLog(new 
Path(eventLogger.logPath), fileSystem)
+try {
+  val lines = readLines(logData)
+  val logStart = SparkListenerLogStart(SPARK_VERSION)
+  assert(lines.size === 19)
+  assert(lines(0).contains("SparkListenerLogStart"))
+  assert(lines(1).contains("SparkListenerApplicationStart"))
+  assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+  var i = 1
+  for ((event, included) <- events) {
+if (included) {
+  checkEvent(lines(i), event)
+  i += 1
+}
+  }
+} finally {
+  logData.close()
+}
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
+SparkListenerStageSubmitted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create a stage completed event for the specified stage Id. */
+  private def createStageCompletedEvent(stageId: Int) =
+SparkListenerStageCompleted(new StageInfo(stageId, 0, 
stageId.toString, 0,
+  Seq.empty, Seq.empty, "details"))
+
+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorAddedEvent(executorId: Int) =
+SparkListenerExecutorAdded(0L, executorId.toString, new 
ExecutorInf

[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179978192
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// list of events and if they should be logged
+val events = Array(
+  (SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None), true),
+  (createExecutorAddedEvent(1), true),
+  (createExecutorAddedEvent(2), true),
+  (createStageSubmittedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), true), // new stage
+  (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), false),
+  (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), true), // onheap storage
+  (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), true), // JVM used
+  (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), true), // onheap unified
+  (createStageSubmittedEvent(1), true),
+  (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), true), // new stage
+  (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), true), // new stage
+  (createStageCompletedEvent(0), true),
+  (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), true), // onheap execution
+  (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), false),
+  (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), true), // offheap execution
+  (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), true), // offheap storage
+  (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), false),
+  (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), false),
+  (createStageCompletedEvent(1), true),
+  (SparkListenerApplicationEnd(1000L), true))
+
+// play the events for the event logger
+eventLogger.start()
+listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
+listenerBus.addToEventLogQueue(eventLogger)
+for ((event, included) <- events) {
+  listenerBus.post(event)
+}
+listenerBus.stop()
+eventLogger.stop()
+
+// Verify the log file contains the expected events
+val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
+try {
+  val lines = readLines(logData)
+  val logStart = SparkListenerLogStart(SPARK_VERSION)
+  assert(lines.size === 19)
+  assert(lines(0).contains("SparkListenerLogStart"))
+  assert(lines(1).contains("SparkListenerApplicationStart"))
+  assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+  var i = 1
+  for ((event, included) <- events) {
+if (included) {
+  checkEvent(lines(i), event)
+  i += 1
+}
+  }
+} finally {
+  logData.close()
+}
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179978182
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -268,6 +268,9 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
 
   def hasMemoryInfo: Boolean = totalOnHeap >= 0L
 
+  // peak values for executor level metrics
+  var peakExecutorMetrics = new PeakExecutorMetrics
--- End diff --

Yes, thanks for catching this, it should be val.

This is more properly part of SPARK-23431, so I can remove it for now.
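
For what it's worth, a minimal sketch of why `val` is enough here (this simplified PeakExecutorMetrics stand-in and its compareAndUpdate method are illustrative only, not the PR's actual class): the reference itself is never reassigned, only the peaks it tracks change.

// Illustrative stand-in only; the real PeakExecutorMetrics in the PR differs.
class PeakExecutorMetrics(numMetrics: Int) {
  private val peaks = new Array[Long](numMetrics)

  /** Record a value, keeping the running maximum; returns true if a new peak was set. */
  def compareAndUpdate(idx: Int, value: Long): Boolean = {
    if (value > peaks(idx)) { peaks(idx) = value; true } else false
  }
}

object PeakExample {
  // A val reference suffices because the object is mutated in place, never reassigned.
  val peakExecutorMetrics = new PeakExecutorMetrics(numMetrics = 6)
}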


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...

2018-04-08 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20940#discussion_r179978102
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +244,22 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for the given executor.
+   * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
--- End diff --

For a longer-running stage, once it ramps up, there hopefully wouldn't be many new peak values. Looking at a subset of our applications, the extra logging overhead has mostly been between 0.25% and 1%, though it can reach 8%.

By logging each peak value at the time it occurs (and reinitializing when a stage starts), it's possible to tell which stages are active at that point, and these changes could potentially be graphed on a timeline. That information wouldn't be available if the metrics were only logged at stage end, since the timestamps would be lost.

Logging at stage end would limit the amount of extra logging. If we add more metrics (such as for offheap memory), there could be more new peaks, and therefore more extra logging, with the current approach. Excess logging is a concern, and I can move to stage end if the overhead is too high.
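
To make the trade-off concrete, here is a rough, hypothetical sketch of the approach described above (the class name, map layout, and logEvent callback are assumptions, not the PR's actual code): peaks are tracked per executor, cleared when a stage is submitted, and an update is logged only when it raises at least one peak.

import scala.collection.mutable

// Rough sketch only; names and structure are hypothetical.
class PeakTrackingListener(logEvent: String => Unit) {
  // executorId -> running peak per metric, cleared on each stage submission
  private val peaksPerExecutor = mutable.Map.empty[String, Array[Long]]

  def onStageSubmitted(): Unit = peaksPerExecutor.clear()

  def onExecutorMetricsUpdate(executorId: String, metrics: Array[Long]): Unit = {
    val peaks = peaksPerExecutor.getOrElseUpdate(executorId, Array.fill(metrics.length)(0L))
    var newPeak = false
    for (i <- metrics.indices if metrics(i) > peaks(i)) {
      peaks(i) = metrics(i)
      newPeak = true
    }
    // Once a stage has ramped up, most updates set no new peak and are skipped,
    // which is what keeps the extra event-log volume small.
    if (newPeak) logEvent(s"executor $executorId peaks: ${peaks.mkString(",")}")
  }
}

Logging at stage end instead would roughly amount to moving the logEvent call into a stage-completed handler.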


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


