[ https://issues.apache.org/jira/browse/SPARK-34731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-34731:
------------------------------------

    Assignee: Apache Spark

> ConcurrentModificationException in EventLoggingListener when redacting 
> properties
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-34731
>                 URL: https://issues.apache.org/jira/browse/SPARK-34731
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.0
>            Reporter: Bruce Robbins
>            Assignee: Apache Spark
>            Priority: Major
>
> Reproduction:
> The key elements of reproduction are enabling event logging, setting 
> spark.executor.cores, and some bad luck:
> {noformat}
> $ bin/spark-shell --conf spark.ui.showConsoleProgress=false \
> --conf spark.executor.cores=1 --driver-memory 4g \
> --conf spark.eventLog.enabled=true \
> --conf spark.eventLog.dir=/tmp/spark-events
> ...
> scala> (0 to 500).foreach { i =>
>      |   val df = spark.range(0, 20000).toDF("a")
>      |   df.filter("a > 12").count
>      | }
> 21/03/12 18:16:44 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
> java.util.ConcurrentModificationException
>       at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
>       at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
>       at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
>       at scala.collection.Iterator.foreach(Iterator.scala:941)
>       at scala.collection.Iterator.foreach$(Iterator.scala:941)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at scala.collection.mutable.MapLike.toSeq(MapLike.scala:75)
>       at scala.collection.mutable.MapLike.toSeq$(MapLike.scala:72)
>       at scala.collection.mutable.AbstractMap.toSeq(Map.scala:82)
>       at org.apache.spark.scheduler.EventLoggingListener.redactProperties(EventLoggingListener.scala:290)
>       at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:162)
>       at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
>       at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
>       at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
>       at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
>       at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
>       at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
>       at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
>       at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
>       at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
>       at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
>       at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
>       at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1379)
>       at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
> {noformat}
> Analysis from a quick reading of the code:
> DAGScheduler posts a JobSubmitted event containing a clone of a properties object [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L834].
> This event is handled [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2394].
> DAGScheduler#handleJobSubmitted stores the properties object in an [ActiveJob object|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1154], which in turn is [saved in the jobIdToActiveJob map|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1163].
> DAGScheduler#handleJobSubmitted then posts a SparkListenerJobStart event [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1169] with a reference to the same properties object that was stored indirectly in the jobIdToActiveJob map.
> When EventLoggingListener handles the SparkListenerJobStart event, it iterates over that properties object in redactProperties.
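> The stack trace shows that iteration going through scala.collection.convert.Wrappers$JPropertiesWrapper and mutable.MapLike.toSeq: the java.util.Properties object is wrapped as a Scala map and walked with the underlying Hashtable's fail-fast enumerator. A minimal sketch of that access pattern (the method name is an illustrative assumption, not Spark's actual code):
> {noformat}
> import java.util.Properties
> import scala.collection.JavaConverters._
> 
> // Hypothetical stand-in for the iteration redactProperties performs:
> // asScala wraps the Properties (a Hashtable subclass), and toSeq walks it
> // with Hashtable's fail-fast enumerator. If another thread mutates the
> // Properties mid-walk, next() throws ConcurrentModificationException.
> def propertiesToSeq(props: Properties): Seq[(String, String)] =
>   props.asScala.toSeq
> {noformat}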
> Meanwhile, DAGScheduler#handleJobSubmitted is not yet done. It calls submitStage, which calls submitMissingTasks, which [retrieves the same properties object|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1306] from jobIdToActiveJob and calls addPySparkConfigsToProperties, which modifies the properties if spark.executor.cores is set.
> If redactProperties happens to still be iterating over the properties object when that modification occurs, Hashtable throws a ConcurrentModificationException, as the sketch below demonstrates.
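> The race is straightforward to reproduce outside Spark. In the sketch below (object and thread names are mine, not Spark's), one thread iterates a java.util.Properties while a second thread mutates it, standing in for redactProperties and addPySparkConfigsToProperties respectively:
> {noformat}
> import java.util.Properties
> import scala.collection.JavaConverters._
> 
> object PropertiesRace {
>   def main(args: Array[String]): Unit = {
>     val props = new Properties()
>     (0 until 100000).foreach(i => props.setProperty(s"key$i", "v"))
> 
>     // Stand-in for addPySparkConfigsToProperties: mutates the shared object.
>     val writer = new Thread(() => {
>       (0 until 100000).foreach(i => props.setProperty(s"extra$i", "v"))
>     })
>     writer.start()
> 
>     // Stand-in for redactProperties: iterates the same object through the
>     // Scala wrapper. With unlucky timing, Hashtable$Enumerator.next throws.
>     try {
>       props.asScala.toSeq
>       println("no collision this run (the race is timing-dependent)")
>     } catch {
>       case e: java.util.ConcurrentModificationException =>
>         println(s"reproduced: $e")
>     }
>     writer.join()
>   }
> }
> {noformat}
> Since the readers and the writer share one Properties instance, one possible mitigation (a sketch of the idea, not necessarily the eventual fix) is for DAGScheduler to post the SparkListenerJobStart event with a defensive copy of the properties, e.g. via Utils.cloneProperties, so the scheduler's later mutation cannot race with the listener's iteration.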


