[GitHub] spark issue #22884: [SPARK-23429][CORE][FOLLOWUP] MetricGetter should rename...
Github user edwinalu commented on the issue: https://github.com/apache/spark/pull/22884 Thanks, lgtm. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r223905848 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -28,35 +30,63 @@ import org.apache.spark.metrics.ExecutorMetricType @DeveloperApi class ExecutorMetrics private[spark] extends Serializable { - // Metrics are indexed by MetricGetter.values - private val metrics = new Array[Long](ExecutorMetricType.values.length) + private var metrics = mutable.Map.empty[String, Long] --- End diff -- Storing the values in a Map adds a fair amount of overhead, both for the data structure itself and for the String keys. All of this would then be sent in the heartbeat, so it would add overhead to the heartbeats. The array representation is a lot more compact. Unfortunately there's no EnumMap in Scala. I'd prefer to keep the array storage, and use a map only for returning and passing in values.
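The storage preference described above can be sketched roughly as follows. This is a minimal illustration, not the code in the pull request: `MetricType`, `CompactMetrics`, `indexOf`, `fromMap` and `toMap` are hypothetical names standing in for ExecutorMetricType and ExecutorMetrics. The values live in an array indexed by a fixed metric ordering, and a Map appears only at the API boundary.

```scala
// Hypothetical stand-ins for ExecutorMetricType; names are illustrative only.
sealed trait MetricType { def name: String }
case object JVMHeapMemory extends MetricType { val name = "JVMHeapMemory" }
case object JVMOffHeapMemory extends MetricType { val name = "JVMOffHeapMemory" }

object MetricType {
  // Fixed ordering: a metric's position here is its slot in the backing array.
  val values: IndexedSeq[MetricType] = IndexedSeq(JVMHeapMemory, JVMOffHeapMemory)
  val indexOf: Map[MetricType, Int] = values.zipWithIndex.toMap
}

// Compact storage: one Long per metric, with no String keys stored per instance.
class CompactMetrics private (private val metrics: Array[Long]) extends Serializable {
  def get(metricType: MetricType): Long = metrics(MetricType.indexOf(metricType))

  // A Map is built only on demand, when a caller asks for name -> value pairs.
  def toMap: Map[String, Long] = MetricType.values.map(t => t.name -> get(t)).toMap
}

object CompactMetrics {
  // Accepts a Map from callers; metrics missing from the map default to 0.
  def fromMap(m: Map[String, Long]): CompactMetrics =
    new CompactMetrics(MetricType.values.map(t => m.getOrElse(t.name, 0L)).toArray)
}
```

With this shape, only the array of Longs would need to travel with each instance, while callers can still work with names through `fromMap` and `toMap`.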
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r223766597 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal = 0L + private var latestJVMRSSTotal = 0L + private var latestPythonVmemTotal = 0L + private var latestPythonRSSTotal = 0L + private var latestOtherVmemTotal = 0L + private var latestOtherRSSTotal = 0L + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logDebug("IO Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +val cmd = Array("getconf", "PAGESIZE") +val out2 = Utils.executeAndGetOutput(cmd) +return Integer.parseInt(out2.split("\n")(0)) + } + + private def computeProcessTree(): Unit = { +if (!isAvailable || testing) { + return +} +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree += (p -> c.toSet)
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r223467439 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal = 0L + private var latestJVMRSSTotal = 0L + private var latestPythonVmemTotal = 0L + private var latestPythonRSSTotal = 0L + private var latestOtherVmemTotal = 0L + private var latestOtherRSSTotal = 0L + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logDebug("IO Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +val cmd = Array("getconf", "PAGESIZE") +val out2 = Utils.executeAndGetOutput(cmd) +return Integer.parseInt(out2.split("\n")(0)) + } + + private def computeProcessTree(): Unit = { +if (!isAvailable || testing) { + return +} +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree += (p -> c.toSet)
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r223466260 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -59,6 +60,43 @@ case object JVMOffHeapMemory extends ExecutorMetricType { } } +case object ProcessTreeJVMRSSMemory extends ExecutorMetricType { + override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { +ExecutorMetricType.pTreeInfo.updateAllMetrics() --- End diff -- Sorry for the delayed response. Thanks for adding the total memory metrics -- these will be very useful. Agreed that doing the work once is better, but having ProcessTreeJVMRSSMemory.getMetricValue() update all the metrics is confusing, especially if a user at some point wants to call getMetricValue() for one of the other metrics, and not ProcessTreeJVMRSSMemory. @squito's #1 is probably the easiest change to make with the existing code. However, #2 with @mccheah's suggestion to return a Map sounds best/cleanest as an API, with @dhruve's suggestion to consolidate into ProcessTreeMemory -- I prefer this approach as well. Right now the call to getMetricValue is done in Heartbeater.getCurrentMetrics(), which maps ExecutorMetricType.values to the array of actual values. Translating the returned maps to an array (with the index mapping to a name rather than an ExecutorMetricType) will involve some more code. In retrospect, getting the current metrics is probably better done by ExecutorMetrics itself, rather than having Heartbeater exposed to the implementation details -- would you be able to move the logic there?
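One way to picture the consolidated approach discussed above is the sketch below. It is an assumption-laden illustration, not the API that was merged: `MetricSource`, `ProcessTreeSource`, `CurrentMetrics` and the metric name "ProcessTreeJVMVMemory" are all hypothetical. A single source reports every process-tree value from one scan, and a separate helper flattens the sources into an array so the heartbeat code never sees how the values were gathered.

```scala
// Hypothetical sketch; none of these names are taken from Spark.
trait MetricSource {
  def names: Seq[String]
  // Returns exactly one value per entry in `names`, in the same order.
  def currentValues(): Seq[Long]
}

// One source for all process-tree metrics: the expensive scan runs once per call,
// instead of one metric's getter silently refreshing all the others.
class ProcessTreeSource(scan: () => Map[String, Long]) extends MetricSource {
  val names: Seq[String] = Seq("ProcessTreeJVMVMemory", "ProcessTreeJVMRSSMemory")

  def currentValues(): Seq[Long] = {
    val snapshot = scan()
    names.map(n => snapshot.getOrElse(n, 0L))
  }
}

object CurrentMetrics {
  // Flattens every source into one array; slot i corresponds to names(sources)(i).
  def collect(sources: Seq[MetricSource]): Array[Long] =
    sources.flatMap(_.currentValues()).toArray

  def names(sources: Seq[MetricSource]): Seq[String] = sources.flatMap(_.names)
}
```

The array index here maps to a metric name rather than to a metric type, which is the extra translation step the comment anticipates.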
[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user edwinalu commented on the issue: https://github.com/apache/spark/pull/21221 Thanks!
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r210691276 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -69,6 +69,11 @@ package object config { .bytesConf(ByteUnit.KiB) .createWithDefaultString("100k") + private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS = +ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled") + .booleanConf + .createWithDefault(true) --- End diff -- That would be safer. I'll change it to false, and we can change it to true after people have had a chance to test it out.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r210690505 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -216,8 +217,7 @@ private[spark] class Executor( def stop(): Unit = { env.metricsSystem.report() -heartbeater.shutdown() -heartbeater.awaitTermination(10, TimeUnit.SECONDS) +heartbeater.stop() --- End diff -- Added.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209772320 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,31 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() --- End diff -- The rest of the file is using System.nanoTime() -- it seems more consistent to keep it the same. Clock has getTimeMillis(); we could always multiply by 1,000,000 to get nanoseconds, but I'm not sure if the precision would matter.
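For reference, the unit arithmetic behind the millisecond versus nanosecond question can be sketched with the standard java.util.concurrent.TimeUnit conversions. The `TimeUnits` object and its method names are hypothetical helpers for illustration, not part of Spark.

```scala
import java.util.concurrent.TimeUnit

object TimeUnits {
  // 1 millisecond = 1,000,000 nanoseconds.
  def millisToNanos(millis: Long): Long = TimeUnit.MILLISECONDS.toNanos(millis)

  // Elapsed time between two System.nanoTime() readings, truncated to whole milliseconds.
  def elapsedMillis(startNanos: Long, endNanos: Long): Long =
    TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos)
}
```

Note that System.nanoTime() has an arbitrary origin and is only meaningful for measuring elapsed time, so a wall-clock millisecond value scaled to nanoseconds is not directly comparable with nanoTime() readings taken elsewhere in the same file.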
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209771443 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,31 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => --- End diff -- No, these don't need to be explicitly defined.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209770605 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +/** + * Executor metric types for executor-level metrics stored in ExecutorMetrics. + */ +sealed trait ExecutorMetricType { + private[spark] def getMetricValue(memoryManager: MemoryManager): Long + private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last +} + +private[spark] abstract class MemoryManagerExecutorMetricType( +f: MemoryManager => Long) extends ExecutorMetricType { + override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { +f(memoryManager) + } +} + +private[spark]abstract class MBeanExecutorMetricType(mBeanName: String) --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209770476 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) + + // the first element is initialized to -1, indicating that the values for the array + // haven't been set yet. + metrics(0) = -1 + + /** Returns the value for the specified metricType. */ + def getMetricValue(metricType: ExecutorMetricType): Long = { +metrics(ExecutorMetricType.metricIdxMap(metricType)) + } + + /** Returns true if the values for the metrics have been set, false otherwise. 
*/ + def isSet(): Boolean = metrics(0) > -1 + + private[spark] def this(metrics: Array[Long]) { +this() +Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size)) + } + + /** + * Constructor: create the ExecutorMetrics with the values specified. + * + * @param executorMetrics map of executor metric name to value + */ + private[spark] def this(executorMetrics: Map[String, Long]) { +this() +(0 until ExecutorMetricType.values.length).foreach { idx => + metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L) +} + } + + /** + * Compare the specified executor metrics values with the current executor metric values, + * and update the value for any metrics where the new value for the metric is larger. + * + * @param executorMetrics the executor metrics to compare + * @return if there is a new peak value for any metric + */ + private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = { +var updated: Boolean = false --- End diff -- Removed.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209770404 --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala --- @@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager( onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed } + /** + * On heap execution memory currently in use, in bytes. + */ + final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed --- End diff -- That's true -- I'll change the methods to synchronized.
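A rough sketch of what synchronized getters could look like, presumably so that a reader such as the heartbeat thread sees values consistent with concurrent updates. `SketchMemoryPool` and `SketchMemoryManager` are hypothetical classes written for this illustration; they only assume, as a simplification, that the pools share the manager's lock.

```scala
// Hypothetical pool whose state is guarded by a lock supplied by its owner.
class SketchMemoryPool(lock: Object) {
  private var _memoryUsed: Long = 0L

  def memoryUsed: Long = lock.synchronized { _memoryUsed }
  def acquire(bytes: Long): Unit = lock.synchronized { _memoryUsed += bytes }
  def release(bytes: Long): Unit =
    lock.synchronized { _memoryUsed = math.max(0L, _memoryUsed - bytes) }
}

class SketchMemoryManager {
  // Both pools synchronize on the manager itself.
  private val onHeapExecution = new SketchMemoryPool(this)
  private val offHeapExecution = new SketchMemoryPool(this)

  def acquireOnHeap(bytes: Long): Unit = onHeapExecution.acquire(bytes)
  def acquireOffHeap(bytes: Long): Unit = offHeapExecution.acquire(bytes)

  // Synchronized getter for a single pool.
  def onHeapExecutionMemoryUsed: Long = synchronized { onHeapExecution.memoryUsed }

  // Holding the lock across both reads yields one consistent total
  // (JVM monitors are reentrant, so the nested pool-level locking is safe).
  def executionMemoryUsed: Long = synchronized {
    onHeapExecution.memoryUsed + offHeapExecution.memoryUsed
  }
}
```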
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209770440 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) + + // the first element is initialized to -1, indicating that the values for the array + // haven't been set yet. + metrics(0) = -1 + + /** Returns the value for the specified metricType. */ + def getMetricValue(metricType: ExecutorMetricType): Long = { +metrics(ExecutorMetricType.metricIdxMap(metricType)) + } + + /** Returns true if the values for the metrics have been set, false otherwise. 
*/ + def isSet(): Boolean = metrics(0) > -1 + + private[spark] def this(metrics: Array[Long]) { +this() +Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size)) + } + + /** + * Constructor: create the ExecutorMetrics with the values specified. + * + * @param executorMetrics map of executor metric name to value + */ + private[spark] def this(executorMetrics: Map[String, Long]) { +this() +(0 until ExecutorMetricType.values.length).foreach { idx => + metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L) +} + } + + /** + * Compare the specified executor metrics values with the current executor metric values, + * and update the value for any metrics where the new value for the metric is larger. + * + * @param executorMetrics the executor metrics to compare + * @return if there is a new peak value for any metric + */ + private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = { +var updated: Boolean = false + +(0 until ExecutorMetricType.values.length).foreach { idx => + if ( executorMetrics.metrics(idx) > metrics(idx)) { --- End diff -- Fixed.
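The peak-tracking behaviour described in the quoted doc comment (raise each stored value when the incoming one is larger, and report whether anything changed) can be sketched on plain arrays. `PeakMetrics.compareAndUpdate` is a hypothetical standalone helper for illustration, not the method from the diff.

```scala
object PeakMetrics {
  // Raises each slot of `peaks` to the matching slot of `latest` when that value is larger.
  // Returns true if at least one peak changed.
  def compareAndUpdate(peaks: Array[Long], latest: Array[Long]): Boolean = {
    var updated = false
    val n = math.min(peaks.length, latest.length)
    var i = 0
    while (i < n) {
      if (latest(i) > peaks(i)) {
        peaks(i) = latest(i)
        updated = true
      }
      i += 1
    }
    updated
  }
}
```

The boolean result is what lets a caller skip further work when no peak moved.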
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723205 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) --- End diff -- Is it likely that users would want to access the individual fields, rather than iterating through all? The 1st option would be a bit nicer if so. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723188 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -296,7 +338,7 @@ private[spark] object EventLoggingListener extends Logging { private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) // A cache for compression codecs to avoid creating the same codec many times - private val codecMap = new mutable.HashMap[String, CompressionCodec] + private val codecMap = new HashMap[String, CompressionCodec] --- End diff -- Changed.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723177 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +103,50 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) +@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) +val peakMemoryMetrics: Option[ExecutorMetrics]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ +private[spark] class ExecutorMetricsJsonDeserializer + extends JsonDeserializer[Option[ExecutorMetrics]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[ExecutorMetrics] = { +val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]]( + new TypeReference[Option[Map[String, java.lang.Long]]] {}) +metricsMap match { + case Some(metrics) => +Some(new ExecutorMetrics(metrics)) + case None => None --- End diff -- Changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723173 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +103,50 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) +@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) +val peakMemoryMetrics: Option[ExecutorMetrics]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ +private[spark] class ExecutorMetricsJsonDeserializer + extends JsonDeserializer[Option[ExecutorMetrics]] { --- End diff -- Done.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723165 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,34 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics, +// while reading from the log. SparkListenerStageExecutorMetrics are only processed +// when reading logs. +liveExecutors.get(executorMetrics.execId) + .orElse(deadExecutors.get(executorMetrics.execId)) match { + case Some(exec) => + if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics)) { + maybeUpdate(exec, now) --- End diff -- Yes, this is called on replay. I've removed the "maybeUpdate".
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723141 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,214 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test stage executor metrics logging functionality. This checks that peak + * values from SparkListenerExecutorMetricsUpdate events during a stage are + * logged in a StageExecutorMetrics event for each executor at stage completion. + */ + private def testStageExecutorMetricsEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "stageExecutorMetrics-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected StageExecutorMetrics, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] = --- End diff -- Moved to after the replay. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r207723114

    --- Diff: core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala ---
    @@ -217,7 +218,12 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
         // Verify the same events are replayed in the same order
         assert(sc.eventLogger.isDefined)
    -    val originalEvents = sc.eventLogger.get.loggedEvents
    +    val originalEvents = sc.eventLogger.get.loggedEvents.filter { e =>
    +      JsonProtocol.sparkEventFromJson(e) match {
    --- End diff --

    Changed.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r207723098

    --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
    @@ -691,7 +723,19 @@ private[spark] object JsonProtocol {
             (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
           (taskId, stageId, stageAttemptId, updates)
         }
    -    SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates)
    +    val executorUpdates = jsonOption(json \ "Executor Metrics Updated") match {
    --- End diff --

    Changed.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207722892 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +103,50 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) +@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) +val peakMemoryMetrics: Option[ExecutorMetrics]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ +private[spark] class ExecutorMetricsJsonDeserializer + extends JsonDeserializer[Option[ExecutorMetrics]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[ExecutorMetrics] = { +val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]]( + new TypeReference[Option[Map[String, java.lang.Long]]] {}) +metricsMap match { + case Some(metrics) => +Some(new ExecutorMetrics(metrics)) + case None => None +} + } +} +/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */ +private[spark] class ExecutorMetricsJsonSerializer + extends JsonSerializer[Option[ExecutorMetrics]] { --- End diff -- It doesn't serialize -- nothing is added to the JSON. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207722887 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +103,50 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) +@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) +val peakMemoryMetrics: Option[ExecutorMetrics]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ +private[spark] class ExecutorMetricsJsonDeserializer + extends JsonDeserializer[Option[ExecutorMetrics]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[ExecutorMetrics] = { +val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]]( + new TypeReference[Option[Map[String, java.lang.Long]]] {}) +metricsMap match { + case Some(metrics) => +Some(new ExecutorMetrics(metrics)) + case None => None +} + } +} +/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */ +private[spark] class ExecutorMetricsJsonSerializer + extends JsonSerializer[Option[ExecutorMetrics]] { + override def serialize( + metrics: Option[ExecutorMetrics], + jsonGenerator: JsonGenerator, + serializerProvider: SerializerProvider): Unit = { +metrics match { + case Some(m) => --- End diff -- Changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r207722865

    --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
    @@ -302,10 +305,10 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
           Option(removeReason),
           executorLogs,
           memoryMetrics,
    -      blacklistedInStages)
    +      blacklistedInStages,
    +      if (peakExecutorMetrics.isSet()) Some(peakExecutorMetrics) else None)
    --- End diff --

    Changed.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207722773 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -155,7 +160,14 @@ private[spark] class EventLoggingListener( } // Events that do not trigger a flush - override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event) + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { +logEvent(event) +if (shouldLogStageExecutorMetrics) { + // record the peak metrics for the new stage + liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()), +new HashMap[String, ExecutorMetrics]()) --- End diff -- Modified. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207722724 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -93,6 +95,9 @@ private[spark] class EventLoggingListener( // Visible for tests only. private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) + // map of (stageId, stageAttempt), to peak executor metrics for the stage + private val liveStageExecutorMetrics = HashMap[(Int, Int), HashMap[String, ExecutorMetrics]]() --- End diff -- Yes, mutable Maps will work, and better to go with the more generic version. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207722674 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +/** + * Executor metric types for executor-level metrics stored in ExecutorMetrics. + */ +sealed trait ExecutorMetricType { + private[spark] def getMetricValue(memoryManager: MemoryManager): Long --- End diff -- Let's stick with the current version for now, and revisit if we end up adding more metric types. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r207722621

    --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala ---
    @@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
         onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
       }
    +  /**
    +   * On heap execution memory currently in use, in bytes.
    +   */
    +  final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed
    --- End diff --

    Since it accesses just one Long field, I don't think so. The other methods, which are synchronized, each sum two fields.
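A minimal sketch of the distinction drawn in the reply above, not the actual MemoryManager code. The names `PoolSketch` and `MemoryManagerSketch` are hypothetical, and the assumption that each pool guards its own counter with a shared lock is made for illustration only. A getter that reads one pool's value can delegate directly, while a getter that sums two pools holds the lock across both reads so the total reflects a single point in time.

```scala
// Hypothetical pool: one counter, guarded by a lock shared with its owner.
final class PoolSketch(lock: Object) {
  private var used = 0L
  def memoryUsed: Long = lock.synchronized { used }
  def acquire(bytes: Long): Unit = lock.synchronized { used += bytes }
}

// Hypothetical manager owning an on-heap and an off-heap pool.
final class MemoryManagerSketch {
  private val onHeap = new PoolSketch(this)
  private val offHeap = new PoolSketch(this)

  def acquireOnHeap(bytes: Long): Unit = onHeap.acquire(bytes)
  def acquireOffHeap(bytes: Long): Unit = offHeap.acquire(bytes)

  // Reads a single value; no manager-level synchronization is added here.
  def onHeapUsed: Long = onHeap.memoryUsed

  // Sums two values; the lock is held across both reads so that no
  // acquire() can land between them and skew the total.
  def totalUsed: Long = synchronized { onHeap.memoryUsed + offHeap.memoryUsed }
}
```

Whether an unsynchronized read of a single Long is acceptable also depends on visibility requirements, which this sketch sidesteps by having the pool lock its own reads.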
[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user edwinalu commented on the issue:

    https://github.com/apache/spark/pull/21221

    @mccheah and @squito, thanks for reviewing and commenting, and sorry for the delay. I'll reply and update this weekend.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r205095575 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,215 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test stage executor metrics logging functionality. This checks that peak + * values from SparkListenerExecutorMetricsUpdate events during a stage are + * logged in a StageExecutorMetrics event for each executor at stage completion. + */ + private def testStageExecutorMetricsEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "stageExecutorMetrics-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected StageExecutorMetrics, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] = + Map( +((0, "1"), + new SparkListenerStageExecutorMetrics("1", 0, 0, + Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))), +((0, "2"), + new SparkListenerStageExecutorMetrics("2", 0, 0, + Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))), +((1, "1"), + new SparkListenerStageExecutorMetrics("1", 1, 0, + Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))), +((1, "2"), + new SparkListenerStageExecutorMetrics("2", 1, 0, + Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L + +// Events to post. 
+val events = Array( + SparkListenerApplicationStart("executionMetrics", None, +1L, "update", None), + createExecutorAddedEvent(1), + createExecutorAddedEvent(2), + createStageSubmittedEvent(0), + // receive 3 metric updates from each executor with just stage 0 running, + // with different peak updates for each executor + createExecutorMetricsUpdateEvent(1, + Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, + Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)), + // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6 + createExecutorMetricsUpdateEvent(1, + Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)), + // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6 + createExecutorMetricsUpdateEvent(2, + Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)), + // exec 1: new stage 0 peaks for metrics at indexes: 5, 7 + createExecutorMetricsUpdateEvent(1, + Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)), + // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8 + createExecutorMetricsUpdateEvent(2, + Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)), + // now start stage 1, one more metric update for each executor, and new + // peaks for some stage 1 metrics (as listed), initialize stage 1 peaks + createStageSubmittedEvent(1), + // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7 --- End diff -- Stage 0 is still running, and these are new peaks for that stage. It is also initializing all the stage 1 metric values, since these are the first executor metrics seen for stage 1 (I'll add this to the comments). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
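The behaviour described in the reply above (one executor update raising peaks for a stage that is still running while also initializing the peaks of a newly submitted stage) can be sketched as follows. This is an illustration under stated assumptions, not the EventLoggingListener code: `StagePeaksSketch` and its methods are hypothetical names, the metric arrays have three entries instead of ten, and the values are made up.

```scala
import scala.collection.mutable

// Hypothetical tracker: (stageId, stageAttemptId) -> executorId -> peak values.
final class StagePeaksSketch {
  private val liveStagePeaks =
    mutable.Map.empty[(Int, Int), mutable.Map[String, Array[Long]]]

  // A newly submitted stage starts with no recorded peaks.
  def stageSubmitted(stageId: Int, attempt: Int): Unit =
    liveStagePeaks.put((stageId, attempt), mutable.Map.empty[String, Array[Long]])

  // Folds one executor update into every running stage.
  // Returns true if any stage recorded a new peak.
  def executorUpdate(execId: String, update: Array[Long]): Boolean = {
    var anyNewPeak = false
    liveStagePeaks.values.foreach { perExecutor =>
      perExecutor.get(execId) match {
        case None =>
          // first metrics seen for this executor in this stage: initialize
          perExecutor.put(execId, update.clone())
          anyNewPeak = true
        case Some(peaks) =>
          var i = 0
          while (i < peaks.length) {
            if (update(i) > peaks(i)) { peaks(i) = update(i); anyNewPeak = true }
            i += 1
          }
      }
    }
    anyNewPeak
  }

  def peaks(stageId: Int, attempt: Int, execId: String): Option[List[Long]] =
    liveStagePeaks.get((stageId, attempt)).flatMap(_.get(execId)).map(_.toList)
}
```

With stage 0 running, updates (4000, 50, 20) and then (2000, 40, 50) from executor "1" leave stage 0 at (4000, 50, 50). If stage 1 is then submitted and executor "1" reports (5000, 10, 30), stage 0 moves to (5000, 50, 50) and stage 1 is initialized to (5000, 10, 30).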
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r203503691 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorUpdates executor level metrics updates */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, -accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) +accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], +executorUpdates: Option[Array[Long]] = None) + extends SparkListenerEvent + +/** + * Peak metric values for the executor for the stage, written to the history log at stage + * completion. + * @param execId executor id + * @param stageId stage id + * @param stageAttemptId stage attempt + * @param executorMetrics executor level metrics, indexed by MetricGetter.values + */ +@DeveloperApi +case class SparkListenerStageExecutorMetrics( +execId: String, +stageId: Int, +stageAttemptId: Int, +executorMetrics: Array[Long]) --- End diff -- This sounds like a good solution, with both a clean API, and also an efficient implementation that will be easier to add new metrics to. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r203122722 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorUpdates executor level metrics updates */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, -accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) +accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], +executorUpdates: Option[Array[Long]] = None) + extends SparkListenerEvent + +/** + * Peak metric values for the executor for the stage, written to the history log at stage + * completion. + * @param execId executor id + * @param stageId stage id + * @param stageAttemptId stage attempt + * @param executorMetrics executor level metrics, indexed by MetricGetter.values + */ +@DeveloperApi +case class SparkListenerStageExecutorMetrics( +execId: String, +stageId: Int, +stageAttemptId: Int, +executorMetrics: Array[Long]) --- End diff -- Adding getMetricValue() would abstract away the array implementation. Should MetricGetter/ExecutorMetricType be public? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
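A rough sketch of what hiding the array behind getMetricValue() could look like, assuming a fixed, append-only list of metric types defines the array index. The names `MetricTypeSketch` and `ExecutorMetricsSketch`, and the three metric names, are hypothetical and chosen only for illustration; they are not the classes in the PR.

```scala
// Hypothetical metric types; names and order are assumptions for this sketch.
sealed trait MetricTypeSketch { def name: String }
case object JvmUsedHeap extends MetricTypeSketch { val name = "JVMHeapMemory" }
case object OnHeapExecution extends MetricTypeSketch { val name = "OnHeapExecutionMemory" }
case object OffHeapExecution extends MetricTypeSketch { val name = "OffHeapExecutionMemory" }

object MetricTypeSketch {
  // The position in this sequence is the array index. New types are only
  // appended, so existing indexes never change.
  val values: IndexedSeq[MetricTypeSketch] =
    IndexedSeq(JvmUsedHeap, OnHeapExecution, OffHeapExecution)
  val indexOf: Map[String, Int] = values.map(_.name).zipWithIndex.toMap
}

// Compact array storage internally, name-based access at the API boundary.
final class ExecutorMetricsSketch(byName: Map[String, Long]) extends Serializable {
  // Metrics missing from the input default to 0 (an assumption of this sketch).
  private val metrics: Array[Long] =
    MetricTypeSketch.values.map(t => byName.getOrElse(t.name, 0L)).toArray

  // Callers look metrics up by name and never see the index.
  def getMetricValue(name: String): Long = metrics(MetricTypeSketch.indexOf(name))

  def toMap: Map[String, Long] =
    MetricTypeSketch.values.map(_.name).zip(metrics).toMap
}
```

Only the array would need to travel in a heartbeat; the name-to-index mapping lives in code on both sides, which is the compactness argument made elsewhere in this thread.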
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r200826235 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorUpdates executor level metrics updates */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, -accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) +accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], +executorUpdates: Option[Array[Long]] = None) + extends SparkListenerEvent + +/** + * Peak metric values for the executor for the stage, written to the history log at stage + * completion. + * @param execId executor id + * @param stageId stage id + * @param stageAttemptId stage attempt + * @param executorMetrics executor level metrics, indexed by MetricGetter.values + */ +@DeveloperApi +case class SparkListenerStageExecutorMetrics( +execId: String, +stageId: Int, +stageAttemptId: Int, +executorMetrics: Array[Long]) --- End diff -- We can change back to using an ExecutorMetrics class in this case. The plan was for any new metrics to be added to the end, so that there wouldn't be any change in ordering, and executorMetrics could be changed to immutable Seq[Long], but there would still be the issue of having to reference MetricGetter to find out how the metrics are indexed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198815695 --- Diff: core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.status.api.v1.PeakMemoryMetrics + +/** + * Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no + * values have been recorded yet. + */ +private[spark] class PeakExecutorMetrics { --- End diff -- With ExecutorMetrics removed, it seems useful to have a class for tracking and setting peak metric values, that can be used by both EventLoggingListener and AppStatusListener. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198684121 --- Diff: project/MimaExcludes.scala --- @@ -89,7 +89,13 @@ object MimaExcludes { ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"), ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.getValidationIndicatorCol"), ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.org$apache$spark$ml$param$shared$HasValidationIndicatorCol$_setter_$validationIndicatorCol_="), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol") + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"), + +// [SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.apply"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.this"), + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate$") --- End diff -- Will move up. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r198683846

    --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
    @@ -251,6 +261,217 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
         }
       }
    +  /**
    +   * Test executor metrics update logging functionality. This checks that a
    +   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
    +   * log if one of the executor metrics is larger than any previously
    +   * recorded value for the metric, per executor per stage. The task metrics
    --- End diff --

    Whoops, that was left over from when it was ExecutorMetricsUpdated.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r198683408

    --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
    @@ -98,14 +102,48 @@ class ExecutorSummary private[spark](
         val removeReason: Option[String],
         val executorLogs: Map[String, String],
         val memoryMetrics: Option[MemoryMetrics],
    -    val blacklistedInStages: Set[Int])
    +    val blacklistedInStages: Set[Int],
    +    @JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
    +    @JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
    +    val peakMemoryMetrics: Option[Array[Long]])
     class MemoryMetrics private[spark](
         val usedOnHeapStorageMemory: Long,
         val usedOffHeapStorageMemory: Long,
         val totalOnHeapStorageMemory: Long,
         val totalOffHeapStorageMemory: Long)
    +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */
    +class PeakMemoryMetricsDeserializer private[spark] extends JsonDeserializer[Option[Array[Long]]] {
    --- End diff --

    Oddly, I can't seem to reply to your earlier comment. Regarding having a serializer/deserializer, I also don't have strong feelings -- it makes the output more readable, but also takes up more space in the history log.

    Regarding this comment, thanks, I hadn't realized that placing private[spark] there marks the constructor rather than the class. It's meant for the class, so I'll move it.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r198682917

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
    @@ -264,6 +282,11 @@ private[spark] trait SparkListenerInterface {
        */
       def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit
    +  /**
    +   * Called when the driver reads stage executor metrics from the history log.
    --- End diff --

    Updated.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198682980 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,29 @@ private[spark] class AppStatusListener( } } } +event.executorUpdates.foreach { updates: Array[Long] => + // check if there is a new peak value for any of the executor level memory metrics + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdate(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics --- End diff -- Unfortunately, yes. I've added some comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198682809 --- Diff: core/src/main/scala/org/apache/spark/metrics/MetricGetter.scala --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +sealed trait MetricGetter { --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198682884 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +181,28 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event + val prevAttemptId = event.stageInfo.attemptNumber() - 1 + for (attemptId <- 0 to prevAttemptId) { +liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId)) + } + + // log the peak executor metrics for the stage, for each live executor, + // whether or not the executor is running tasks for the stage + val executorMap = liveStageExecutorMetrics.remove( +(event.stageInfo.stageId, event.stageInfo.attemptNumber())) + executorMap.foreach { + executorEntry => { + for ((executorId, peakExecutorMetrics) <- executorEntry) { --- End diff -- Yes, the naming is confusing. Changed to the 1st option. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
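The stage-completion bookkeeping shown in the diff above can be sketched in isolation as follows. This is an illustration, not the listener's code: `StageCompletionSketch` and `onStageCompleted` here are hypothetical helpers, and metric values are plain arrays. Earlier attempts of the stage, which never received a stage completed event, are dropped first; the entry for the completed attempt is then removed and returned so that one record per executor can be logged.

```scala
import scala.collection.mutable

object StageCompletionSketch {
  type PeaksByExecutor = mutable.Map[String, Array[Long]]

  // Returns (executorId, peaks) pairs for the completed attempt, sorted by
  // executor id, and removes every attempt of the stage up to and including it.
  def onStageCompleted(
      live: mutable.Map[(Int, Int), PeaksByExecutor],
      stageId: Int,
      attempt: Int): Seq[(String, Array[Long])] = {
    // clear out previous attempts that did not have a stage completed event
    for (prev <- 0 until attempt) live.remove((stageId, prev))

    live.remove((stageId, attempt)) match {
      case Some(perExecutor) => perExecutor.toSeq.sortBy(_._1)
      case None => Seq.empty
    }
  }
}
```

Entries for other stages are left untouched, so concurrently running stages keep accumulating their own peaks.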
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198682779 --- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.TimeUnit + +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryManager +import org.apache.spark.metrics.MetricGetter +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * Creates a heartbeat thread which will call the specified reportHeartbeat function at + * intervals of intervalMs. + * + * @param memoryManager the memory manager for execution and storage memory. + * @param reportHeartbeat the heartbeat reporting function to call. + * @param name the thread name for the heartbeater. + * @param intervalMs the interval between heartbeats. + */ +private[spark] class Heartbeater( +memoryManager: MemoryManager, +reportHeartbeat: () => Unit, +name: String, +intervalMs: Long) extends Logging { + // Executor for the heartbeat task + private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name) + + /** Schedules a task to report a heartbeat. 
*/ + private[spark] def start(): Unit = { --- End diff -- Removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198682787 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1922,6 +1928,12 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _eventLogger.foreach(_.stop()) } +if(_heartbeater != null) { --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r196236364 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event + val prevAttemptId = event.stageInfo.attemptNumber() - 1 + for (attemptId <- 0 to prevAttemptId) { +liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId)) + } + + // log the peak executor metrics for the stage, for each live executor, + // whether or not the executor is running tasks for the stage + val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() + val executorMap = liveStageExecutorMetrics.remove( +(event.stageInfo.stageId, event.stageInfo.attemptNumber())) + executorMap.foreach { + executorEntry => { + for ((executorId, peakExecutorMetrics) <- executorEntry) { +val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics) --- End diff -- Just discussed with @squito -- thanks! Logging -1 for timestamp is confusing and hacky. Some items discussed: For ExecutorMetrics, timestamp can be optional, or it can be removed completely and replaced by Array[Long], with comments explaining how the metrics work. For logging, stage ID could be added as part of the executorMetrics in SparkListenerExecutorMetricsUpdate, but this is awkward, since this information isn't used as part of the Heartbeat, and only for logging. While the application is running, there could be multiple stages running when the metrics are gathered, so specifying 1 stage ID doesn't make sense. For logging, the metrics are the peak values for a particular stage, so are associated with a stage. 
Another option is to add the information to SparkListenerStageCompleted, but this would bloat the event if there are many executors. A third option is to create a new event, SparkListenerStageExecutorMetrics, which would have the executor ID, stage ID and attempt, and peak metrics. I'll give the 3rd option a try, and will add details to the design doc once this is more finalized. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
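For illustration, the third option could look roughly like the sketch below. This is only a guess at the shape of the proposed event: the class name StageExecutorMetricsEvent, its field names, and the helper eventsForStage are hypothetical and are not taken from the PR.

```scala
object StageExecutorMetricsSketch {
  // Hypothetical shape for the proposed event; field names are assumptions.
  final case class StageExecutorMetricsEvent(
      execId: String,
      stageId: Int,
      stageAttemptId: Int,
      peakMetrics: Array[Long])

  // Build one event per executor for a stage attempt that has just completed.
  // Executors are sorted by ID only to make the output order deterministic.
  def eventsForStage(
      stageId: Int,
      stageAttemptId: Int,
      peaksByExecutor: Map[String, Array[Long]]): Seq[StageExecutorMetricsEvent] = {
    peaksByExecutor.toSeq.sortBy(_._1).map { case (execId, peaks) =>
      StageExecutorMetricsEvent(execId, stageId, stageAttemptId, peaks)
    }
  }
}
```

Because the stage ID and attempt travel with the event, the heartbeat payload itself would not need to carry them.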
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195957438 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event + val prevAttemptId = event.stageInfo.attemptNumber() - 1 + for (attemptId <- 0 to prevAttemptId) { +liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId)) + } + + // log the peak executor metrics for the stage, for each live executor, + // whether or not the executor is running tasks for the stage + val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() + val executorMap = liveStageExecutorMetrics.remove( +(event.stageInfo.stageId, event.stageInfo.attemptNumber())) + executorMap.foreach { + executorEntry => { + for ((executorId, peakExecutorMetrics) <- executorEntry) { +val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics) --- End diff -- The last timestamp seems like it wouldn't have enough information, since peaks for different metrics could occur at different times, and with different combinations of stages running. Only -1 would be logged. Right now it's writing out SparkListenerExecutorMetricsUpdate events, which contain ExecutorMetrics, which has timestamp. Do you think timestamp should be removed from ExecutorMetrics? It seems good to have the timestamp for when the metrics were gathered, but it's not being exposed at this point. For both the history server and the live UI, the goal is to show the peak value for each metric for each executor. For the executors tab, this is the peak value of each metric over the lifetime of the executor.
For the stages tab, this is the peak value for each metric for that executor while the stage is running. The executor could be processing tasks for other stages as well, if there are concurrent stages, or no tasks for this stage if it isn't assigned any tasks, but it is the peak values between the time the stage starts and ends. Can you describe how the stage level metrics would work with the last timestamp for any peak metric? Would there be a check to see if the event is being read from the history log?
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195892287 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,222 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics + * should not be added. + */ + private def testExecutorMetricsUpdateEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "executorMetricsUpdated-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected ExecutorMetricsUpdate, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerExecutorMetricsUpdate] = --- End diff -- I'll add a check. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195892271 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +101,53 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer]) +@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer]) +val peakMemoryMetrics: Option[Array[Long]]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */ +class PeakMemoryMetricsDeserializer extends JsonDeserializer[Option[Array[Long]]] { --- End diff -- Yes, changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195892263 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +101,53 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer]) +@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer]) +val peakMemoryMetrics: Option[Array[Long]]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */ +class PeakMemoryMetricsDeserializer extends JsonDeserializer[Option[Array[Long]]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[Array[Long]] = { +val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, Object]]]) --- End diff -- That works and is much cleaner, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195892278 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,222 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics + * should not be added. + */ + private def testExecutorMetricsUpdateEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "executorMetricsUpdated-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected ExecutorMetricsUpdate, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerExecutorMetricsUpdate] = + Map( +((0, "1"), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(-1L, + Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L, +((0, "2"), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(-1L, + Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L, +((1, "1"), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(-1L, + Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L, +((1, "2"), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(-1L, + Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L) + +// Events to post. +val events = Array( + SparkListenerApplicationStart("executionMetrics", None, +1L, "update", None), + createExecutorAddedEvent(1), + createExecutorAddedEvent(2), + createStageSubmittedEvent(0), + createExecutorMetricsUpdateEvent(1, --- End diff -- Adding comments. 
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195886625 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event + val prevAttemptId = event.stageInfo.attemptNumber() - 1 + for (attemptId <- 0 to prevAttemptId) { +liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId)) + } + + // log the peak executor metrics for the stage, for each live executor, + // whether or not the executor is running tasks for the stage + val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() + val executorMap = liveStageExecutorMetrics.remove( +(event.stageInfo.stageId, event.stageInfo.attemptNumber())) + executorMap.foreach { + executorEntry => { + for ((executorId, peakExecutorMetrics) <- executorEntry) { +val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics) --- End diff -- This is confusing, especially since the current code does not have the stage level metrics, just executor level. The -1 wouldn't be replaced. PeakExecutorMetrics only tracks the peak metric values for each executor (and later each executor per stage) and doesn't have a timestamp. If there is a local max (500), which is the max between T3 and T5, it would be logged at time T5, even if it happens at T3.5. In actual event order, what the driver sees when the application is running: T1: start of stage 1 T2: value of 1000 for metric m1 T3: start of stage 2 T3.5: peak value of 500 for metric m1 T4: stage 1 ends T5: stage 2 ends Suppose that 1000 (seen at T2) is the peak value of m1 between T1 and T4, so it is the peak value seen while stage 1 is running. 
The m1=1000 value will be dumped as the max value in an executorMetricsUpdate event right before the stage 1 end event is logged. Also suppose that 500 (seen at T3.5) is the peak value of m1 between T3 and T5, so it is the peak value seen while stage 2 is running. The m1=500 value will be dumped as the max value in an executorMetricsUpdate right before the stage 2 end event is logged. The generated Spark history log would have the following order of events:
Start of stage 1
Start of stage 2
executorMetricsUpdate with m1=1000
end of stage 1
executorMetricsUpdate with m1=500
end of stage 2
When the Spark history server is reading the log, it will create the peakExecutorMetrics for stage 2 when stage 2 starts, which is before it sees the executorMetricsUpdate with m1=1000, and so will store m1=1000 as the current peak value. When it later sees the executorMetricsUpdate with m1=500, it needs to overwrite the m1 value (and set it to 500), not compare and update to the max value (which would be 1000). The -1 would indicate that the event is coming from the Spark history log, is a peak value for the stage just about to complete, and should overwrite any previous values. If the timestamp is set to a positive value, then it will do a compare and update to the max value instead.
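The driver-side behavior described in this comment can be sketched as follows. This is a simplified model with a single metric (m1) and a single executor; the Logger class and its method names are hypothetical and only mimic how heartbeat values are folded into every live stage and written out just before each stage end event.

```scala
import scala.collection.mutable

object StagePeakLoggingSketch {
  /** Tracks the peak of one metric (m1) per live stage and logs it at stage end. */
  final class Logger {
    private val livePeaks = mutable.Map.empty[Int, Long]
    val log = mutable.ArrayBuffer.empty[String]

    def stageStart(stageId: Int): Unit = {
      livePeaks(stageId) = -1L // -1 means no value recorded yet
      log += s"start of stage $stageId"
    }

    // A heartbeat value is compared against every live stage; nothing is logged here.
    def metricsUpdate(m1: Long): Unit =
      for (id <- livePeaks.keys.toList) livePeaks(id) = math.max(livePeaks(id), m1)

    // The stage's peak is written immediately before its end event.
    def stageEnd(stageId: Int): Unit = {
      livePeaks.remove(stageId).foreach { peak =>
        log += s"executorMetricsUpdate with m1=$peak"
      }
      log += s"end of stage $stageId"
    }
  }

  // Replays the T1..T5 timeline from the comment above.
  def demo(): Seq[String] = {
    val l = new Logger
    l.stageStart(1)        // T1
    l.metricsUpdate(1000L) // T2
    l.stageStart(2)        // T3
    l.metricsUpdate(500L)  // T3.5
    l.stageEnd(1)          // T4
    l.stageEnd(2)          // T5
    l.log.toList
  }
}
```

Calling StagePeakLoggingSketch.demo() yields the same event order as the history log listed in the comment.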
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195543999 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +101,53 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer]) +@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer]) +val peakMemoryMetrics: Option[Array[Long]]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */ +class PeakMemoryMetricsDeserializer extends JsonDeserializer[Option[Array[Long]]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[Array[Long]] = { +val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, Object]]]) --- End diff -- Let me know if there's something else to try -- I'd also rather avoid the Map[String, Object]. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195543711 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +101,53 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer]) +@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer]) +val peakMemoryMetrics: Option[Array[Long]]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */ +class PeakMemoryMetricsDeserializer extends JsonDeserializer[Option[Array[Long]]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[Array[Long]] = { +val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, Object]]]) --- End diff -- Nope, it's still reading it as an Int, even with java.lang.Long, otherwise that would have been cleaner. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
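One way to sidestep the Int versus Long distinction, sketched here without any JSON library, is to match on java.lang.Number, which both boxed types extend. The object and method names below are hypothetical, and the parsed map stands in for whatever the JSON parser returns.

```scala
object NumericCoercionSketch {
  // Convert a parsed JSON number of unknown boxed type to Long.
  def toLong(value: Any): Long = value match {
    case n: java.lang.Number => n.longValue()
    case other => throw new IllegalArgumentException(s"not a number: $other")
  }

  // Look up each metric name in order, defaulting to 0 when it is absent.
  def toArray(names: Seq[String], parsed: Map[String, Any]): Array[Long] =
    names.map(name => toLong(parsed.getOrElse(name, 0L))).toArray
}
```

A single Number case replaces the separate Int and Long branches in the diff, at the cost of also accepting other numeric types such as Double.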
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195542491 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -304,6 +305,11 @@ class SparkContext(config: SparkConf) extends Logging { _dagScheduler = ds } + private[spark] def heartbeater: Heartbeater = _heartbeater + private[spark] def heartbeater_=(hb: Heartbeater): Unit = { --- End diff -- These aren't used -- I'll remove. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195542018 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +101,53 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer]) +@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer]) +val peakMemoryMetrics: Option[Array[Long]]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */ +class PeakMemoryMetricsDeserializer extends JsonDeserializer[Option[Array[Long]]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[Array[Long]] = { +val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, Object]]]) +metricsMap match { + case Some(metrics) => +Some(MetricGetter.values.map { m => + metrics.getOrElse (m.name, 0L) match { +case intVal: Int => intVal.toLong +case longVal: Long => longVal + } +}.toArray) + case None => None +} + } +} + +/** serializer for peakMemoryMetrics: convert array to map with metric name as key */ +class PeakMemoryMetricsSerializer extends JsonSerializer[Option[Array[Long]]] { + override def serialize( + metrics: Option[Array[Long]], + jsonGenerator: JsonGenerator, + serializerProvider: SerializerProvider): Unit = { +metrics match { + case Some(m) => +val metricsMap = (0 until MetricGetter.values.length).map { idx => --- End diff -- It's still being used in JsonProtocol.executorMetricsToJson -- let me know if you'd like me to convert that to use values instead. 
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195541136 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +272,18 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // For the active stages, record any new peak values for the memory metrics for the executor + event.executorUpdates.foreach { executorUpdates => +liveStageExecutorMetrics.values.foreach { peakExecutorMetrics => + val peakMetrics = peakExecutorMetrics.getOrElseUpdate( +event.execId, new PeakExecutorMetrics()) + peakMetrics.compareAndUpdate(executorUpdates) --- End diff -- What would be the right timestamp? Peaks for different metrics could have different timestamps. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195539837 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event --- End diff -- Tracking task start and end would be some amount of overhead. If it's a relatively unlikely corner case, and unlikely to have much impact on the numbers, it may be better to leave as is. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195538848 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event + val prevAttemptId = event.stageInfo.attemptNumber() - 1 + for (attemptId <- 0 to prevAttemptId) { +liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId)) + } + + // log the peak executor metrics for the stage, for each live executor, + // whether or not the executor is running tasks for the stage + val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() + val executorMap = liveStageExecutorMetrics.remove( +(event.stageInfo.stageId, event.stageInfo.attemptNumber())) + executorMap.foreach { + executorEntry => { + for ((executorId, peakExecutorMetrics) <- executorEntry) { +val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics) --- End diff -- We need to pass in a value for timestamp, but there isn't really one for the peak metrics, since times for each peak could be different. When processing, -1 will help indicate that the event is coming from the history log, and contains the peak values for the stage that is just ending. When updating the stage executor peaks (peak executor values stored for each active stage), we can replace all of the peak executor metric values instead of updating with the max of current and new values for each metric. 
As an example, suppose there is the following scenario:
T1: start of stage 1
T2: peak value of 1000 for metric m1
T3: start of stage 2
T4: stage 1 ends, and peak metric values for stage 1 are logged, including m1=1000
T5: stage 2 ends, and peak metric values for stage 2 are logged.
If values for m1 are < 1000 between T3 (start of stage 2) and T5 (end of stage 2), and say that the highest value for m1 during that period is 500, then we want the peak value for m1 for stage 2 to show as 500. There would be an ExecutorMetricsUpdate event logged (and then read) at T4 (end of stage 1), with m1=1000, which is after T3 (start of stage 2). If when reading the history log, we set the stage 2 peakExecutorMetrics to the max of current or new values from ExecutorMetricsUpdate, then the value for stage 2 would remain at 1000. However, we want it to be replaced by the value of 500 when it gets the ExecutorMetricsUpdate logged at T5 (end of stage 2). During processing of ExecutorMetricsUpdate, for the stage level metrics, it will replace all the peakExecutorMetrics if timestamp is -1. I can add some comments for this.
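The replay rule in this scenario can be written down as a small sketch. It models only one metric (m1); the Update case class and the function names are hypothetical stand-ins, not the PR's actual types.

```scala
object PeakReplaySketch {
  // Hypothetical stand-in for a logged metrics update; -1 marks a stage-end peak.
  final case class Update(timestamp: Long, m1: Long)

  // Rule described above: overwrite on timestamp -1, otherwise keep the max.
  def applyUpdate(current: Long, u: Update): Long =
    if (u.timestamp == -1L) u.m1 else math.max(current, u.m1)

  // Alternative that always takes the max, shown for contrast.
  def applyMaxOnly(current: Long, u: Update): Long = math.max(current, u.m1)

  // Updates stage 2 sees while the log is replayed: stage 1's peak, then its own.
  val seenByStage2: Seq[Update] = Seq(Update(-1L, 1000L), Update(-1L, 500L))

  val withOverwrite: Long = seenByStage2.foldLeft(-1L)(applyUpdate)
  val withMaxOnly: Long = seenByStage2.foldLeft(-1L)(applyMaxOnly)
}
```

With the overwrite rule stage 2 ends at 500; with max-only it stays at 1000, which is the mismatch the comment is guarding against.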
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195534314 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -93,6 +96,9 @@ private[spark] class EventLoggingListener( // Visible for tests only. private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) + // map of live stages, to peak executor metrics for the stage + private val liveStageExecutorMetrics = HashMap[(Int, Int), HashMap[String, PeakExecutorMetrics]]() --- End diff -- modified. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195533972 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1751,7 +1753,7 @@ class DAGScheduler( messageScheduler.shutdownNow() eventProcessLoop.stop() taskScheduler.stop() - } + } --- End diff -- fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user edwinalu commented on the issue: https://github.com/apache/spark/pull/21221 @squito, I've replaced PeakMemoryMetrics with just an Array[Long], and added a custom serializer and deserializer for it.
[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user edwinalu commented on the issue: https://github.com/apache/spark/pull/21221
@squito For PeakMemoryMetrics in api.scala, changing to the array gives REST API output of:
"peakMemoryMetrics" : { "metrics" : [ 755008624, 100519936, 0, 0, 47962185, 0, 47962185, 0, 98230, 0 ] }
instead of:
"peakMemoryMetrics" : {
  "jvmUsedHeapMemory" : 629553808,
  "jvmUsedNonHeapMemory" : 205304696,
  "onHeapExecutionMemory" : 0,
  "offHeapExecutionMemory" : 0,
  "onHeapStorageMemory" : 905801,
  "offHeapStorageMemory" : 0,
  "onHeapUnifiedMemory" : 905801,
  "offHeapUnifiedMemory" : 0,
  "directMemory" : 397602,
  "mappedMemory" : 0
}
Would it be OK to revert back to the original version of PeakMemoryMetrics, where each field is listed as a separate element?
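A rough sketch of how the compact array could still be rendered with named fields is below. It assumes the array is ordered the same way as the field names in the named output above, which this thread does not confirm; the object and method names are hypothetical, and the JSON is assembled by hand only to keep the example free of library dependencies.

```scala
object PeakMetricsJsonSketch {
  // Names taken from the named output above; the array order is assumed to match.
  val metricNames: Seq[String] = Seq(
    "jvmUsedHeapMemory", "jvmUsedNonHeapMemory",
    "onHeapExecutionMemory", "offHeapExecutionMemory",
    "onHeapStorageMemory", "offHeapStorageMemory",
    "onHeapUnifiedMemory", "offHeapUnifiedMemory",
    "directMemory", "mappedMemory")

  // Pair each value with its name, preserving order for readable output.
  def toNamedPairs(values: Array[Long]): Seq[(String, Long)] = {
    require(values.length == metricNames.length, "unexpected number of metrics")
    metricNames.zip(values)
  }

  // Render as a JSON object by hand; a real serializer would use a JSON library.
  def toJson(values: Array[Long]): String =
    toNamedPairs(values)
      .map { case (k, v) => "\"" + k + "\" : " + v }
      .mkString("{ ", ", ", " }")
}
```

This keeps the array as the storage format while the REST output stays keyed by metric name.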
[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user edwinalu commented on the issue: https://github.com/apache/spark/pull/21221 @squito , I'm modifying ExecutorMetrics to take in the metrics array -- this will be easier for tests where we pass in set values, and seems fine for the actual code. It will check that the length of the passed in array is the same as MetricGetter.values.length. Let me know if you have any concerns. @felixcheung , I'll finish the current changes, then rebase. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
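The length check mentioned here might look something like the following sketch. The Metrics class and the NumMetrics constant are hypothetical; NumMetrics stands in for MetricGetter.values.length and is set to 10 only because ten metrics are listed elsewhere in this thread.

```scala
object ExecutorMetricsSketch {
  // Stand-in for MetricGetter.values.length (an assumption for this sketch).
  val NumMetrics: Int = 10

  final class Metrics(val timestamp: Long, values: Array[Long]) {
    require(values.length == NumMetrics,
      s"expected $NumMetrics metrics but got ${values.length}")

    // Defensive copy so later changes to the caller's array are not visible here.
    private val metrics: Array[Long] = values.clone()

    def apply(index: Int): Long = metrics(index)
  }
}
```

Tests can then pass fixed arrays directly, and a wrong-sized array fails fast at construction time.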
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r191494940 --- Diff: core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.status.api.v1.PeakMemoryMetrics + +/** + * Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no + * values have been recorded yet. 
+ */ +private[spark] class PeakExecutorMetrics { + private var _jvmUsedHeapMemory = -1L; + private var _jvmUsedNonHeapMemory = 0L; + private var _onHeapExecutionMemory = 0L + private var _offHeapExecutionMemory = 0L + private var _onHeapStorageMemory = 0L + private var _offHeapStorageMemory = 0L + private var _onHeapUnifiedMemory = 0L + private var _offHeapUnifiedMemory = 0L + private var _directMemory = 0L + private var _mappedMemory = 0L + + def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory + + def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory + + def onHeapExecutionMemory: Long = _onHeapExecutionMemory + + def offHeapExecutionMemory: Long = _offHeapExecutionMemory + + def onHeapStorageMemory: Long = _onHeapStorageMemory + + def offHeapStorageMemory: Long = _offHeapStorageMemory + + def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory + + def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory + + def directMemory: Long = _directMemory + + def mappedMemory: Long = _mappedMemory + + /** + * Compare the specified memory values with the saved peak executor memory + * values, and update if there is a new peak value. 
+ * + * @param executorMetrics the executor metrics to compare + * @return if there is a new peak value for any metric + */ + def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = { +var updated: Boolean = false + +if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) { + _jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory + updated = true +} +if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) { + _jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory + updated = true +} +if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) { + _onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory + updated = true +} +if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) { + _offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory + updated = true +} +if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) { + _onHeapStorageMemory = executorMetrics.onHeapStorageMemory + updated = true +} +if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) { + _offHeapStorageMemory = executorMetrics.offHeapStorageMemory --- End diff -- Will do. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r191012697 --- Diff: core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.status.api.v1.PeakMemoryMetrics + +/** + * Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no + * values have been recorded yet. 
+ */ +private[spark] class PeakExecutorMetrics { + private var _jvmUsedHeapMemory = -1L; + private var _jvmUsedNonHeapMemory = 0L; + private var _onHeapExecutionMemory = 0L + private var _offHeapExecutionMemory = 0L + private var _onHeapStorageMemory = 0L + private var _offHeapStorageMemory = 0L + private var _onHeapUnifiedMemory = 0L + private var _offHeapUnifiedMemory = 0L + private var _directMemory = 0L + private var _mappedMemory = 0L + + def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory + + def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory + + def onHeapExecutionMemory: Long = _onHeapExecutionMemory + + def offHeapExecutionMemory: Long = _offHeapExecutionMemory + + def onHeapStorageMemory: Long = _onHeapStorageMemory + + def offHeapStorageMemory: Long = _offHeapStorageMemory + + def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory + + def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory + + def directMemory: Long = _directMemory + + def mappedMemory: Long = _mappedMemory + + /** + * Compare the specified memory values with the saved peak executor memory + * values, and update if there is a new peak value. 
+ * + * @param executorMetrics the executor metrics to compare + * @return if there is a new peak value for any metric + */ + def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = { +var updated: Boolean = false + +if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) { + _jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory + updated = true +} +if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) { + _jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory + updated = true +} +if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) { + _onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory + updated = true +} +if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) { + _offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory + updated = true +} +if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) { + _onHeapStorageMemory = executorMetrics.onHeapStorageMemory + updated = true +} +if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) { + _offHeapStorageMemory = executorMetrics.offHeapStorageMemory --- End diff -- Thanks! This is cleaner, and will make it easier to add new metrics. It is very easy to have a copy/paste error. I can merge and make the test changes -- let me know if that sounds good, or if you'd like to make some more changes first. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
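A minimal sketch of the "cleaner" shape this comment refers to, assuming the per-field comparisons are replaced by one loop over an indexed array. The names `PeakMetricsSketch`, `PeakTracker`, and the three-entry `metricNames` list are hypothetical and chosen for illustration; they are not the PR's actual code.

```scala
object PeakMetricsSketch {
  // Hypothetical, shortened metric list; adding a metric means adding one name here.
  val metricNames: IndexedSeq[String] = IndexedSeq(
    "jvmUsedHeapMemory", "jvmUsedNonHeapMemory", "onHeapExecutionMemory")

  final class PeakTracker {
    // One slot per metric; -1 marks "nothing recorded yet".
    private val peaks: Array[Long] = Array.fill(metricNames.length)(-1L)

    /** Compares each current value with the saved peak; returns true if any peak was raised. */
    def compareAndUpdate(current: Array[Long]): Boolean = {
      require(current.length == peaks.length, "unexpected number of metrics")
      var updated = false
      var i = 0
      while (i < peaks.length) {
        if (current(i) > peaks(i)) {
          peaks(i) = current(i)
          updated = true
        }
        i += 1
      }
      updated
    }

    /** Looks up a peak by name; the name is assumed to be one of metricNames. */
    def peak(name: String): Long = peaks(metricNames.indexOf(name))
  }
}
```

Because every metric goes through the same loop body, there is no per-field block to copy and paste, which is the error class mentioned above.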
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r191012183 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics + * should not be added. + */ + private def testExecutorMetricsUpdateEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "executorMetricsUpdated-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected ExecutorMetricsUpdate, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerExecutorMetricsUpdate] = + Map( +((0, "1"), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))), +((0, "2"), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))), +((1, "1"), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))), +((1, "2"), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L + +// Events to post. 
+val events = Array( + SparkListenerApplicationStart("executionMetrics", None, +1L, "update", None), + createExecutorAddedEvent(1), + createExecutorAddedEvent(2), + createStageSubmittedEvent(0), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)), + createStageSubmittedEvent(1), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L)), + createStageCompletedEvent(0), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L)), + createExecutorRemovedEvent(1), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L)), + createStageCompletedEvent(1), + SparkListenerApplicationEnd(1000L)) + +// play the 
events for the event logger +eventLogger.start() +listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) +listenerBus.addToEventLogQueue(eventLogger) +events.foreach(event => listenerBus.post(event)) +listenerBus.stop() +eventLogger.stop() + +// Verify the log file contains the expected events. +// Posted events should be logged, except for ExecutorMetricsUpdate events -- these +// are consolidated,
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r191011834 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics + * should not be added. + */ + private def testExecutorMetricsUpdateEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "executorMetricsUpdated-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected ExecutorMetricsUpdate, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerExecutorMetricsUpdate] = + Map( +((0, "1"), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))), +((0, "2"), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))), +((1, "1"), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))), +((1, "2"), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L + +// Events to post. 
+val events = Array( + SparkListenerApplicationStart("executionMetrics", None, +1L, "update", None), + createExecutorAddedEvent(1), + createExecutorAddedEvent(2), + createStageSubmittedEvent(0), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)), + createStageSubmittedEvent(1), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L)), + createStageCompletedEvent(0), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L)), + createExecutorRemovedEvent(1), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L)), + createStageCompletedEvent(1), + SparkListenerApplicationEnd(1000L)) + +// play the 
events for the event logger +eventLogger.start() +listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) +listenerBus.addToEventLogQueue(eventLogger) +events.foreach(event => listenerBus.post(event)) +listenerBus.stop() +eventLogger.stop() + +// Verify the log file contains the expected events. +// Posted events should be logged, except for ExecutorMetricsUpdate events -- these +// are consolidated,
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r191008473 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics + * should not be added. + */ + private def testExecutorMetricsUpdateEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "executorMetricsUpdated-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected ExecutorMetricsUpdate, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerExecutorMetricsUpdate] = + Map( +((0, "1"), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))), +((0, "2"), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))), +((1, "1"), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))), +((1, "2"), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L + +// Events to post. 
+val events = Array( + SparkListenerApplicationStart("executionMetrics", None, +1L, "update", None), + createExecutorAddedEvent(1), + createExecutorAddedEvent(2), + createStageSubmittedEvent(0), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)), + createStageSubmittedEvent(1), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L)), + createStageCompletedEvent(0), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L)), + createExecutorRemovedEvent(1), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L)), + createStageCompletedEvent(1), + SparkListenerApplicationEnd(1000L)) + +// play the 
events for the event logger +eventLogger.start() +listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) +listenerBus.addToEventLogQueue(eventLogger) +events.foreach(event => listenerBus.post(event)) +listenerBus.stop() +eventLogger.stop() + +// Verify the log file contains the expected events. +// Posted events should be logged, except for ExecutorMetricsUpdate events -- these +// are consolidated,
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r191008115 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics + * should not be added. + */ + private def testExecutorMetricsUpdateEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "executorMetricsUpdated-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected ExecutorMetricsUpdate, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerExecutorMetricsUpdate] = + Map( +((0, "1"), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))), +((0, "2"), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))), +((1, "1"), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))), +((1, "2"), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L + +// Events to post. 
+val events = Array( + SparkListenerApplicationStart("executionMetrics", None, +1L, "update", None), + createExecutorAddedEvent(1), + createExecutorAddedEvent(2), + createStageSubmittedEvent(0), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)), + createStageSubmittedEvent(1), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L)), + createStageCompletedEvent(0), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L)), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L)), + createExecutorMetricsUpdateEvent(1, +new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L)), + createExecutorRemovedEvent(1), + createExecutorMetricsUpdateEvent(2, +new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L)), + createStageCompletedEvent(1), + SparkListenerApplicationEnd(1000L)) + +// play the 
events for the event logger +eventLogger.start() +listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) +listenerBus.addToEventLogQueue(eventLogger) +events.foreach(event => listenerBus.post(event)) +listenerBus.stop() +eventLogger.stop() + +// Verify the log file contains the expected events. +// Posted events should be logged, except for ExecutorMetricsUpdate events -- these +// are consolidated,
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r191007795 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -209,6 +210,16 @@ class DAGScheduler( private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) taskScheduler.setDAGScheduler(this) + /** driver heartbeat for collecting metrics */ + private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat, "driver-heartbeater", --- End diff -- Moved. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r190995781 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -800,26 +812,50 @@ private[spark] class Executor( } } } - - /** - * Schedules a task to report heartbeat and partial metrics for active tasks to driver. - */ - private def startDriverHeartbeater(): Unit = { -val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s") - -// Wait a random interval so the heartbeats don't end up in sync -val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int] - -val heartbeatTask = new Runnable() { - override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat()) -} -heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS) - } } private[spark] object Executor { // This is reserved for internal use by components that need to read task properties before a // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be // used instead. val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties] + + val DIRECT_BUFFER_POOL_NAME = "direct" + val MAPPED_BUFFER_POOL_NAME = "mapped" + + /** Get the BufferPoolMXBean for the specified buffer pool. */ + def getBufferPool(pool: String): BufferPoolMXBean = { +val name = new ObjectName("java.nio:type=BufferPool,name=" + pool) + ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer, + name.toString, classOf[BufferPoolMXBean]) + } + + /** + * Get the current executor level memory metrics. 
+ * + * @param memoryManager the memory manager + * @param direct the direct memory buffer pool + * @param mapped the mapped memory buffer pool + * @return the executor memory metrics + */ + def getCurrentExecutorMetrics( + memoryManager: MemoryManager, + direct: BufferPoolMXBean, + mapped: BufferPoolMXBean) : ExecutorMetrics = { --- End diff -- Yes, and easier to share the code between driver and executor. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
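For readers unfamiliar with the buffer pool beans used in the diff, here is a small sketch of reading the "direct" and "mapped" pools. It uses `ManagementFactory.getPlatformMXBeans`, which is an alternative lookup to the `ObjectName`-based proxy shown in the diff, and the object and method names (`BufferPoolSketch`, `bufferPool`, `memoryUsed`) are hypothetical.

```scala
import java.lang.management.{BufferPoolMXBean, ManagementFactory}

object BufferPoolSketch {
  /** Finds the platform buffer pool bean with the given name, if the JVM exposes one. */
  def bufferPool(name: String): Option[BufferPoolMXBean] = {
    val it = ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).iterator()
    var found: Option[BufferPoolMXBean] = None
    while (it.hasNext && found.isEmpty) {
      val pool = it.next()
      if (pool.getName == name) found = Some(pool)
    }
    found
  }

  /** Estimated bytes used by the named pool; None if no such pool exists. */
  def memoryUsed(name: String): Option[Long] = bufferPool(name).map(_.getMemoryUsed)
}
```

Since the helper only depends on the JVM, the same call works on the driver and on executors, which matches the sharing point made in the comment.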
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r190990997 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +183,35 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event + val prevAttemptId = event.stageInfo.attemptNumber() - 1 + for (attemptId <- 0 to prevAttemptId) { +liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId)) + } + + // log the peak executor metrics for the stage, for each executor --- End diff -- Yes, it's all running executors, and does not filter based on if they have tasks for the stage. I've updated the comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r190990593 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -93,6 +96,10 @@ private[spark] class EventLoggingListener( // Visible for tests only. private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) + // map of live stages, to peak executor metrics for the stage + private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int), +mutable.HashMap[String, PeakExecutorMetrics]]() --- End diff -- Changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r190990562 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -81,7 +84,7 @@ private[spark] class EventLoggingListener( private val compressionCodecName = compressionCodec.map { c => CompressionCodec.getShortName(c.getClass.getName) } - +logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + shouldLogExecutorMetricsUpdates) --- End diff -- Removed. Thanks, I hadn't meant to push that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r188136553 --- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.TimeUnit + +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * Creates a heartbeat thread which will call the specified reportHeartbeat function at + * intervals of intervalMs. + * + * @param reportHeartbeat the heartbeat reporting function to call. + * @param intervalMs the interval between heartbeats. + */ +private[spark] class Heartbeater(reportHeartbeat: () => Unit, intervalMs: Long) { + // Executor for the heartbeat task + private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater") --- End diff -- Changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
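A rough sketch of a heartbeater whose thread name is a constructor parameter rather than a hardcoded string, assuming that is the change referred to here. It uses plain `java.util.concurrent` instead of Spark's `ThreadUtils`, and `HeartbeaterSketch` is a hypothetical name; the randomized initial delay mirrors the `startDriverHeartbeater` code removed from `Executor.scala` elsewhere in this thread.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

final class HeartbeaterSketch(reportHeartbeat: () => Unit, name: String, intervalMs: Long) {
  // Single daemon thread, named by the caller (e.g. "driver-heartbeater").
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true)
        t
      }
    })

  /** Schedules the heartbeat at a fixed rate after a randomized initial delay. */
  def start(): Unit = {
    // Wait a random interval so heartbeats from different processes don't end up in sync.
    val initialDelay = intervalMs + (scala.util.Random.nextDouble() * intervalMs).toLong
    val task = new Runnable {
      override def run(): Unit =
        try reportHeartbeat()
        catch {
          // An uncaught exception would cancel the periodic task, so report and continue.
          case NonFatal(e) => System.err.println(s"heartbeat failed: $e")
        }
    }
    scheduler.scheduleAtFixedRate(task, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
  }

  /** Stops the heartbeat thread and waits briefly for it to finish. */
  def stop(): Unit = {
    scheduler.shutdown()
    scheduler.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```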
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r188136532 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1753,9 +1766,21 @@ class DAGScheduler( messageScheduler.shutdownNow() eventProcessLoop.stop() taskScheduler.stop() +heartbeater.stop() + } + + /** Reports heartbeat metrics for the driver. */ + private def reportHeartBeat(): Unit = { --- End diff -- It's a bit redundant for fields that aren't used by the driver -- for the driver, execution memory gets set to 0. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187507940 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -93,6 +94,10 @@ private[spark] class EventLoggingListener( // Visible for tests only. private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) + // map of live stages, to peak executor metrics for the stage + private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int), --- End diff -- This tracks the peak metric values for each executor for each stage, so that the peak values for the stage can be dumped at stage end. The purpose is to reduce the amount of logging to only (number of stages * number of executors) ExecutorMetricsUpdate events. I originally tried logging whenever there was a new peak value, resetting when a new stage begins -- this is simpler, but can lead to more events being logged. Having stage-level information is useful for users trying to identify which stages are more memory intensive. This information could be useful if they are trying to reduce the amount of memory used, since they would know which stages (and the relevant code) to focus on. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
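The bookkeeping described in this comment can be sketched roughly as follows, assuming a single `Long` metric per executor to keep it short. `StagePeakSketch`, `StagePeaks`, and the three callback names are hypothetical stand-ins for the listener methods; only the `(stageId, attemptId)` key shape comes from the diff.

```scala
import scala.collection.mutable

object StagePeakSketch {
  type StageKey = (Int, Int) // (stageId, stageAttemptId)

  final class StagePeaks {
    // live stage -> (executorId -> peak value seen while the stage was live)
    private val live = mutable.HashMap.empty[StageKey, mutable.HashMap[String, Long]]

    def onStageSubmitted(stageId: Int, attemptId: Int): Unit =
      live.getOrElseUpdate((stageId, attemptId), mutable.HashMap.empty[String, Long])

    /** Folds one executor update into every live stage; nothing is logged here. */
    def onMetricsUpdate(executorId: String, value: Long): Unit =
      live.values.foreach { peaks =>
        if (value > peaks.getOrElse(executorId, -1L)) peaks(executorId) = value
      }

    /** Removes the stage and returns one peak per executor, i.e. what would be logged. */
    def onStageCompleted(stageId: Int, attemptId: Int): Map[String, Long] =
      live.remove((stageId, attemptId)).map(_.toMap).getOrElse(Map.empty[String, Long])
  }
}
```

With this shape, at most one entry per executor is emitted per stage at stage end, regardless of how many heartbeats arrived while the stage was running.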
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187507139 --- Diff: core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.status.api.v1.PeakMemoryMetrics + +/** + * Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no + * values have been recorded yet. + */ +private[spark] class PeakExecutorMetrics { --- End diff -- I got some errors when trying to add methods to ExecutorMetrics. I don't remember the details, but can try this again. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187506958 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +179,27 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +// log the peak executor metrics for the stage, for each executor +val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() +val executorMap = liveStageExecutorMetrics.remove( --- End diff -- Yes, it's safer to clean up earlier attempts -- I can add some code to iterate through earlier attemptIDs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187506780 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +179,27 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +// log the peak executor metrics for the stage, for each executor +val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() +val executorMap = liveStageExecutorMetrics.remove( + (event.stageInfo.stageId, event.stageInfo.attemptNumber())) +executorMap.foreach { + executorEntry => { +for ((executorId, peakExecutorMetrics) <- executorEntry) { --- End diff -- The for loop (line 187) is going through the hashmap entries of executorId to peakExecutorMetrics, so there are multiple values. Could you please provide more detail for how "case (executorId, peakExecutorMetrics) =>" would work? If the for loop is OK, then I can add some comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
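For comparison, here is a small sketch of the two loop styles under discussion. The reviewer's exact suggestion is not shown in this thread, so this assumes it was a pattern-matching `foreach`; `LoopStyles` and `ExecutorPeaks` are illustrative names, and peaks are reduced to a single long per executor.

```scala
import scala.collection.mutable

object LoopStyles {
  type ExecutorPeaks = mutable.HashMap[String, Long]

  // Style used in the diff: a for loop over the map entries inside Option.foreach.
  def withForLoop(removed: Option[ExecutorPeaks]): List[String] = {
    val out = mutable.ListBuffer.empty[String]
    removed.foreach { executorEntry =>
      for ((executorId, peak) <- executorEntry) {
        out += s"$executorId:$peak"
      }
    }
    out.toList.sorted
  }

  // Alternative: foreach with a `case` pattern, which is invoked once per map entry.
  def withCase(removed: Option[ExecutorPeaks]): List[String] = {
    val out = mutable.ListBuffer.empty[String]
    removed.foreach { executorEntry =>
      executorEntry.foreach { case (executorId, peak) =>
        out += s"$executorId:$peak"
      }
    }
    out.toList.sorted
  }
}
```

Both forms visit every (executorId, peak) pair; the `case` pattern destructures each tuple in turn, so it handles multiple entries just as the for loop does.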
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187501194 --- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.TimeUnit + +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * Creates a heartbeat thread which will call the specified reportHeartbeat function at + * intervals of intervalMs. + * + * @param reportHeartbeat the heartbeat reporting function to call. + * @param intervalMs the interval between heartbeats. + */ +private[spark] class Heartbeater(reportHeartbeat: () => Unit, intervalMs: Long) { + // Executor for the heartbeat task + private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater") --- End diff -- Yes, this could be for the driver as well, so could be misleading. I can change to "heartbeater". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
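A minimal sketch of a heartbeater with a configurable thread name, so the same class can serve both driver and executors. It uses only `java.util.concurrent` rather than Spark's `ThreadUtils`, and `SimpleHeartbeater` and its `name` parameter are assumptions for illustration, not the class from the pull request.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

final class SimpleHeartbeater(reportHeartbeat: () => Unit, name: String, intervalMs: Long) {
  // Daemon thread, so a forgotten heartbeater does not keep the JVM alive.
  private val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true)
      t
    }
  }
  private val scheduler = Executors.newSingleThreadScheduledExecutor(factory)

  /** Schedule the heartbeat function at a fixed rate, starting immediately. */
  def start(): Unit = {
    val task = new Runnable {
      // An uncaught exception would cancel all later runs, so it is swallowed here;
      // a real implementation would log it instead.
      override def run(): Unit =
        try reportHeartbeat() catch { case _: Exception => () }
    }
    scheduler.scheduleAtFixedRate(task, 0L, intervalMs, TimeUnit.MILLISECONDS)
  }

  /** Stop scheduling and wait briefly for an in-flight heartbeat to finish. */
  def stop(): Unit = {
    scheduler.shutdown()
    scheduler.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Passing the name in at construction avoids hard-coding "driver-heartbeater" for a thread that may run on an executor.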
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187501157 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1753,9 +1766,21 @@ class DAGScheduler( messageScheduler.shutdownNow() eventProcessLoop.stop() taskScheduler.stop() +heartbeater.stop() + } + + /** Reports heartbeat metrics for the driver. */ + private def reportHeartBeat(): Unit = { --- End diff -- With cluster mode, including YARN, there isn't a local executor, so the metrics for the driver would not be collected. Perhaps this could be modified to skip this step for local mode. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu closed the pull request at: https://github.com/apache/spark/pull/20940 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user edwinalu commented on the issue: https://github.com/apache/spark/pull/20940 @squito, it is really weird. I had some problems with the last sync with master, and it looks like some of the changes from master got added. I've opened a new pull request: https://github.com/apache/spark/pull/21221 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
GitHub user edwinalu opened a pull request: https://github.com/apache/spark/pull/21221 [SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API The original PR #20940 is messed up, and the diff shows changes not related to SPARK-23429. This is a cleaned up version of that pull request. Add new executor level memory metrics (JVM used memory, on/off heap execution memory, on/off heap storage memory, on/off heap unified memory, direct memory, and mapped memory), and expose via the executors REST API. This information will help provide insight into how executor and driver JVM memory is used, and for the different memory regions. It can be used to help determine good values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction. ## What changes were proposed in this pull request? An ExecutorMetrics class is added, with jvmUsedHeapMemory, jvmUsedNonHeapMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, offHeapStorageMemory, onHeapUnifiedMemory, offHeapUnifiedMemory, directMemory and mappedMemory. The new ExecutorMetrics is sent by executors to the driver as part of the Heartbeat. A heartbeat is added for the driver as well, to collect these metrics for the driver. The EventLoggingListener stores information about the peak values for each metric, per active stage and executor. When a StageCompleted event is seen, an ExecutorMetricsUpdate event will be logged for each executor, with peak values for the stage. Only the ExecutorMetrics will be logged, and not the TaskMetrics, to minimize additional logging. The AppStatusListener records the peak values for each memory metric. The new memory metrics are added to the executors REST API. ## How was this patch tested? New unit tests have been added. This was also tested on our cluster. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/edwinalu/spark SPARK-23429.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21221.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21221 commit c8e8abedbdfec6e92b0c63e90f3c2c5755fd8978 Author: Edwina Lu Date: 2018-03-09T23:39:36Z SPARK-23429: Add executor memory metrics to heartbeat and expose in executors REST API Add new executor level memory metrics (JVM used memory, on/off heap execution memory, on/off heap storage memory), and expose via the executors REST API. This information will help provide insight into how executor and driver JVM memory is used, and for the different memory regions. It can be used to help determine good values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction. Add an ExecutorMetrics class, with jvmUsedMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and offHeapStorageMemory. The new ExecutorMetrics will be sent by executors to the driver as part of Heartbeat. A heartbeat will be added for the driver as well, to collect these metrics for the driver. Modify the EventLoggingListener to log ExecutorMetricsUpdate events if there is a new peak value for any of the memory metrics for an executor and stage. Only the ExecutorMetrics will be logged, and not the TaskMetrics, to minimize additional logging. Modify the AppStatusListener to record the peak values for each memory metric. Add the new memory metrics to the executors REST API. 
commit 5d6ae1c34bf6618754e4b8b2e756a9a7b4bad987 Author: Edwina Lu Date: 2018-04-02T02:13:41Z modify MimaExcludes.scala to filter changes to SparkListenerExecutorMetricsUpdate commit ad10d2814bbfbaf8c21fcbb1abe83ef7a8e9ffe7 Author: Edwina Lu Date: 2018-04-22T00:02:57Z Address code review comments, change event logging to stage end. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user edwinalu commented on the issue: https://github.com/apache/spark/pull/20940 @rezsafi, thanks for reviewing. It would be better if the heartbeat wasn't tied to sampling frequency. Most likely users would want to sample more frequently, although this would also mean more communication traffic between executors and driver. I'm out of town right now, but am planning to reschedule the design discussion when I am back, hopefully in a couple of weeks, and will invite you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user edwinalu commented on the issue: https://github.com/apache/spark/pull/20940 Could a committer please request a retest? It looks like the tests passed (https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89685/testReport/), and the failure occurs after posting to github: Attempting to post to Github... [error] running /home/jenkins/workspace/SparkPullRequestBuilder/build/sbt -Phadoop-2.6 -Pkubernetes -Pflume -Phive-thriftserver -Pyarn -Pkafka-0-8 -Phive -Pkinesis-asl -Pmesos test ; process was terminated by signal 9 > Post successful. Build step 'Execute shell' marked build as failure Archiving artifacts Recording test results Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89685/ Test FAILed. Finished: FAILURE --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r181943630 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- Thanks for your thoughts on this. Size of message, and also logging, but it is only an extra few longs per heartbeat, and similarly for logging. Task end would help with minimizing communication for longer running tasks. The heartbeats are only every 10 seconds, so perhaps not so bad. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r181611071 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- ExecutorMetrics right now has: jvmUsedHeapMemory, jvmUsedNonHeapMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and offHeapStorageMemory. For logging at stage end, we can log the peak for each of these, but unified memory is more problematic. We could add new fields for on heap/off heap unified memory, but I'm inclined to remove unified memory (from all the places it is currently used), rather than add more fields. Users can still sum peak execution and peak storage values, which may be larger than the actual peak unified memory if they are not at peak values at the same time, but should still be a reasonable estimate for sizing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
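The point about summing peaks can be made concrete with a small worked sketch. The sample values are made up for illustration, and `UnifiedPeak` and `Sample` are hypothetical names; unified memory is taken here to be execution plus storage at the same instant.

```scala
object UnifiedPeak {
  // One heartbeat sample of execution and storage memory, in arbitrary units.
  final case class Sample(execution: Long, storage: Long)

  /** What a user can compute from separately logged peaks. */
  def sumOfPeaks(samples: Seq[Sample]): Long =
    samples.map(_.execution).max + samples.map(_.storage).max

  /** The true peak of unified memory across the samples. */
  def peakOfSum(samples: Seq[Sample]): Long =
    samples.map(s => s.execution + s.storage).max
}
```

For the samples (50, 0), (10, 20), (15, 30), the sum of peaks is 50 + 30 = 80, while the true unified peak is 50. The sum of peaks is always at least the true peak, so it is a safe upper bound for sizing, and the two agree only when both metrics peak in the same sample.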
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180446530 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- I will make the change to log at stage end, and will update the design doc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180287725 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- The original analysis was focused on some test applications and larger applications. Looking at a broader sample of our applications, the logging overhead is higher, averaging about 14% per application, and 4% overall (sum of logging for ExecutorMetricsUpdate / sum of Spark history logs). The overhead for larger Spark history logs is mostly pretty small, but increases significantly for smaller ones (there's one at 49%). There's often some logging before the first stage starts, which is extra overhead, especially for smaller applications/history logs, and doesn't contain useful information. It can also be high for the case where the stage takes a long time to run and memory is increasing rather than reaching the peak quickly -- logging at stage end would work better for this case. I should also note that these numbers are for the case where only 4 longs are recorded, and with more metrics, the overhead would be higher, both in the size of each logged event, and the number of potential peaks, since a new peak for any metric would be logged. Since there will be more metrics added, and the cost is higher than originally estimated, logging at stage end could be a better choice. 
We would lose some information about overlapping stages, but this information wouldn't be visible anyway with the currently planned REST APIs or web UI, which just show the peaks for stages and executors. For logging at stage end, we can log an ExecutorMetricsUpdate event for each executor that has sent a heartbeat for the stage just before logging the stage end -- this would have the peak value for each metric. This should be the minimum amount of logging needed to have information about peak values per stage per executor. Alternatively, the information could be added to the StageCompleted event for more compaction, but the code would be more complicated, with 2 paths for reading in values. Logging an event per executor at stage end seems like a reasonable choice, not too much extra logging or too much extra complexity. What are your thoughts? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
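The two overhead figures quoted above (an average per application and an overall ratio) are different statistics, which is why small logs can pull the first far above the second. A rough sketch, using made-up byte counts and hypothetical names (`LoggingOverhead`, `AppLog`):

```scala
object LoggingOverhead {
  // Bytes of ExecutorMetricsUpdate events and total bytes of one application's history log.
  final case class AppLog(metricsBytes: Long, totalBytes: Long)

  /** Average of the per-application overhead ratios. */
  def meanOfRatios(logs: Seq[AppLog]): Double =
    logs.map(l => l.metricsBytes.toDouble / l.totalBytes).sum / logs.size

  /** Overall overhead: sum of metrics bytes over sum of all log bytes. */
  def ratioOfSums(logs: Seq[AppLog]): Double =
    logs.map(_.metricsBytes).sum.toDouble / logs.map(_.totalBytes).sum
}
```

With one small log at 49 of 100 bytes and one large log at 10 of 1000 bytes, the per-application average is 25% while the overall ratio is 59 / 1100, about 5.4%. The small log dominates the average but barely moves the overall figure.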
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180204845 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -772,6 +772,12 @@ private[spark] class Executor( val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]() val curGCTime = computeTotalGcTime() +// get executor level memory metrics +val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(), + ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(), --- End diff -- Thanks, I will play around with it a bit. If it seems more complicated or expensive, I'll file a separate subtask. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180180795 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -772,6 +772,12 @@ private[spark] class Executor( val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]() val curGCTime = computeTotalGcTime() +// get executor level memory metrics +val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(), + ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(), --- End diff -- We could add ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed(), for total non-heap memory used by the JVM. For direct and memory mapped usage, would collecting these be similar to https://gist.github.com/t3rmin4t0r/1a753ccdcfa8d111f07c ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
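One way these values might be collected with the standard `java.lang.management` API is sketched below. The `JvmMemory` object is a hypothetical helper, and the pool names "direct" and "mapped" are the ones HotSpot JVMs typically report, which is an assumption about the runtime rather than something this thread confirms.

```scala
import java.lang.management.{BufferPoolMXBean, ManagementFactory}

object JvmMemory {
  /** Heap memory currently used by the JVM, in bytes. */
  def heapUsed: Long =
    ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed

  /** Total non-heap memory currently used by the JVM, in bytes. */
  def nonHeapUsed: Long =
    ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage.getUsed

  /** Bytes used by the named buffer pool (assumed "direct" or "mapped"), or 0 if absent. */
  def bufferPoolUsed(poolName: String): Long = {
    val pools = ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).iterator()
    var used = 0L
    while (pools.hasNext) {
      val pool = pools.next()
      if (pool.getName == poolName) used = pool.getMemoryUsed
    }
    used
  }
}
```

Calling `JvmMemory.bufferPoolUsed("direct")` and `JvmMemory.bufferPoolUsed("mapped")` once per heartbeat would add two longs to the metrics, and an unknown pool name simply yields 0.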
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r180179243 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- We're not constructing a timeline currently, and yes, with only peak values, it could be rather sparse. We would get new values with new stages, but would not see decreases in memory during a stage. The situation where there is a stage 1 with peak X, and then stage 2 starts with peak Y > X is interesting though, and it would be useful to have this information, since we would then know to focus on stage 2 for memory consumption, even though both 1 and 2 could have the same peak values. Logging at stage start would double the amount of logging, and would be trickier, so I'd prefer either keeping the current approach or only logging at stage end. The higher logging was for smaller applications (and smaller logs). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r179978448 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -772,6 +772,12 @@ private[spark] class Executor( val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]() val curGCTime = computeTotalGcTime() +// get executor level memory metrics +val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(), + ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(), --- End diff -- It would be useful to have more information about offheap memory usage. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r179978186 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics + * should not be added. + */ + private def testExecutorMetricsUpdateEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "executorMetricsUpdated-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// list of events and if they should be logged +val events = Array( + (SparkListenerApplicationStart("executionMetrics", None, +1L, "update", None), true), + (createExecutorAddedEvent(1), true), + (createExecutorAddedEvent(2), true), + (createStageSubmittedEvent(0), true), + (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), true), // new stage + (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), true), // new stage + (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), false), + (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), true), // onheap storage + (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), true), // JVM used + (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), true), // onheap unified + (createStageSubmittedEvent(1), true), + (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), true), // new stage + (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), true), // new stage + (createStageCompletedEvent(0), 
true), + (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), true), // onheap execution + (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), false), + (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), true), // offheap execution + (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), true), // offheap storage + (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), false), + (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), false), + (createStageCompletedEvent(1), true), + (SparkListenerApplicationEnd(1000L), true)) + +// play the events for the event logger +eventLogger.start() +listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) +listenerBus.addToEventLogQueue(eventLogger) +for ((event, included) <- events) { + listenerBus.post(event) +} +listenerBus.stop() +eventLogger.stop() + +// Verify the log file contains the expected events +val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) +try { + val lines = readLines(logData) + val logStart = SparkListenerLogStart(SPARK_VERSION) + assert(lines.size === 19) + assert(lines(0).contains("SparkListenerLogStart")) + assert(lines(1).contains("SparkListenerApplicationStart")) + assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) + var i = 1 + for ((event, included) <- events) { +if (included) { + checkEvent(lines(i), event) + i += 1 +} + } +} finally { + logData.close() +} + } + + /** Create a stage submitted event for the specified stage Id. */ + private def createStageSubmittedEvent(stageId: Int) = +SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + + /** Create a stage completed event for the specified stage Id. 
*/ + private def createStageCompletedEvent(stageId: Int) = +SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + + /** Create an executor added event for the specified executor Id. */ + private def createExecutorAddedEvent(executorId: Int) = +SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInf
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r179978222 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics + * should not be added. + */ + private def testExecutorMetricsUpdateEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "executorMetricsUpdated-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// list of events and if they should be logged +val events = Array( + (SparkListenerApplicationStart("executionMetrics", None, +1L, "update", None), true), + (createExecutorAddedEvent(1), true), + (createExecutorAddedEvent(2), true), + (createStageSubmittedEvent(0), true), + (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), true), // new stage + (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), true), // new stage + (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), false), + (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), true), // onheap storage + (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), true), // JVM used + (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), true), // onheap unified + (createStageSubmittedEvent(1), true), + (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), true), // new stage + (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), true), // new stage + (createStageCompletedEvent(0), 
true), + (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), true), // onheap execution + (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), false), + (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), true), // offheap execution + (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), true), // offheap storage + (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), false), + (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), false), + (createStageCompletedEvent(1), true), + (SparkListenerApplicationEnd(1000L), true)) + +// play the events for the event logger +eventLogger.start() +listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) +listenerBus.addToEventLogQueue(eventLogger) +for ((event, included) <- events) { + listenerBus.post(event) +} +listenerBus.stop() +eventLogger.stop() + +// Verify the log file contains the expected events +val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) +try { + val lines = readLines(logData) + val logStart = SparkListenerLogStart(SPARK_VERSION) + assert(lines.size === 19) + assert(lines(0).contains("SparkListenerLogStart")) + assert(lines(1).contains("SparkListenerApplicationStart")) + assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) + var i = 1 + for ((event, included) <- events) { +if (included) { + checkEvent(lines(i), event) + i += 1 +} + } +} finally { + logData.close() +} + } + + /** Create a stage submitted event for the specified stage Id. */ + private def createStageSubmittedEvent(stageId: Int) = +SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + + /** Create a stage completed event for the specified stage Id. 
*/ + private def createStageCompletedEvent(stageId: Int) = +SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + + /** Create an executor added event for the specified executor Id. */ + private def createExecutorAddedEvent(executorId: Int) = +SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInf
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r179978192 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics + * should not be added. + */ + private def testExecutorMetricsUpdateEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "executorMetricsUpdated-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// list of events and if they should be logged +val events = Array( + (SparkListenerApplicationStart("executionMetrics", None, +1L, "update", None), true), + (createExecutorAddedEvent(1), true), + (createExecutorAddedEvent(2), true), + (createStageSubmittedEvent(0), true), + (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), true), // new stage + (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), true), // new stage + (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), false), + (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), true), // onheap storage + (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), true), // JVM used + (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), true), // onheap unified + (createStageSubmittedEvent(1), true), + (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), true), // new stage + (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), true), // new stage + (createStageCompletedEvent(0), 
true), + (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), true), // onheap execution + (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), false), + (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), true), // offheap execution + (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), true), // offheap storage + (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), false), + (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), false), + (createStageCompletedEvent(1), true), + (SparkListenerApplicationEnd(1000L), true)) + +// play the events for the event logger +eventLogger.start() +listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) +listenerBus.addToEventLogQueue(eventLogger) +for ((event, included) <- events) { + listenerBus.post(event) +} +listenerBus.stop() +eventLogger.stop() + +// Verify the log file contains the expected events +val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) +try { + val lines = readLines(logData) + val logStart = SparkListenerLogStart(SPARK_VERSION) + assert(lines.size === 19) + assert(lines(0).contains("SparkListenerLogStart")) + assert(lines(1).contains("SparkListenerApplicationStart")) + assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) + var i = 1 + for ((event, included) <- events) { +if (included) { + checkEvent(lines(i), event) + i += 1 +} + } +} finally { + logData.close() +} + } + + /** Create a stage submitted event for the specified stage Id. */ + private def createStageSubmittedEvent(stageId: Int) = --- End diff -- Done.
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r179978182 --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala --- @@ -268,6 +268,9 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE def hasMemoryInfo: Boolean = totalOnHeap >= 0L + // peak values for executor level metrics + var peakExecutorMetrics = new PeakExecutorMetrics --- End diff -- Yes, thanks for catching this; it should be a val. This more properly belongs to SPARK-23431, so I can remove it for now.
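To illustrate the var/val point, here is a minimal sketch. `PeakHolder` and `ExecutorState` are hypothetical stand-ins, not the classes from the pull request, and the sketch assumes the peak holder mutates its own contents. Under that assumption the reference never needs to be reassigned, so `val` is enough.

```scala
// Hypothetical stand-in for a peak-metrics holder; not the class from the pull request.
class PeakHolder(size: Int) {
  private val values = new Array[Long](size)

  /** Keep the larger of the stored value and the new value. */
  def update(i: Int, v: Long): Unit = if (v > values(i)) values(i) = v

  def apply(i: Int): Long = values(i)
}

// Hypothetical owner class, playing the role of the executor entity.
class ExecutorState {
  // `val` fixes the reference; the holder's contents can still change.
  val peaks = new PeakHolder(2)
}
```

Declaring the field as `val` also documents that nothing is expected to swap in a different holder later.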
[GitHub] spark pull request #20940: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r179978102 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -234,8 +244,22 @@ private[spark] class EventLoggingListener( } } - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + /** + * Log if there is a new peak value for one of the memory metrics for the given executor. + * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will + * log new peak memory metric values per executor per stage. + */ + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { --- End diff -- For a longer-running stage, once it ramps up, there will hopefully not be many new peak values. Looking at a subset of our applications, the extra logging overhead has mostly been between 0.25% and 1%, but it can be as high as 8%. By logging each peak value at the time it occurs (and reinitializing when a stage starts), it's possible to tell which stages are active at the time, and it would potentially be possible to graph these changes on a timeline. This information wouldn't be available if the metrics were only logged at stage end, because the times would be lost. Logging at stage end would limit the amount of extra logging. If we add more metrics (such as for offheap), then there could be more new peaks and more extra logging with the current approach. Excess logging is a concern, and I can move to logging at stage end if the overhead is too much.
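The peak-based logging described in this comment can be sketched roughly as follows. This is a simplified illustration, not the code from the pull request: `PeakLoggingSketch`, `NumMetrics`, `logged`, and both method names are hypothetical, the event log is replaced by an in-memory buffer, and all peaks are cleared whenever any stage is submitted.

```scala
import scala.collection.mutable

// Hypothetical names throughout; not the classes from the pull request.
object PeakLoggingSketch {
  // Number of tracked metrics per executor (illustrative value).
  val NumMetrics = 3

  // Peak values seen so far for each executor since the last stage start.
  private val peaks = mutable.Map.empty[String, Array[Long]]

  // Stand-in for the event log: one entry per update that was logged.
  val logged = mutable.ArrayBuffer.empty[(String, Vector[Long])]

  /** Clear recorded peaks, as would happen when a new stage is submitted. */
  def onStageSubmitted(): Unit = peaks.clear()

  /** Log the update only if at least one metric exceeds its recorded peak. */
  def onMetricsUpdate(executorId: String, metrics: Array[Long]): Boolean = {
    require(metrics.length == NumMetrics, "unexpected number of metrics")
    val current =
      peaks.getOrElseUpdate(executorId, Array.fill(NumMetrics)(Long.MinValue))
    var newPeak = false
    for (i <- metrics.indices if metrics(i) > current(i)) {
      current(i) = metrics(i) // remember the new peak for this metric
      newPeak = true
    }
    if (newPeak) logged += ((executorId, metrics.toVector))
    newPeak
  }
}
```

In this sketch the first update from an executor after a stage start is always logged, and later updates are logged only when some metric rises above its stored peak. That is why a stage that has ramped up should add few further entries, while adding more tracked metrics gives more chances for a new peak.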