[ https://issues.apache.org/jira/browse/SPARK-37301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
mcdull_zhang updated SPARK-37301:
---------------------------------
    Description: 
In our production environment, the problem can be reproduced with the following code:

{code:scala}
val acc = sc.collectionAccumulator[String]("test_acc")

sc.parallelize(Array(0)).foreach(_ => {
  var i = 0
  var stop = false
  val start = System.currentTimeMillis()
  while (!stop) {
    acc.add(i.toString)
    if (i % 10000 == 0) {
      acc.reset()
      if ((System.currentTimeMillis() - start) / 1000 > 120) {
        stop = true
      }
    }
    i = i + 1
  }
})
sc.stop()
{code}

Running this makes the executor fail to send heartbeats to the driver, eventually exceeding the default limit of 60 consecutive failures, after which the executor exits:

{code}
21/11/11 21:00:23 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
    at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1007)
    at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:212)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
    at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList.writeObject(ArrayList.java:766)
    at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:601)
    at org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:244)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:555)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:559)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:102)
    ... 12 more
21/11/11 21:00:23 ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times
{code}

The root cause is that while the heartbeat thread is serializing the CollectionAccumulator for the heartbeat RPC, the task thread may still be modifying it, so ArrayList.writeObject fails with a java.util.ConcurrentModificationException.
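The same race can be demonstrated outside Spark with a minimal sketch (illustrative only; the object name and thread roles below are assumptions, not Spark code): one thread keeps mutating a plain java.util.ArrayList, the class whose writeObject appears in the stack trace above, while another thread Java-serializes it. Depending on timing, ArrayList.writeObject detects the concurrent modification and throws.

{code:scala}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.{ArrayList => JArrayList, ConcurrentModificationException}
import java.util.concurrent.atomic.AtomicBoolean

object AccumulatorSerializationRace {
  def main(args: Array[String]): Unit = {
    // Stands in for the ArrayList carried inside the accumulator value.
    val list = new JArrayList[String]()
    val running = new AtomicBoolean(true)

    // "Task thread": keeps adding elements and clearing the list,
    // analogous to acc.add(...) and acc.reset() in the reproduction above.
    val writer = new Thread(() => {
      var i = 0
      while (running.get()) {
        list.add(i.toString)
        if (i % 10000 == 0) list.clear()
        i += 1
      }
    })
    writer.setDaemon(true)
    writer.start()

    // "Heartbeat thread" (here: the main thread): repeatedly Java-serializes
    // the list while it is being modified. ArrayList.writeObject snapshots
    // modCount and throws if the list changed while being written; other
    // failures are also possible because ArrayList is not thread-safe.
    try {
      var n = 0
      while (n < 1000) {
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        out.writeObject(list)
        out.close()
        n += 1
      }
      println("No ConcurrentModificationException this run; the race is timing-dependent")
    } catch {
      case e: ConcurrentModificationException =>
        println(s"Reproduced: $e")
    } finally {
      running.set(false)
      writer.join()
    }
  }
}
{code}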
> ConcurrentModificationException caused by CollectionAccumulator serialization in the heartbeat thread
> ------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37301
>                 URL: https://issues.apache.org/jira/browse/SPARK-37301
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.0
>            Reporter: mcdull_zhang
>            Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org