mcdull_zhang created SPARK-37301:
------------------------------------

             Summary: ConcurrentModificationException caused by CollectionAccumulator serialization in the heartbeat thread
                 Key: SPARK-37301
                 URL: https://issues.apache.org/jira/browse/SPARK-37301
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.2.0
            Reporter: mcdull_zhang


The following code reproduces the problem we encountered in our production environment:

```scala
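// Run in spark-shell, where `sc` is the active SparkContext.
val acc = sc.collectionAccumulator[String]("test_acc")

// A single task that keeps mutating the accumulator for about 120 seconds,
// so the executor's heartbeat thread is likely to serialize it mid-update.
sc.parallelize(Array(0)).foreach(_ => {
  var i = 0
  var stop = false
  val start = System.currentTimeMillis()
  while (!stop) {
    acc.add(i.toString)
    if (i % 10000 == 0) {
      // Clear the list periodically to bound memory, then check the elapsed time.
      acc.reset()
      if ((System.currentTimeMillis() - start) / 1000 > 120) {
        stop = true
      }
    }
    i = i + 1
  }
})
sc.stop()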
```

This code can cause the executor to fail to send heartbeats more than the default limit of 60 times, after which the executor exits:
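
The exit condition can be pictured as a counter of heartbeat failures that resets whenever a heartbeat succeeds. The sketch below is a simplified, hypothetical model: `HeartbeatFailurePolicy` and `record` are illustrative names, not Spark's executor code, and the assumption that the executor exits once the count reaches the limit (a `>=` check) is ours.

```scala
// Hypothetical, simplified model of the executor's heartbeat-failure policy.
// It is not Spark's implementation; names and the `>=` check are assumptions.
final class HeartbeatFailurePolicy(maxFailures: Int = 60) {
  private var consecutiveFailures = 0

  /** Records one heartbeat attempt and returns true when the executor should exit. */
  def record(succeeded: Boolean): Boolean = {
    if (succeeded) consecutiveFailures = 0 // a successful heartbeat resets the count
    else consecutiveFailures += 1
    consecutiveFailures >= maxFailures
  }
}

object HeartbeatFailureDemo {
  def main(args: Array[String]): Unit = {
    val policy = new HeartbeatFailurePolicy()
    // With the race in this issue, heartbeats can keep failing during serialization.
    val attempts = Iterator.from(1).find(_ => policy.record(succeeded = false)).get
    println(s"executor exits after $attempts failed heartbeats") // 60 with the default limit
  }
}
```

Because any success resets the counter, only a long run of failures, such as the one produced by the reproduction above, triggers the exit.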

```text
21/11/11 21:00:23 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
    at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1007)
    at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:212)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
    at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList.writeObject(ArrayList.java:766)
    at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:601)
    at org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:244)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:555)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:559)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:102)
    ... 12 more
21/11/11 21:00:23 ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times
```
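
The `Caused by` frame is the modification check in `ArrayList.writeObject`: it records the list's modification count before writing the elements and throws `ConcurrentModificationException` if the count has changed by the time it finishes. The sketch below reproduces that check deterministically in a single JVM, without Spark. A hypothetical element class, `MutatingElement`, appends to its own list while it is being serialized, standing in for the task thread's concurrent `acc.add`; `SerializeWhileMutating` is likewise an illustrative name.

```scala
import java.io.{ByteArrayOutputStream, IOException, ObjectOutputStream}
import java.util.{ArrayList, ConcurrentModificationException}

// Hypothetical element standing in for the task thread: while it is being
// serialized, it appends to the list that contains it, bumping that list's modCount.
final class MutatingElement(@transient target: ArrayList[AnyRef]) extends Serializable {
  @throws[IOException]
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    target.add("added during serialization")
  }
}

object SerializeWhileMutating {
  /** Java-serializes `list` and returns the ConcurrentModificationException, if one is thrown. */
  def trySerialize(list: ArrayList[AnyRef]): Option[ConcurrentModificationException] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(list); None }
    catch { case e: ConcurrentModificationException => Some(e) }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val list = new ArrayList[AnyRef]()
    list.add(new MutatingElement(list))
    println(trySerialize(list).isDefined) // true: ArrayList.writeObject detects the change
  }
}
```

In the real failure the modification comes from another thread, so whether a given heartbeat fails depends on timing; the single-threaded version only makes the same check fire every time.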

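The cause is a race: while the heartbeat thread serializes the `CollectionAccumulator` to report accumulator updates to the driver, the task thread may still be adding to or resetting the accumulator's underlying list. `ArrayList.writeObject` detects the concurrent modification and throws `ConcurrentModificationException`, so the heartbeat fails; when this keeps happening, the executor exits.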
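
A general way to avoid this kind of race is to have the writers and the serializing thread share one lock, and to serialize a copy taken under that lock instead of the live list. The sketch below shows the pattern in plain Scala, assuming nothing about Spark's internals: `GuardedList`, `snapshot`, and `GuardedListDemo` are hypothetical names, and this is an illustration of the pattern, not Spark's `CollectionAccumulator` code. The writer thread mimics the reproduction's add/reset loop.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.ArrayList
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical list-backed accumulator state (not Spark's CollectionAccumulator).
// All mutation and copying happen under one lock, so a copy is never taken mid-update.
final class GuardedList[T] {
  private val lock = new Object
  private val items = new ArrayList[T]()

  def add(v: T): Unit = lock.synchronized { items.add(v) }
  def reset(): Unit = lock.synchronized { items.clear() }

  /** Returns a private copy; the caller may serialize it without holding the lock. */
  def snapshot(): ArrayList[T] = lock.synchronized { new ArrayList[T](items) }
}

object GuardedListDemo {
  /** Java-serializes `obj` and returns the number of bytes written. */
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.size()
  }

  /** One thread adds/resets (like the reproduction) while the caller serializes snapshots. */
  def run(rounds: Int): Int = {
    val acc = new GuardedList[String]
    val stop = new AtomicBoolean(false)
    val writer = new Thread(() => {
      var i = 0
      while (!stop.get()) {
        acc.add(i.toString)
        if (i % 10000 == 0) acc.reset()
        i += 1
      }
    })
    writer.start()
    // Each round serializes a fresh snapshot; any exception would propagate here.
    try (1 to rounds).count(_ => serializedSize(acc.snapshot()) > 0)
    finally { stop.set(true); writer.join() }
  }

  def main(args: Array[String]): Unit =
    println(run(rounds = 100)) // 100: every snapshot serializes without an exception
}
```

The trade-off is an extra copy per serialization and brief lock contention with the writer, in exchange for never iterating a list that another thread is modifying.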
