Rob Dawson created SPARK-26183: ---------------------------------- Summary: ConcurrentModificationException when using Spark collectionAccumulator Key: SPARK-26183 URL: https://issues.apache.org/jira/browse/SPARK-26183 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.1 Reporter: Rob Dawson
I'm trying to run a Spark-based application on an Azure HDInsight on-demand cluster, and am seeing lots of SparkExceptions (caused by ConcurrentModificationExceptions) being logged. The application runs without these errors when I start a local Spark instance. I've seen reports of [similar errors when using accumulators|https://issues.apache.org/jira/browse/SPARK-17463] and my code is indeed using a CollectionAccumulator, however I have placed synchronized blocks everywhere I use it, and it makes no difference. The accumulator code looks like this: {code:scala} class MySparkClass(sc : SparkContext) { val myAccumulator = sc.collectionAccumulator[MyRecord] override def add(record: MyRecord) = { synchronized { myAccumulator.add(record) } } override def endOfBatch() = { synchronized { myAccumulator.value.asScala.foreach((record: MyRecord) => { processIt(record) }) } } }{code} The exceptions don't cause the application to fail, however when the code tries to read values out of the accumulator it is empty and {{processIt}} is never called. {code} 18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205) at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92) at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785) at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814) at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814) at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988) at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.ConcurrentModificationException at java.util.ArrayList.writeObject(ArrayList.java:770) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43) at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:565) at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:231) at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523) at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91) ... 13 more {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org