[ https://issues.apache.org/jira/browse/SPARK-37301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442609#comment-17442609 ]
Apache Spark commented on SPARK-37301:
--------------------------------------

User 'mcdull-zhang' has created a pull request for this issue:
https://github.com/apache/spark/pull/34569

> ConcurrentModificationException caused by CollectionAccumulator serialization in the heartbeat thread
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37301
>                 URL: https://issues.apache.org/jira/browse/SPARK-37301
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.0
>            Reporter: mcdull_zhang
>            Priority: Minor
>
> In our production environment, the following code reproduces the problem:
> {code:scala}
> val acc = sc.collectionAccumulator[String]("test_acc")
>
> sc.parallelize(Array(0)).foreach(_ => {
>   var i = 0
>   var stop = false
>   val start = System.currentTimeMillis()
>   while (!stop) {
>     acc.add(i.toString)
>     if (i % 10000 == 0) {
>       acc.reset()
>       if ((System.currentTimeMillis() - start) / 1000 > 120) {
>         stop = true
>       }
>     }
>     i = i + 1
>   }
> })
> sc.stop()
> {code}
> This code causes the executor to fail to send heartbeats; once the consecutive failures exceed the default limit of 60, the executor exits.
> {noformat}
> 21/11/11 21:00:23 WARN Executor: Issue communicating with driver in heartbeater
> org.apache.spark.SparkException: Exception thrown in awaitResult:
>   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
>   at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>   at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
>   at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1007)
>   at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:212)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
>   at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.ConcurrentModificationException
>   at java.util.ArrayList.writeObject(ArrayList.java:766)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>   at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:601)
>   at org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:244)
>   at org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:555)
>   at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:559)
>   at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:102)
>   ... 12 more
> 21/11/11 21:00:23 ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times
> {noformat}
> The reason is that while the heartbeat thread is serializing the CollectionAccumulator, the task thread may modify it concurrently.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
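The race can be demonstrated outside Spark with plain Java serialization: one thread mutates a `java.util.ArrayList` (standing in for the task thread calling `acc.add`/`acc.reset`), while another thread serializes it (standing in for the heartbeat thread reporting accumulator updates). A minimal sketch; `CmeDemo` and its methods are illustrative names, not part of the Spark codebase, and since this is a timing race the exception is expected but not strictly guaranteed within the timeout:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object CmeDemo {
  // Serialize with plain Java serialization, as the heartbeat RPC path does.
  private def serialize(obj: AnyRef): Unit = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    oos.writeObject(obj)
    oos.close()
  }

  // Returns true once serialization observes the list mid-mutation and
  // ArrayList.writeObject throws ConcurrentModificationException.
  def race(timeoutMs: Long): Boolean = {
    val list = new java.util.ArrayList[String]()
    @volatile var stop = false
    // Mutator thread: plays the role of the task thread updating the accumulator.
    val writer = new Thread(new Runnable {
      def run(): Unit = {
        var i = 0
        while (!stop) {
          list.add(i.toString)
          if (list.size() > 10000) list.clear() // like acc.reset()
          i += 1
        }
      }
    })
    writer.start()
    // Serializer loop: plays the role of the heartbeat thread.
    var caught = false
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!caught && System.currentTimeMillis() < deadline) {
      try serialize(list)
      catch {
        case _: java.util.ConcurrentModificationException => caught = true
        case _: RuntimeException => () // other transient race fallout; keep trying
      }
    }
    stop = true
    writer.join()
    caught
  }

  def main(args: Array[String]): Unit =
    println(s"ConcurrentModificationException observed: ${race(5000L)}")
}
```

With two tight loops the exception typically fires within milliseconds, which is why a long-running task that keeps mutating the accumulator can starve every heartbeat attempt until the executor gives up.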