This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new a39fc8773b2 [SPARK-39636][CORE][UI] Fix multiple bugs in JsonProtocol, impacting off heap StorageLevels and Task/Executor ResourceRequests

a39fc8773b2 is described below

commit a39fc8773b2a4e9c58a1e5d0010e0c8396784c37
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Thu Jun 30 13:41:24 2022 -0700

    [SPARK-39636][CORE][UI] Fix multiple bugs in JsonProtocol, impacting off heap StorageLevels and Task/Executor ResourceRequests

    ### What changes were proposed in this pull request?

    This PR fixes three longstanding bugs in Spark's `JsonProtocol`:

    - `TaskResourceRequest` loses precision for `amount` < 0.5. The `amount` is a floating point number which is either between 0 and 0.5 or is a positive integer, but the JSON read path assumes it is an integer.
    - `ExecutorResourceRequest` overflows for values larger than `Int.MaxValue` because the write path writes longs but the read path assumes integers.
    - Off heap StorageLevels are not handled properly: the `useOffHeap` field isn't included in the JSON, so this StorageLevel cannot be round-tripped through JSON. This could cause the History Server to display inaccurate "off heap memory used" stats on the executors page.

    I discovered these bugs while working on pull request #36885.

    ### Why are the changes needed?

    JsonProtocol should be able to roundtrip events through JSON without loss of information.

    ### Does this PR introduce _any_ user-facing change?

    Yes: it fixes bugs that impact information shown in the History Server Web UI. The new StorageLevel JSON field will be visible to tools which process raw event log JSON.

    ### How was this patch tested?

    Updated existing unit tests to cover the changed logic.

    Closes #37027 from JoshRosen/jsonprotocol-bugfixes.

Authored-by: Josh Rosen <joshro...@databricks.com> Signed-off-by: Josh Rosen <joshro...@databricks.com> --- .../scala/org/apache/spark/util/JsonProtocol.scala | 17 +++++-- .../org/apache/spark/util/JsonProtocolSuite.scala | 53 ++++++++++++++++++++-- 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 0c15b13d5a1..f0755b04bef 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -512,6 +512,7 @@ private[spark] object JsonProtocol { def storageLevelToJson(storageLevel: StorageLevel): JValue = { ("Use Disk" -> storageLevel.useDisk) ~ ("Use Memory" -> storageLevel.useMemory) ~ + ("Use Off Heap" -> storageLevel.useOffHeap) ~ ("Deserialized" -> storageLevel.deserialized) ~ ("Replication" -> storageLevel.replication) } @@ -750,7 +751,7 @@ private[spark] object JsonProtocol { def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest = { val rName = (json \ "Resource Name").extract[String] - val amount = (json \ "Amount").extract[Int] + val amount = (json \ "Amount").extract[Long] val discoveryScript = (json \ "Discovery Script").extract[String] val vendor = (json \ "Vendor").extract[String] new ExecutorResourceRequest(rName, amount, discoveryScript, vendor) @@ -758,7 +759,7 @@ private[spark] object JsonProtocol { def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = { val rName = (json \ "Resource Name").extract[String] - val amount = (json \ "Amount").extract[Int] + val amount = (json \ "Amount").extract[Double] new TaskResourceRequest(rName, amount) } @@ -1202,9 +1203,19 @@ private[spark] object JsonProtocol { def storageLevelFromJson(json: JValue): StorageLevel = { val useDisk = (json \ "Use Disk").extract[Boolean] val useMemory = (json \ "Use Memory").extract[Boolean] + // The "Use Off Heap" field was added in 
Spark 3.4.0 + val useOffHeap = jsonOption(json \ "Use Off Heap") match { + case Some(value) => value.extract[Boolean] + case None => false + } val deserialized = (json \ "Deserialized").extract[Boolean] val replication = (json \ "Replication").extract[Int] - StorageLevel(useDisk, useMemory, deserialized, replication) + StorageLevel( + useDisk = useDisk, + useMemory = useMemory, + useOffHeap = useOffHeap, + deserialized = deserialized, + replication = replication) } def blockStatusFromJson(json: JValue): BlockStatus = { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index ea6267698c8..7a18223ec5b 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -136,9 +136,14 @@ class JsonProtocolSuite extends SparkFunSuite { 321L, 654L, 765L, 256912L, 123456L, 123456L, 61728L, 30364L, 15182L, 10L, 90L, 2L, 20L, 80001L))) val rprofBuilder = new ResourceProfileBuilder() - val taskReq = new TaskResourceRequests().cpus(1).resource("gpu", 1) - val execReq = - new ExecutorResourceRequests().cores(2).resource("gpu", 2, "myscript") + val taskReq = new TaskResourceRequests() + .cpus(1) + .resource("gpu", 1) + .resource("fgpa", 0.5) + val execReq: ExecutorResourceRequests = new ExecutorResourceRequests() + .cores(2) + .resource("gpu", 2, "myscript") + .resource("myCustomResource", amount = Int.MaxValue + 1L, discoveryScript = "myscript2") rprofBuilder.require(taskReq).require(execReq) val resourceProfile = rprofBuilder.build resourceProfile.setResourceProfileId(21) @@ -203,6 +208,7 @@ class JsonProtocolSuite extends SparkFunSuite { testStorageLevel(StorageLevel.MEMORY_AND_DISK_2) testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER) testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER_2) + testStorageLevel(StorageLevel.OFF_HEAP) // JobResult val exception = new Exception("Out of Memory! 
Please restock film.") @@ -319,6 +325,21 @@ class JsonProtocolSuite extends SparkFunSuite { val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) } + test("StorageLevel backward compatibility") { + // "Use Off Heap" was added in Spark 3.4.0 + val level = StorageLevel( + useDisk = false, + useMemory = true, + useOffHeap = true, + deserialized = false, + replication = 1 + ) + val newJson = JsonProtocol.storageLevelToJson(level) + val oldJson = newJson.removeField { case (field, _) => field == "Use Off Heap" } + val newLevel = JsonProtocol.storageLevelFromJson(oldJson) + assert(newLevel.useOffHeap === false) + } + test("BlockManager events backward compatibility") { // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property. val blockManagerAdded = SparkListenerBlockManagerAdded(1L, @@ -1189,6 +1210,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1437,6 +1459,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": false, | "Replication": 2 | }, @@ -1563,6 +1586,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": false, | "Replication": 2 | }, @@ -1689,6 +1713,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": false, | "Replication": 2 | }, @@ -1722,6 +1747,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1769,6 +1795,7 @@ private[spark] object JsonProtocolSuite 
extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1787,6 +1814,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1834,6 +1862,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1852,6 +1881,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1870,6 +1900,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1917,6 +1948,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1935,6 +1967,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1953,6 +1986,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1971,6 +2005,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -2291,6 +2326,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": false, | "Replication": 2 | }, @@ -2489,6 +2525,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": false, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -2578,6 +2615,12 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Discovery Script":"", | "Vendor":"" | }, + | "myCustomResource":{ + | "Resource Name":"myCustomResource", + | "Amount": 2147483648, + | "Discovery Script": "myscript2", + | "Vendor" : "" + | }, | "gpu":{ | "Resource Name":"gpu", | "Amount":2, @@ -2593,6 +2636,10 @@ private[spark] object JsonProtocolSuite extends Assertions { | "gpu":{ | "Resource Name":"gpu", | "Amount":1.0 + | }, + | "fgpa":{ + | "Resource Name":"fgpa", + | "Amount":0.5 | } | } |} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org