Bruce Robbins created SPARK-34731:
-------------------------------------

             Summary: ConcurrentModificationException in EventLoggingListener when redacting properties
                 Key: SPARK-34731
                 URL: https://issues.apache.org/jira/browse/SPARK-34731
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.2.0
            Reporter: Bruce Robbins


Reproduction:

The key elements of reproduction are enabling event logging, setting 
spark.executor.cores, and some bad luck:
{noformat}
$ bin/spark-shell \
  --conf spark.ui.showConsoleProgress=false \
  --conf spark.executor.cores=1 \
  --driver-memory 4g \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=/tmp/spark-events
...
scala> (0 to 500).foreach { i =>
     |   val df = spark.range(0, 20000).toDF("a")
     |   df.filter("a > 12").count
     | }
21/03/12 18:16:44 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
        at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
        at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
        at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.mutable.MapLike.toSeq(MapLike.scala:75)
        at scala.collection.mutable.MapLike.toSeq$(MapLike.scala:72)
        at scala.collection.mutable.AbstractMap.toSeq(Map.scala:82)
        at org.apache.spark.scheduler.EventLoggingListener.redactProperties(EventLoggingListener.scala:290)
        at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:162)
        at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
        at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
        at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
        at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
        at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
        at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
        at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
        at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
        at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1379)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
{noformat}
Analysis from a quick reading of the code:

DAGScheduler posts a JobSubmitted event containing a clone of a properties 
object 
[here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L834].

This event is handled 
[here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2394].

DAGScheduler#handleJobSubmitted stores the properties object in a [Job 
object|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1154],
 which in turn is [saved in the jobIdToActiveJob 
map|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1163].

DAGScheduler#handleJobSubmitted posts a SparkListenerJobStart event 
[here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1169]
 with a reference to the same properties object that was stored indirectly in 
the jobIdToActiveJob map.
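
To make the shared-reference hazard concrete, here is a minimal paraphrase of the flow described above, using simplified stand-in types (this is not the exact Spark source):
{noformat}
import java.util.Properties
import scala.collection.mutable

// Simplified stand-ins for the real Spark classes, named only to mirror the
// description above.
case class ActiveJob(jobId: Int, properties: Properties)
case class JobStartEvent(jobId: Int, properties: Properties)

val jobIdToActiveJob = mutable.HashMap[Int, ActiveJob]()

// Paraphrase of handleJobSubmitted: no copy is made at this point, so the
// Properties stored in jobIdToActiveJob and the one carried by the listener
// event are the same object.
def handleJobSubmittedSketch(jobId: Int, properties: Properties)
    (postToListeners: JobStartEvent => Unit): Unit = {
  val job = ActiveJob(jobId, properties)             // same reference...
  jobIdToActiveJob(jobId) = job                      // ...saved in the map...
  postToListeners(JobStartEvent(jobId, properties))  // ...and handed to listeners
  // handleJobSubmitted then continues into submitStage/submitMissingTasks,
  // which can mutate job.properties while a listener is still reading it.
}
{noformat}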

When the EventLoggingListener handles the SparkListenerJobStart event, it 
iterates over that properties object in redactProperties.
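
Based on the JPropertiesWrapper and MapLike.toSeq frames in the stack trace above, the iteration boils down to roughly the following (a hedged reconstruction, not the exact body of redactProperties):
{noformat}
import java.util.Properties
import scala.collection.JavaConverters._

// Wrapping the java.util.Properties with asScala and calling toSeq walks the
// underlying Hashtable with its fail-fast enumerator. If another thread calls
// setProperty on the same object mid-walk, next() throws
// java.util.ConcurrentModificationException, which is exactly the failure above.
def propertiesAsSeq(properties: Properties): Seq[(String, String)] =
  properties.asScala.toSeq
{noformat}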

Meanwhile, the DAGScheduler#handleJobSubmitted method is not yet done. It calls 
submitStage, which calls submitMissingTasks, which [retrieves the same 
properties 
object|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1306]
 from jobIdToActiveJob and calls addPySparkConfigsToProperties, which will 
modify the properties if spark.executor.cores is set.
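
The mutation side amounts to something like this (again a paraphrase; the property key shown is illustrative only, not the actual key set by addPySparkConfigsToProperties):
{noformat}
import java.util.Properties

// The Properties passed in here is the one retrieved from jobIdToActiveJob,
// i.e. the same object a listener thread may still be iterating. setProperty
// mutates it in place and bumps the Hashtable's modCount, which is what trips
// the fail-fast enumerator.
def addExecutorCoreConfigSketch(props: Properties, executorCores: Option[String]): Unit =
  executorCores.foreach { cores =>
    props.setProperty("illustrative.pyspark.executor.cores", cores)
  }
{noformat}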

If redactProperties happens to still be iterating over the properties object 
when that modification occurs, Hashtable throws a 
ConcurrentModificationException.
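
The race can be demonstrated outside Spark with a self-contained sketch. Nothing below is Spark code; it only mimics the reader/writer pattern described above, and since it is a race it may take a few runs to trigger:
{noformat}
import java.util.Properties
import scala.collection.JavaConverters._

object ConcurrentPropertiesDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    (1 to 10000).foreach(i => props.setProperty(s"key$i", "v"))

    // Reader: iterate the Properties via the Scala wrapper, like
    // redactProperties does in the stack trace above.
    val reader = new Thread(() => {
      try {
        (1 to 100).foreach(_ => props.asScala.toSeq)
      } catch {
        case e: java.util.ConcurrentModificationException =>
          println(s"reader hit: $e")
      }
    })

    // Writer: mutate the same Properties object mid-iteration, like the
    // DAGScheduler thread does via addPySparkConfigsToProperties.
    val writer = new Thread(() => {
      (1 to 10000).foreach(i => props.setProperty(s"extra$i", "v"))
    })

    reader.start(); writer.start()
    reader.join(); writer.join()
  }
}
{noformat}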


