This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a39fc8773b2 [SPARK-39636][CORE][UI] Fix multiple bugs in JsonProtocol, 
impacting off heap StorageLevels and Task/Executor ResourceRequests
a39fc8773b2 is described below

commit a39fc8773b2a4e9c58a1e5d0010e0c8396784c37
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Thu Jun 30 13:41:24 2022 -0700

    [SPARK-39636][CORE][UI] Fix multiple bugs in JsonProtocol, impacting off 
heap StorageLevels and Task/Executor ResourceRequests
    
    ### What changes were proposed in this pull request?
    
    This PR fixes three longstanding bugs in Spark's `JsonProtocol`:
    
    - `TaskResourceRequest` loses precision for fractional `amount` values. The 
`amount` is a floating point number which is either a fraction no larger than 
0.5 or a positive integer, but the JSON read path assumes it is an integer.
    - `ExecutorResourceRequest` amounts overflow for values larger than 
`Int.MaxValue` because the write path writes longs but the read path assumes 
integers.
    - Off-heap StorageLevels are not handled properly: the `useOffHeap` field 
isn't included in the JSON, so an off-heap StorageLevel cannot be round-tripped 
through JSON. This could cause the History Server to display inaccurate "off 
heap memory used" stats on the executors page.
    
    I discovered these bugs while working on #36885.
    
    ### Why are the changes needed?
    
    JsonProtocol should be able to round-trip events through JSON without loss 
of information.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes: it fixes bugs that impact information shown in the History Server Web 
UI. The new StorageLevel JSON field will be visible to tools which process raw 
event log JSON.
    
    ### How was this patch tested?
    
    Updated existing unit tests to cover the changed logic.
    
    Closes #37027 from JoshRosen/jsonprotocol-bugfixes.
    
    Authored-by: Josh Rosen <joshro...@databricks.com>
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 .../scala/org/apache/spark/util/JsonProtocol.scala | 17 +++++--
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 53 ++++++++++++++++++++--
 2 files changed, 64 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 0c15b13d5a1..f0755b04bef 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -512,6 +512,7 @@ private[spark] object JsonProtocol {
   def storageLevelToJson(storageLevel: StorageLevel): JValue = {
     ("Use Disk" -> storageLevel.useDisk) ~
     ("Use Memory" -> storageLevel.useMemory) ~
+    ("Use Off Heap" -> storageLevel.useOffHeap) ~
     ("Deserialized" -> storageLevel.deserialized) ~
     ("Replication" -> storageLevel.replication)
   }
@@ -750,7 +751,7 @@ private[spark] object JsonProtocol {
 
   def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest = 
{
     val rName = (json \ "Resource Name").extract[String]
-    val amount = (json \ "Amount").extract[Int]
+    val amount = (json \ "Amount").extract[Long]
     val discoveryScript = (json \ "Discovery Script").extract[String]
     val vendor = (json \ "Vendor").extract[String]
     new ExecutorResourceRequest(rName, amount, discoveryScript, vendor)
@@ -758,7 +759,7 @@ private[spark] object JsonProtocol {
 
   def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = {
     val rName = (json \ "Resource Name").extract[String]
-    val amount = (json \ "Amount").extract[Int]
+    val amount = (json \ "Amount").extract[Double]
     new TaskResourceRequest(rName, amount)
   }
 
@@ -1202,9 +1203,19 @@ private[spark] object JsonProtocol {
   def storageLevelFromJson(json: JValue): StorageLevel = {
     val useDisk = (json \ "Use Disk").extract[Boolean]
     val useMemory = (json \ "Use Memory").extract[Boolean]
+    // The "Use Off Heap" field was added in Spark 3.4.0
+    val useOffHeap = jsonOption(json \ "Use Off Heap") match {
+      case Some(value) => value.extract[Boolean]
+      case None => false
+    }
     val deserialized = (json \ "Deserialized").extract[Boolean]
     val replication = (json \ "Replication").extract[Int]
-    StorageLevel(useDisk, useMemory, deserialized, replication)
+    StorageLevel(
+      useDisk = useDisk,
+      useMemory = useMemory,
+      useOffHeap = useOffHeap,
+      deserialized = deserialized,
+      replication = replication)
   }
 
   def blockStatusFromJson(json: JValue): BlockStatus = {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index ea6267698c8..7a18223ec5b 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -136,9 +136,14 @@ class JsonProtocolSuite extends SparkFunSuite {
           321L, 654L, 765L, 256912L, 123456L, 123456L, 61728L,
           30364L, 15182L, 10L, 90L, 2L, 20L, 80001L)))
     val rprofBuilder = new ResourceProfileBuilder()
-    val taskReq = new TaskResourceRequests().cpus(1).resource("gpu", 1)
-    val execReq =
-      new ExecutorResourceRequests().cores(2).resource("gpu", 2, "myscript")
+    val taskReq = new TaskResourceRequests()
+      .cpus(1)
+      .resource("gpu", 1)
+      .resource("fgpa", 0.5)
+    val execReq: ExecutorResourceRequests = new ExecutorResourceRequests()
+      .cores(2)
+      .resource("gpu", 2, "myscript")
+      .resource("myCustomResource", amount = Int.MaxValue + 1L, 
discoveryScript = "myscript2")
     rprofBuilder.require(taskReq).require(execReq)
     val resourceProfile = rprofBuilder.build
     resourceProfile.setResourceProfileId(21)
@@ -203,6 +208,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testStorageLevel(StorageLevel.MEMORY_AND_DISK_2)
     testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER)
     testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
+    testStorageLevel(StorageLevel.OFF_HEAP)
 
     // JobResult
     val exception = new Exception("Out of Memory! Please restock film.")
@@ -319,6 +325,21 @@ class JsonProtocolSuite extends SparkFunSuite {
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
   }
 
+  test("StorageLevel backward compatibility") {
+    // "Use Off Heap" was added in Spark 3.4.0
+    val level = StorageLevel(
+      useDisk = false,
+      useMemory = true,
+      useOffHeap = true,
+      deserialized = false,
+      replication = 1
+    )
+    val newJson = JsonProtocol.storageLevelToJson(level)
+    val oldJson = newJson.removeField { case (field, _) => field == "Use Off 
Heap" }
+    val newLevel = JsonProtocol.storageLevelFromJson(oldJson)
+    assert(newLevel.useOffHeap === false)
+  }
+
   test("BlockManager events backward compatibility") {
     // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a 
"time" property.
     val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
@@ -1189,6 +1210,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |        "Storage Level": {
       |          "Use Disk": true,
       |          "Use Memory": true,
+      |          "Use Off Heap": false,
       |          "Deserialized": true,
       |          "Replication": 1
       |        },
@@ -1437,6 +1459,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": false,
       |            "Replication": 2
       |          },
@@ -1563,6 +1586,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": false,
       |            "Replication": 2
       |          },
@@ -1689,6 +1713,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": false,
       |            "Replication": 2
       |          },
@@ -1722,6 +1747,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
@@ -1769,6 +1795,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
@@ -1787,6 +1814,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
@@ -1834,6 +1862,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
@@ -1852,6 +1881,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
@@ -1870,6 +1900,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
@@ -1917,6 +1948,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
@@ -1935,6 +1967,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
@@ -1953,6 +1986,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
@@ -1971,6 +2005,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
+      |            "Use Off Heap": false,
       |            "Deserialized": true,
       |            "Replication": 1
       |          },
@@ -2291,6 +2326,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |                "Storage Level": {
       |                  "Use Disk": true,
       |                  "Use Memory": true,
+      |                  "Use Off Heap": false,
       |                  "Deserialized": false,
       |                  "Replication": 2
       |                },
@@ -2489,6 +2525,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |    "Storage Level": {
       |      "Use Disk": false,
       |      "Use Memory": true,
+      |      "Use Off Heap": false,
       |      "Deserialized": true,
       |      "Replication": 1
       |    },
@@ -2578,6 +2615,12 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |      "Discovery Script":"",
       |      "Vendor":""
       |    },
+      |    "myCustomResource":{
+      |      "Resource Name":"myCustomResource",
+      |      "Amount": 2147483648,
+      |      "Discovery Script": "myscript2",
+      |      "Vendor" : ""
+      |    },
       |    "gpu":{
       |      "Resource Name":"gpu",
       |      "Amount":2,
@@ -2593,6 +2636,10 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |    "gpu":{
       |      "Resource Name":"gpu",
       |      "Amount":1.0
+      |    },
+      |    "fgpa":{
+      |      "Resource Name":"fgpa",
+      |      "Amount":0.5
       |    }
       |  }
       |}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to