[ https://issues.apache.org/jira/browse/SPARK-31923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-31923:
------------------------------------

    Assignee: Apache Spark  (was: Shixiong Zhu)

> Event log cannot be generated when some internal accumulators use unexpected types
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-31923
>                 URL: https://issues.apache.org/jira/browse/SPARK-31923
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.6
>            Reporter: Shixiong Zhu
>            Assignee: Apache Spark
>            Priority: Major
>             Fix For: 3.0.1, 3.1.0, 2.4.7
>
> A user may create internal accumulators by adding the "internal.metrics." prefix to the accumulator name, in order to hide sensitive information from the UI (all accumulators except internal ones are shown in the Spark UI).
>
> However, *org.apache.spark.util.JsonProtocol.accumValueToJson* assumes an internal accumulator has only three possible value types: int, long, and java.util.List[(BlockId, BlockStatus)]. When an internal accumulator uses an unexpected type, it crashes.
>
> An event that contains such an accumulator will be dropped from the event log because it cannot be converted to JSON, which causes weird UI issues when rendering in the Spark History Server. For example, if a *SparkListenerTaskEnd* event is dropped because of this issue, the user will see the task as still running even though it has finished.
>
> It's better to make *accumValueToJson* more robust; a sketch of the defensive pattern follows the stack trace below.
> ----
> How to reproduce it:
>  - Enable the Spark event log (spark.eventLog.enabled=true)
>  - Run the following commands:
> {code}
> scala> val accu = sc.doubleAccumulator("internal.metrics.foo")
> accu: org.apache.spark.util.DoubleAccumulator = DoubleAccumulator(id: 0, name: Some(internal.metrics.foo), value: 0.0)
>
> scala> sc.parallelize(1 to 1, 1).foreach { _ => accu.add(1.0) }
> 20/06/06 16:11:27 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
> java.lang.ClassCastException: java.lang.Double cannot be cast to java.util.List
> 	at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:330)
> 	at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$3.apply(JsonProtocol.scala:306)
> 	at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$3.apply(JsonProtocol.scala:306)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:306)
> 	at org.apache.spark.util.JsonProtocol$$anonfun$accumulablesToJson$2.apply(JsonProtocol.scala:299)
> 	at org.apache.spark.util.JsonProtocol$$anonfun$accumulablesToJson$2.apply(JsonProtocol.scala:299)
> 	at scala.collection.immutable.List.map(List.scala:284)
> 	at org.apache.spark.util.JsonProtocol$.accumulablesToJson(JsonProtocol.scala:299)
> 	at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:291)
> 	at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:145)
> 	at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76)
> 	at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:138)
> 	at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:158)
> 	at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
> 	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
> 	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
> 	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
> 	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
> 	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
> 	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
> 	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> 	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
> 	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
> 	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
> 	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
> {code}
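> Such a defensive conversion could look like the following minimal, self-contained sketch, assuming only a json4s dependency (which Spark already ships with). *safeAccumValueToJson* is a hypothetical standalone analogue of *accumValueToJson*, not the actual patch; the real method also converts java.util.List[(BlockId, BlockStatus)] values for internal metrics:
> {code}
> import org.json4s.JsonAST._
>
> // Hypothetical analogue of JsonProtocol.accumValueToJson, shown only to
> // illustrate the defensive pattern; the real method additionally handles
> // java.util.List[(BlockId, BlockStatus)] values for internal metrics.
> def safeAccumValueToJson(name: Option[String], value: Any): JValue = {
>   if (name.exists(_.startsWith("internal.metrics."))) {
>     value match {
>       case v: Int  => JInt(v)
>       case v: Long => JInt(v)
>       // An unexpected type (e.g. the Double above) no longer throws a
>       // ClassCastException; its value is simply omitted from the JSON.
>       case _ => JNothing
>     }
>   } else {
>     // Non-internal accumulator values are rendered via toString.
>     JString(value.toString)
>   }
> }
>
> // safeAccumValueToJson(Some("internal.metrics.foo"), 1.0) == JNothing
> // safeAccumValueToJson(Some("internal.metrics.foo"), 42L) == JInt(42)
> {code}
> With this shape, an accumulator of an unexpected type degrades to a missing JSON field instead of crashing the EventLoggingListener and dropping the whole event.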


--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org