[ https://issues.apache.org/jira/browse/SPARK-37301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-37301:
------------------------------------

    Assignee: Apache Spark

> ConcurrentModificationException caused by CollectionAccumulator serialization in the heartbeat thread
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37301
>                 URL: https://issues.apache.org/jira/browse/SPARK-37301
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.0
>            Reporter: mcdull_zhang
>            Assignee: Apache Spark
>            Priority: Minor
>
> We hit this in our production environment; the following code reproduces 
> the problem:
> {code:scala}
> // Create a collection accumulator and mutate it continuously from the
> // task thread, so the heartbeat thread races with add()/reset() while
> // serializing the accumulator.
> val acc = sc.collectionAccumulator[String]("test_acc")
>
> sc.parallelize(Array(0)).foreach(_ => {
>   var i = 0
>   var stop = false
>   val start = System.currentTimeMillis()
>   while (!stop) {
>     acc.add(i.toString)
>     if (i % 10000 == 0) {
>       acc.reset()
>       // Keep mutating for about two minutes, long enough to overlap
>       // with several heartbeat serialization attempts.
>       if ((System.currentTimeMillis() - start) / 1000 > 120) {
>         stop = true
>       }
>     }
>     i = i + 1
>   }
> })
> sc.stop()
> {code}
> This code makes the executor's heartbeats fail repeatedly, exceeding the 
> default limit of 60 consecutive failures, after which the executor exits.
> {noformat}
> 21/11/11 21:00:23 WARN Executor: Issue communicating with driver in heartbeater
> org.apache.spark.SparkException: Exception thrown in awaitResult: 
>     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>     at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
>     at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1007)
>     at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:212)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
>     at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.ArrayList.writeObject(ArrayList.java:766)
>     at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>     at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:601)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:244)
>     at org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:555)
>     at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:559)
>     at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:102)
>     ... 12 more
> 21/11/11 21:00:23 ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times
> {noformat}
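The ConcurrentModificationException above comes from ArrayList.writeObject, which is fail-fast: if the list is structurally modified while it is being serialized, the modCount check at the end of writeObject throws. This can be demonstrated deterministically outside Spark (an illustrative sketch, not code from the ticket; the SelfMutating class is hypothetical) by putting an element in the list whose own writeObject mutates the enclosing list:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.{ArrayList => JArrayList}

// Serializable element whose custom writeObject mutates the list that
// contains it, so ArrayList.writeObject's modCount check always fails.
class SelfMutating(@transient private val owner: JArrayList[AnyRef])
    extends Serializable {
  private def writeObject(out: java.io.ObjectOutputStream): Unit = {
    owner.add("mutated during serialization") // structural change mid-write
    out.defaultWriteObject()
  }
}

object CmeDemo {
  // Returns true iff serializing the list throws
  // ConcurrentModificationException, as it does for the heartbeat thread.
  def triggersCme(): Boolean = {
    val list = new JArrayList[AnyRef]()
    list.add(new SelfMutating(list))
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(list); false }
    catch { case _: java.util.ConcurrentModificationException => true }
  }

  def main(args: Array[String]): Unit =
    println(s"ConcurrentModificationException thrown: ${triggersCme()}")
}
```

In the real failure no element does this itself; the task thread plays the role of the mutator while the heartbeat thread serializes.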
> The root cause is a race: while the heartbeat thread serializes the 
> CollectionAccumulator's underlying ArrayList, the task thread may 
> concurrently modify it, and ArrayList.writeObject then throws 
> ConcurrentModificationException.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
