This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 48017cc  [SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing (branch-2.4)
48017cc is described below

commit 48017cc36bdf7d84506daeed589e4cbebff269f8
Author: Shixiong Zhu <zsxw...@gmail.com>
AuthorDate: Mon Jun 8 16:52:34 2020 -0700

    [SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing (branch-2.4)
    
    ### What changes were proposed in this pull request?
    
    Backport #28744 to branch-2.4.
    
    ### Why are the changes needed?
    
    Low-risk fix for branch-2.4.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit tests.
    
    Closes #28758 from zsxwing/SPARK-31923-2.4.
    
    Authored-by: Shixiong Zhu <zsxw...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../scala/org/apache/spark/util/JsonProtocol.scala | 20 ++++++---
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 47 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 50c6461..0e613ce 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -326,12 +326,22 @@ private[spark] object JsonProtocol {
         case v: Long => JInt(v)
        // We only have 3 kinds of internal accumulator types, so if it's not int or long, it must be
        // the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]`
-        case v =>
-          JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map {
-            case (id, status) =>
-              ("Block ID" -> id.toString) ~
-              ("Status" -> blockStatusToJson(status))
+        case v: java.util.List[_] =>
+          JArray(v.asScala.toList.flatMap {
+            case (id: BlockId, status: BlockStatus) =>
+              Some(
+                ("Block ID" -> id.toString) ~
+                ("Status" -> blockStatusToJson(status))
+              )
+            case _ =>
+              // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should
+              // not crash.
+              None
           })
+        case _ =>
+          // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should not
+          // crash.
+          JNothing
       }
     } else {
       // For all external accumulators, just use strings
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 74b72d9..40fb2e3 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -436,6 +436,53 @@ class JsonProtocolSuite extends SparkFunSuite {
     testAccumValue(Some("anything"), 123, JString("123"))
   }
 
+  /** Create an AccumulableInfo and verify we can serialize and deserialize it. */
+  private def testAccumulableInfo(
+      name: String,
+      value: Option[Any],
+      expectedValue: Option[Any]): Unit = {
+    val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX)
+    val accum = AccumulableInfo(
+      123L,
+      Some(name),
+      update = value,
+      value = value,
+      internal = isInternal,
+      countFailedValues = false)
+    val json = JsonProtocol.accumulableInfoToJson(accum)
+    val newAccum = JsonProtocol.accumulableInfoFromJson(json)
+    assert(newAccum == accum.copy(update = expectedValue, value = expectedValue))
+  }
+
+  test("SPARK-31923: unexpected value type of internal accumulator") {
+    // Because a user may use `METRICS_PREFIX` in an accumulator name, we should test unexpected
+    // types to make sure we don't crash.
+    import InternalAccumulator.METRICS_PREFIX
+    testAccumulableInfo(
+      METRICS_PREFIX + "fooString",
+      value = Some("foo"),
+      expectedValue = None)
+    testAccumulableInfo(
+      METRICS_PREFIX + "fooList",
+      value = Some(java.util.Arrays.asList("string")),
+      expectedValue = Some(java.util.Collections.emptyList())
+    )
+    val blocks = Seq(
+      (TestBlockId("block1"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)),
+      (TestBlockId("block2"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
+    testAccumulableInfo(
+      METRICS_PREFIX + "fooList",
+      value = Some(java.util.Arrays.asList(
+        "string",
+        blocks(0),
+        blocks(1))),
+      expectedValue = Some(blocks.asJava)
+    )
+    testAccumulableInfo(
+      METRICS_PREFIX + "fooSet",
+      value = Some(Set("foo")),
+      expectedValue = None)
+  }
 }
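
For context, the pre-fix failure mode can be sketched outside of Spark. The snippet below is a hypothetical stand-in that mirrors the removed `asInstanceOf` cast using plain Scala types; `oldStyleToJson` and its element types are illustrative, not Spark's actual API:

    import scala.collection.JavaConverters._

    object PreFixCrashSketch {
      // Mirrors the removed code path: an unconditional cast followed by a
      // pattern match that only handles the one expected element shape.
      def oldStyleToJson(value: Any): List[String] =
        value.asInstanceOf[java.util.List[(String, Long)]].asScala.toList.map {
          case (id, status) => s"$id=$status"
        }

      def main(args: Array[String]): Unit = {
        // Fine for the one shape the code expected:
        println(oldStyleToJson(java.util.Arrays.asList("block1" -> 1L)))
        // A Set is not a java.util.List, so the cast throws ClassCastException:
        //   oldStyleToJson(Set("foo"))
        // Due to erasure, the cast succeeds for any java.util.List, so a wrong
        // element type fails later as a MatchError:
        //   oldStyleToJson(java.util.Arrays.asList("not a pair"))
      }
    }

This path is only reached when JsonProtocol classifies an accumulator as internal, which (per the test helper above) is keyed off its name starting with `METRICS_PREFIX`, so a user-chosen name with that prefix was enough to crash serialization.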
 
 

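And a minimal sketch of the defensive pattern the patch switches to: match on the runtime type, keep only the shapes you recognize, and map everything else to a harmless placeholder instead of casting. The `JValue` hierarchy here is a simplified stand-in for the json4s AST that JsonProtocol actually uses:

    import scala.collection.JavaConverters._

    object DefensiveMatchSketch {
      // Simplified stand-in for the json4s AST (JInt, JArray, JNothing, ...).
      sealed trait JValue
      case object JNothing extends JValue
      case class JInt(v: BigInt) extends JValue
      case class JString(s: String) extends JValue
      case class JArray(items: List[JValue]) extends JValue

      def internalAccumValueToJson(value: Any): JValue = value match {
        case v: Int  => JInt(v)
        case v: Long => JInt(v)
        case v: java.util.List[_] =>
          // Keep elements of the expected pair shape; silently drop the rest.
          JArray(v.asScala.toList.flatMap {
            case (id: String, status: Long) => Some(JString(s"$id=$status"))
            case _ => None // unrecognized element: skip rather than crash
          })
        case _ =>
          JNothing // unrecognized type: ignore rather than throw
      }

      def main(args: Array[String]): Unit = {
        println(internalAccumValueToJson(42L))                                // JInt(42)
        println(internalAccumValueToJson(java.util.Arrays.asList("a" -> 1L))) // JArray(List(JString(a=1)))
        println(internalAccumValueToJson(Set("foo")))                         // JNothing, no exception
      }
    }

This is the same behavior the new unit tests pin down: unrecognized scalar values round-trip to nothing, and lists keep only their well-typed pairs.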
