[ https://issues.apache.org/jira/browse/BEAM-10881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17369604#comment-17369604 ]
Beam JIRA Bot commented on BEAM-10881: -------------------------------------- This issue was marked "stale-P2" and has not received a public comment in 14 days. It is now automatically moved to P3. If you are still affected by it, you can comment and move it back to P2. > Spark-runner KafkaIO beam throws ConcurrentModification after running 2 hours > ----------------------------------------------------------------------------- > > Key: BEAM-10881 > URL: https://issues.apache.org/jira/browse/BEAM-10881 > Project: Beam > Issue Type: Bug > Components: io-java-kafka, runner-spark > Affects Versions: 2.23.0 > Reporter: yws > Priority: P3 > Labels: beam, kafkaio, spark-runner > > *My code:* > Read<byte[], byte[]> kafkaRead = KafkaIO.<byte[], byte[]> read() > .withBootstrapServers(brokers) > .withConsumerConfigUpdates(properties) > .withProcessingTime().withTopic(topic) > .withKeyDeserializer(ByteArrayDeserializer.class) > .withValueDeserializer(ByteArrayDeserializer.class); > > My Beam SDK version is 2.23.0. > > *After running on the Spark runner for about 2 hours, it throws > ConcurrentModificationException; the stack trace is as follows:* > > java.util.ConcurrentModificationException at > java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) at > java.util.ArrayList$Itr.next(ArrayList.java:851) at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.next(Iterators.java:418) > at > org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:150) > at > org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245) > at > org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232) > at > org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177) > at > org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107) > at > 
org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181) at > org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180) at > org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57) > at > org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at > org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55) > at > org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337) at > org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335) at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1172) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1163) > at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1098) at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1163) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:889) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:286) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at > 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at > org.apache.spark.scheduler.Task.run(Task.scala:121) at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:442) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1386) at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:448) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian Jira (v8.3.4#803005)