[ https://issues.apache.org/jira/browse/SPARK-31923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-31923:
---------------------------------
    Description: 
A user may hide sensitive information from the UI by adding the
"internal.metrics." prefix to an accumulator name, since the Spark UI shows
all accumulators except internal ones. However,
org.apache.spark.util.JsonProtocol.accumValueToJson assumes an internal
accumulator has only 3 possible value types: int, long, and
java.util.List[(BlockId, BlockStatus)]. When an internal accumulator carries
a value of any other type, the conversion crashes with a ClassCastException.
An event that contains such an accumulator is then dropped from the event
log because it cannot be converted to JSON, and this causes weird UI issues
when the log is rendered in the Spark History Server. For example, if a
`SparkListenerTaskEnd` event is dropped because of this issue, the user will
see the task as still running even though it has finished.
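
The crash comes down to an unchecked cast. Here is a minimal standalone
reduction of the failing shape (a paraphrase for illustration, not the
actual Spark source):

{code}
// A value of an unexpected type (here a Double) falls through to a branch
// that blindly casts to java.util.List, producing exactly the
// ClassCastException seen in the stack trace below.
val value: Any = 1.0
value match {
  case v: Int  => println(s"int: $v")
  case v: Long => println(s"long: $v")
  case v       => v.asInstanceOf[java.util.List[(String, String)]]
}
{code}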

It's better to make accumValueToJson robust against unexpected value types;
a possible hardening is sketched below.
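
For illustration, a minimal sketch of a defensive conversion (hypothetical
and standalone, using the json4s types that JsonProtocol builds on):

{code}
import org.json4s.JsonAST._

// Hypothetical sketch: handle the expected internal-accumulator value types
// explicitly and fall back to a safe JSON value for anything unexpected, so
// one odd accumulator cannot abort serialization of the whole event.
def safeAccumValueToJson(value: Any): JValue = value match {
  case v: Int               => JInt(v)
  case v: Long              => JInt(v)
  case v: java.util.List[_] => JArray(Nil)  // real code would convert (BlockId, BlockStatus) pairs; elided
  case v                    => JString(v.toString)  // e.g. a Double from a user-named "internal.metrics." accumulator
}
{code}

Whether the fallback should be a string, JNothing, or just a logged warning
is a design choice; the point is that an unknown value type must not crash
the event logging listener.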

How to reproduce it:

- Enable the Spark event log (a sample launch command is sketched below)
- Run the repro command in the {code} block that follows
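
Event logging can be enabled when launching the shell, for example (the
event log directory here is an assumption; it must exist before the shell
starts):

{code}
spark-shell \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=/tmp/spark-events
{code}

Then, in the shell: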

{code}
scala> val accu = sc.doubleAccumulator("internal.metrics.foo")
accu: org.apache.spark.util.DoubleAccumulator = DoubleAccumulator(id: 0, name: Some(internal.metrics.foo), value: 0.0)

scala> sc.parallelize(1 to 1, 1).foreach { _ => accu.add(1.0) }
20/06/06 16:11:27 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.lang.ClassCastException: java.lang.Double cannot be cast to java.util.List
        at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:330)
        at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$3.apply(JsonProtocol.scala:306)
        at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$3.apply(JsonProtocol.scala:306)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:306)
        at org.apache.spark.util.JsonProtocol$$anonfun$accumulablesToJson$2.apply(JsonProtocol.scala:299)
        at org.apache.spark.util.JsonProtocol$$anonfun$accumulablesToJson$2.apply(JsonProtocol.scala:299)
        at scala.collection.immutable.List.map(List.scala:284)
        at org.apache.spark.util.JsonProtocol$.accumulablesToJson(JsonProtocol.scala:299)
        at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:291)
        at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:145)
        at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76)
        at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:138)
        at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:158)
        at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
        at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
        at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
        at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
{code}
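
After the failure, the application's event log is missing the corresponding
SparkListenerTaskEnd entry, which is what confuses the History Server. A
quick way to verify (the directory matches the launch sketch above; the file
name is the application ID):

{code}
grep -c '"Event":"SparkListenerTaskEnd"' /tmp/spark-events/<application-id>
{code}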

 

> Event log cannot be generated when some internal accumulators use unexpected types
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-31923
>                 URL: https://issues.apache.org/jira/browse/SPARK-31923
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.6
>            Reporter: Shixiong Zhu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
