Repository: spark Updated Branches: refs/heads/master 28f9f3f22 -> 1437e344e
[SPARK-22050][CORE] Allow BlockUpdated events to be optionally logged to the event log ## What changes were proposed in this pull request? I see that block updates are not logged to the event log. This makes sense as a default for performance reasons. However, I find it helpful when trying to get a better understanding of caching for a job to be able to log these updates. This PR adds a configuration setting `spark.eventLog.logBlockUpdates.enabled` (defaulting to false) which allows block updates to be recorded in the log. This contribution is original work which is licensed to the Apache Spark project. ## How was this patch tested? Current and additional unit tests. Author: Michael Mior <mm...@uwaterloo.ca> Closes #19263 from michaelmior/log-block-updates. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1437e344 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1437e344 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1437e344 Branch: refs/heads/master Commit: 1437e344ec0c29a44a19f4513986f5f184c44695 Parents: 28f9f3f Author: Michael Mior <mm...@uwaterloo.ca> Authored: Tue Oct 17 14:30:52 2017 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Tue Oct 17 14:30:52 2017 -0700 ---------------------------------------------------------------------- .../apache/spark/internal/config/package.scala | 23 +++++++++++++ .../spark/scheduler/EventLoggingListener.scala | 18 +++++++---- .../org/apache/spark/util/JsonProtocol.scala | 34 ++++++++++++++++++-- .../scheduler/EventLoggingListenerSuite.scala | 2 ++ .../apache/spark/util/JsonProtocolSuite.scala | 27 ++++++++++++++++ docs/configuration.md | 8 +++++ 6 files changed, 104 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/main/scala/org/apache/spark/internal/config/package.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index e7b406a..0c36bdc 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -41,6 +41,29 @@ package object config { .bytesConf(ByteUnit.MiB) .createWithDefaultString("1g") + private[spark] val EVENT_LOG_COMPRESS = + ConfigBuilder("spark.eventLog.compress") + .booleanConf + .createWithDefault(false) + + private[spark] val EVENT_LOG_BLOCK_UPDATES = + ConfigBuilder("spark.eventLog.logBlockUpdates.enabled") + .booleanConf + .createWithDefault(false) + + private[spark] val EVENT_LOG_TESTING = + ConfigBuilder("spark.eventLog.testing") + .internal() + .booleanConf + .createWithDefault(false) + + private[spark] val EVENT_LOG_OUTPUT_BUFFER_SIZE = ConfigBuilder("spark.eventLog.buffer.kb") + .bytesConf(ByteUnit.KiB) + .createWithDefaultString("100k") + + private[spark] val EVENT_LOG_OVERWRITE = + ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false) + private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 9dafa0b..a77adc5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -37,6 +37,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{SPARK_VERSION, SparkConf} import 
org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{JsonProtocol, Utils} @@ -45,6 +46,7 @@ import org.apache.spark.util.{JsonProtocol, Utils} * * Event logging is specified by the following configurable parameters: * spark.eventLog.enabled - Whether event logging is enabled. + * spark.eventLog.logBlockUpdates.enabled - Whether to log block updates * spark.eventLog.compress - Whether to compress logged events * spark.eventLog.overwrite - Whether to overwrite any existing files. * spark.eventLog.dir - Path to the directory in which events are logged. @@ -64,10 +66,11 @@ private[spark] class EventLoggingListener( this(appId, appAttemptId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf)) - private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false) - private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false) - private val testing = sparkConf.getBoolean("spark.eventLog.testing", false) - private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 + private val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + private val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) + private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES) + private val testing = sparkConf.get(EVENT_LOG_TESTING) + private val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) private val compressionCodec = if (shouldCompress) { @@ -216,8 +219,11 @@ private[spark] class EventLoggingListener( logEvent(event, flushLogger = true) } - // No-op because logging every update would be overkill - override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {} + override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { + if 
(shouldLogBlockUpdates) { + logEvent(event, flushLogger = true) + } + } // No-op because logging every update would be overkill override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 8406826..5e60218 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -98,8 +98,8 @@ private[spark] object JsonProtocol { logStartToJson(logStart) case metricsUpdate: SparkListenerExecutorMetricsUpdate => executorMetricsUpdateToJson(metricsUpdate) - case blockUpdated: SparkListenerBlockUpdated => - throw new MatchError(blockUpdated) // TODO(ekl) implement this + case blockUpdate: SparkListenerBlockUpdated => + blockUpdateToJson(blockUpdate) case _ => parse(mapper.writeValueAsString(event)) } } @@ -246,6 +246,12 @@ private[spark] object JsonProtocol { }) } + def blockUpdateToJson(blockUpdate: SparkListenerBlockUpdated): JValue = { + val blockUpdatedInfo = blockUpdatedInfoToJson(blockUpdate.blockUpdatedInfo) + ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockUpdate) ~ + ("Block Updated Info" -> blockUpdatedInfo) + } + /** ------------------------------------------------------------------- * * JSON serialization methods for classes SparkListenerEvents depend on | * -------------------------------------------------------------------- */ @@ -458,6 +464,14 @@ private[spark] object JsonProtocol { ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) } + def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = { + ("Block Manager ID" -> blockManagerIdToJson(blockUpdatedInfo.blockManagerId)) ~ + ("Block ID" -> 
blockUpdatedInfo.blockId.toString) ~ + ("Storage Level" -> storageLevelToJson(blockUpdatedInfo.storageLevel)) ~ + ("Memory Size" -> blockUpdatedInfo.memSize) ~ + ("Disk Size" -> blockUpdatedInfo.diskSize) + } + /** ------------------------------ * * Util JSON serialization methods | * ------------------------------- */ @@ -515,6 +529,7 @@ private[spark] object JsonProtocol { val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved) val logStart = Utils.getFormattedClassName(SparkListenerLogStart) val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate) + val blockUpdate = Utils.getFormattedClassName(SparkListenerBlockUpdated) } def sparkEventFromJson(json: JValue): SparkListenerEvent = { @@ -538,6 +553,7 @@ private[spark] object JsonProtocol { case `executorRemoved` => executorRemovedFromJson(json) case `logStart` => logStartFromJson(json) case `metricsUpdate` => executorMetricsUpdateFromJson(json) + case `blockUpdate` => blockUpdateFromJson(json) case other => mapper.readValue(compact(render(json)), Utils.classForName(other)) .asInstanceOf[SparkListenerEvent] } @@ -676,6 +692,11 @@ private[spark] object JsonProtocol { SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates) } + def blockUpdateFromJson(json: JValue): SparkListenerBlockUpdated = { + val blockUpdatedInfo = blockUpdatedInfoFromJson(json \ "Block Updated Info") + SparkListenerBlockUpdated(blockUpdatedInfo) + } + /** --------------------------------------------------------------------- * * JSON deserialization methods for classes SparkListenerEvents depend on | * ---------------------------------------------------------------------- */ @@ -989,6 +1010,15 @@ private[spark] object JsonProtocol { new ExecutorInfo(executorHost, totalCores, logUrls) } + def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = { + val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") + val blockId = BlockId((json \ "Block ID").extract[String]) + val 
storageLevel = storageLevelFromJson(json \ "Storage Level") + val memorySize = (json \ "Memory Size").extract[Long] + val diskSize = (json \ "Disk Size").extract[Long] + BlockUpdatedInfo(blockManagerId, blockId, storageLevel, memorySize, diskSize) + } + /** -------------------------------- * * Util JSON deserialization methods | * --------------------------------- */ http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 6b42775..a9e92fa 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -228,6 +228,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit SparkListenerStageCompleted, SparkListenerTaskStart, SparkListenerTaskEnd, + SparkListenerBlockUpdated, SparkListenerApplicationEnd).map(Utils.getFormattedClassName) Utils.tryWithSafeFinally { val logStart = SparkListenerLogStart(SPARK_VERSION) @@ -291,6 +292,7 @@ object EventLoggingListenerSuite { def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None): SparkConf = { val conf = new SparkConf conf.set("spark.eventLog.enabled", "true") + conf.set("spark.eventLog.logBlockUpdates.enabled", "true") conf.set("spark.eventLog.testing", "true") conf.set("spark.eventLog.dir", logDir.toString) compressionCodec.foreach { codec => http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index a1a8587..4abbb8e 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -96,6 +96,9 @@ class JsonProtocolSuite extends SparkFunSuite { .zipWithIndex.map { case (a, i) => a.copy(id = i) } SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates))) } + val blockUpdated = + SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId("Stars", + "In your multitude...", 300), RDDBlockId(0, 0), StorageLevel.MEMORY_ONLY, 100L, 0L)) testEvent(stageSubmitted, stageSubmittedJsonString) testEvent(stageCompleted, stageCompletedJsonString) @@ -120,6 +123,7 @@ class JsonProtocolSuite extends SparkFunSuite { testEvent(nodeBlacklisted, nodeBlacklistedJsonString) testEvent(nodeUnblacklisted, nodeUnblacklistedJsonString) testEvent(executorMetricsUpdate, executorMetricsUpdateJsonString) + testEvent(blockUpdated, blockUpdatedJsonString) } test("Dependent Classes") { @@ -2007,6 +2011,29 @@ private[spark] object JsonProtocolSuite extends Assertions { |} """.stripMargin + private val blockUpdatedJsonString = + """ + |{ + | "Event": "SparkListenerBlockUpdated", + | "Block Updated Info": { + | "Block Manager ID": { + | "Executor ID": "Stars", + | "Host": "In your multitude...", + | "Port": 300 + | }, + | "Block ID": "rdd_0_0", + | "Storage Level": { + | "Use Disk": false, + | "Use Memory": true, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Memory Size": 100, + | "Disk Size": 0 + | } + |} + """.stripMargin + private val executorBlacklistedJsonString = s""" |{ http://git-wip-us.apache.org/repos/asf/spark/blob/1437e344/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index bb06c8f..7b9e16a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -715,6 +715,14 @@ Apart from these, the 
following properties are also available, and may be useful <table class="table"> <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> <tr> + <td><code>spark.eventLog.logBlockUpdates.enabled</code></td> + <td>false</td> + <td> + Whether to log events for every block update, if <code>spark.eventLog.enabled</code> is true. + *Warning*: This will increase the size of the event log considerably. + </td> +</tr> +<tr> <td><code>spark.eventLog.compress</code></td> <td>false</td> <td> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org