[ https://issues.apache.org/jira/browse/BEAM-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666975#comment-15666975 ]
Stas Levin commented on BEAM-979:
---------------------------------

I think it may have to do with {{StateSpecFunctions#mapSourceFunction}}, where we have this:

{code:java}
// read microbatch.
final List<WindowedValue<T>> readValues = new ArrayList<>();
// ...
while (!finished) {
  readValues.add(WindowedValue.of(reader.getCurrent(), reader.getCurrentTimestamp(),
      GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
  finished = !reader.advance();
}
// ...
return Iterators.unmodifiableIterator(readValues.iterator());
{code}

I think there may be a scenario where {{readValues}} is modified and read concurrently, which causes a {{ConcurrentModificationException}} to be thrown upon invoking the {{iterator#next}} we see in the stacktrace.

One of the workarounds I'm trying is to change
{{final List<WindowedValue<T>> readValues = new ArrayList<>();}}
to
{{final Collection<WindowedValue<T>> readValues = new ConcurrentLinkedQueue<>();}}.

Since making this change the exception has not recurred, but given its nature it's too soon to tell for sure.

Moreover, this change may prevent a {{ConcurrentModificationException}} from being thrown, but we still need to consider whether the exception is merely a symptom of some scenario being handled improperly rather than a problem per se.
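
To illustrate the iterator behaviour suspected above, here is a minimal single-threaded sketch. It is not the runner's code: the class {{IteratorDemo}} and the method {{failsOnModification}} are hypothetical names introduced only for this example, and the modification happens on the same thread rather than concurrently. It only shows that an {{ArrayList}} iterator fails fast when the list is structurally modified after the iterator was created, whereas a {{ConcurrentLinkedQueue}} iterator is weakly consistent and does not throw.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of Beam or Spark.
class IteratorDemo {

    /**
     * Adds to the collection while an iterator over it is still in use.
     * Returns true if the next call to Iterator#next throws
     * ConcurrentModificationException, false otherwise.
     */
    static boolean failsOnModification(Collection<Integer> values) {
        values.add(1);
        values.add(2);
        Iterator<Integer> it = values.iterator();
        it.next();
        // Structural modification after the iterator was created.
        values.add(3);
        try {
            it.next();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ArrayList fails: "
            + failsOnModification(new ArrayList<>()));
        System.out.println("ConcurrentLinkedQueue fails: "
            + failsOnModification(new ConcurrentLinkedQueue<>()));
    }
}
```

Note that this only demonstrates why swapping the collection type silences the exception. It says nothing about which code path modifies {{readValues}} in the runner, so the swap may hide the underlying scenario rather than fix it.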

> ConcurrentModificationException exception after hours of running
> ----------------------------------------------------------------
>
>                 Key: BEAM-979
>                 URL: https://issues.apache.org/jira/browse/BEAM-979
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Stas Levin
>            Assignee: Stas Levin
>
> {code}
> User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4483.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4483.0 (TID 44548, xxxx.com): java.util.ConcurrentModificationException
>     at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
>     at java.util.ArrayList$Itr.next(ArrayList.java:851)
>     at com.xxx.relocated.com.google.common.collect.Iterators$3.next(Iterators.java:177)
>     at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
>     at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>     at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
>     at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)