[ 
https://issues.apache.org/jira/browse/BEAM-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666975#comment-15666975
 ] 

Stas Levin commented on BEAM-979:
---------------------------------

I think it may have to do with {{StateSpecFunctions#mapSourceFunction}}, where 
we have this:

{code:java}

// read microbatch.
final List<WindowedValue<T>> readValues = new ArrayList<>();

// ...

while (!finished) {
  readValues.add(WindowedValue.of(reader.getCurrent(),
      reader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
  finished = !reader.advance();
}

// ...

return Iterators.unmodifiableIterator(readValues.iterator());

{code}

I think there may be a scenario where {{readValues}} is modified and read 
concurrently, which causes a {{ConcurrentModificationException}} to be thrown 
by the {{iterator#next}} call we see in the stack trace.
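
To illustrate the fail-fast behaviour I suspect is involved, here is a minimal sketch. It is not the runner code: the class and method names are made up for the example, and it uses a single thread so the outcome is deterministic, whereas the real scenario would presumably involve two threads racing on the list.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

// Hypothetical demo class, not part of Beam or Spark.
final class CmeSketch {

  // Returns true if the ArrayList iterator throws after the list is
  // structurally modified behind its back.
  static boolean failsFast() {
    List<String> readValues = new ArrayList<>();
    readValues.add("a");
    readValues.add("b");

    Iterator<String> it = readValues.iterator();
    it.next(); // consumes "a"

    // Structural modification after the iterator was created.
    readValues.add("c");

    try {
      // ArrayList's iterator checks for co-modification on next().
      it.next();
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("ArrayList iterator failed fast: " + failsFast());
  }
}
```

The exception comes from the same {{ArrayList$Itr.checkForComodification}} frame that appears at the top of the stack trace.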

One of the workarounds I'm trying is to change 
{{final List<WindowedValue<T>> readValues = new ArrayList<>();}} to 
{{final Collection<WindowedValue<T>> readValues = new ConcurrentLinkedQueue<>();}}. 
Since making this change the exception has not recurred, but given that it 
only shows up after hours of running, it is too soon to tell for sure.
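
For comparison, a minimal sketch of why the swap would make the exception go away. As before, the class and method names are hypothetical and the example is single-threaded for determinism; it only shows the iterator semantics, not the actual runner behaviour.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of Beam or Spark.
final class WeaklyConsistentSketch {

  // Iterates a ConcurrentLinkedQueue while it is modified and returns the
  // elements the iterator produced. No exception is expected here.
  static List<String> drainWhileModifying() {
    Collection<String> readValues = new ConcurrentLinkedQueue<>();
    readValues.add("a");
    readValues.add("b");

    Iterator<String> it = readValues.iterator();
    List<String> seen = new ArrayList<>();
    seen.add(it.next()); // consumes "a"

    // Modification after the iterator was created; the iterator is weakly
    // consistent, so this does not make it throw.
    readValues.add("c");

    while (it.hasNext()) {
      seen.add(it.next());
    }
    return seen;
  }

  public static void main(String[] args) {
    System.out.println("Elements seen: " + drainWhileModifying());
  }
}
```

The iterator is guaranteed to return the elements that existed when it was created, but whether it also reflects the later addition is not guaranteed by the {{ConcurrentLinkedQueue}} contract, so a reader could silently see more or fewer elements than the writer intended.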

Moreover, this change may prevent a {{ConcurrentModificationException}} from 
being thrown, but we still need to consider whether the exception is merely a 
symptom of some improperly handled scenario rather than the problem per se.

> ConcurrentModificationException exception after hours of running
> ----------------------------------------------------------------
>
>                 Key: BEAM-979
>                 URL: https://issues.apache.org/jira/browse/BEAM-979
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Stas Levin
>            Assignee: Stas Levin
>
> {code}
>       
> User class threw exception: org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 1 in stage 4483.0 failed 4 times, most recent failure: 
> Lost task 1.3 in stage 4483.0 (TID 44548, xxxx.com): 
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at com.xxx.relocated.com.google.common.collect.Iterators$3.next(Iterators.java:177)
> at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
> at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
