[ https://issues.apache.org/jira/browse/SPARK-34731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-34731:
------------------------------------

    Assignee:     (was: Apache Spark)

> ConcurrentModificationException in EventLoggingListener when redacting properties
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-34731
>                 URL: https://issues.apache.org/jira/browse/SPARK-34731
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.0
>            Reporter: Bruce Robbins
>            Priority: Major
>
> Reproduction:
> The key elements of reproduction are enabling event logging, setting spark.executor.cores, and some bad luck:
> {noformat}
> $ bin/spark-shell --conf spark.ui.showConsoleProgress=false \
> --conf spark.executor.cores=1 --driver-memory 4g --conf \
> "spark.ui.showConsoleProgress=false" \
> --conf spark.eventLog.enabled=true \
> --conf spark.eventLog.dir=/tmp/spark-events
> ...
> scala> (0 to 500).foreach { i =>
>      |   val df = spark.range(0, 20000).toDF("a")
>      |   df.filter("a > 12").count
>      | }
> 21/03/12 18:16:44 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
> java.util.ConcurrentModificationException
>       at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
>       at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
>       at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
>       at scala.collection.Iterator.foreach(Iterator.scala:941)
>       at scala.collection.Iterator.foreach$(Iterator.scala:941)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at scala.collection.mutable.MapLike.toSeq(MapLike.scala:75)
>       at scala.collection.mutable.MapLike.toSeq$(MapLike.scala:72)
>       at scala.collection.mutable.AbstractMap.toSeq(Map.scala:82)
>       at org.apache.spark.scheduler.EventLoggingListener.redactProperties(EventLoggingListener.scala:290)
>       at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:162)
>       at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
>       at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
>       at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
>       at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
>       at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
>       at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
>       at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
>       at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
>       at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
>       at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
>       at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
>       at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1379)
>       at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
> {noformat}
> Analysis from quick reading of the code:
> DAGScheduler posts a JobSubmitted event containing a clone of a properties object [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L834].
> This event is handled [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2394].
> DAGScheduler#handleJobSubmitted stores the properties object in a [Job object|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1154], which in turn is [saved in the jobIdToActiveJob map|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1163].
> DAGScheduler#handleJobSubmitted posts a SparkListenerJobStart event [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1169] with a reference to the same properties object that was stored indirectly in the jobIdToActiveJob map.
> When the EventLoggingListener handles the SparkListenerJobStart event, it iterates over that properties object in redactProperties.
> Meanwhile, the DAGScheduler#handleJobSubmitted method is not yet done. It calls submitStage, which calls submitMissingTasks, which [retrieves the same properties object|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1306] from jobIdToActiveJob and calls addPySparkConfigsToProperties, which modifies the properties if spark.executor.cores is set.
> If redactProperties is still iterating over the properties object when that modification happens, Hashtable throws a ConcurrentModificationException.
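
The interleaving described in the analysis can be sketched outside Spark. The following is a minimal, single-threaded illustration and not Spark code. It uses java.util.Hashtable directly, since the stack trace above shows the exception coming from Hashtable$Enumerator (java.util.Properties extends Hashtable; newer JDKs may behave differently for Properties itself). The object name PropertiesRaceSketch, the helper names, and the key "hypothetical.added.key" are assumptions made for illustration. In Spark the iteration and the modification happen on two different threads, so the failure depends on timing; here the modification is forced to occur mid-iteration so the outcome is repeatable. The second call shows one possible mitigation, iterating over a private copy, which is not necessarily how the issue was resolved upstream.

```scala
import java.util.{ConcurrentModificationException, Hashtable}

object PropertiesRaceSketch {

  // Walks `iterated` and, right after the first entry, inserts a new key into
  // `mutated`. Returns true if the fail-fast iterator threw
  // ConcurrentModificationException. This stands in for redactProperties
  // (the iteration) overlapping with addPySparkConfigsToProperties (the insert).
  def throwsWhenMutated(iterated: Hashtable[String, String],
                        mutated: Hashtable[String, String]): Boolean = {
    try {
      val it = iterated.entrySet().iterator()
      var mutatedOnce = false
      while (it.hasNext) {
        it.next()
        if (!mutatedOnce) {
          // Hypothetical key; any structural change (a new key) has the same effect.
          mutated.put("hypothetical.added.key", "1")
          mutatedOnce = true
        }
      }
      false
    } catch {
      case _: ConcurrentModificationException => true
    }
  }

  // A small table with settings taken from the reproduction command.
  def sampleTable(): Hashtable[String, String] = {
    val t = new Hashtable[String, String]()
    t.put("spark.executor.cores", "1")
    t.put("spark.eventLog.enabled", "true")
    t.put("spark.eventLog.dir", "/tmp/spark-events")
    t
  }

  def main(args: Array[String]): Unit = {
    // Case 1: the reader and the writer share one object, as in the analysis.
    val shared = sampleTable()
    println(s"shared object, CME thrown: ${throwsWhenMutated(shared, shared)}")

    // Case 2: the reader iterates over its own copy; the writer touches the original.
    val original = sampleTable()
    val snapshot = original.clone().asInstanceOf[Hashtable[String, String]]
    println(s"private copy, CME thrown: ${throwsWhenMutated(snapshot, original)}")
  }
}
```

With the shared object the iterator detects the insert on its next step and throws; with the private copy the insert goes to a different table, so the iteration completes. The copy is shallow, which is enough here because the keys and values are immutable strings.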