mcdull_zhang created SPARK-37301: ------------------------------------ Summary: ConcurrentModificationException caused by CollectionAccumulator serialization in the heartbeat thread Key: SPARK-37301 URL: https://issues.apache.org/jira/browse/SPARK-37301 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.2.0 Reporter: mcdull_zhang
In our production environment, you can use the following code to reproduce the problem: ```scala val acc = sc.collectionAccumulator[String]("test_acc") sc.parallelize(Array(0)).foreach(_ => { var i = 0 var stop = false val start = System.currentTimeMillis() while (!stop) { acc.add(i.toString) if (i % 10000 == 0) { acc.reset() if ((System.currentTimeMillis() - start) / 1000 > 120) { stop = true } } i = i + 1 } }) sc.stop() ``` This code can make the executor fail to send heartbeats, even more than the default 60 times, and then the executor exits. ```tex 21/11/11 21:00:23 WARN Executor: Issue communicating with driver in heartbeater org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301) at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103) at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1007) at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:212) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019) at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.ConcurrentModificationException at java.util.ArrayList.writeObject(ArrayList.java:766) at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:601) at org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:244) at org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:555) at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:559) at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:102) ... 12 more 21/11/11 21:00:23 ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times ``` The reason is that when the heartbeat thread serializes the Collection Accumulator, the task thread may modify the Collection Accumulator -- This message was sent by Atlassian Jira (v8.20.1#820001) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org