[ https://issues.apache.org/jira/browse/SPARK-19185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405887#comment-16405887 ]
kaushik srinivas commented on SPARK-19185:
------------------------------------------

We see the same issue with Spark Streaming Kafka 010 on Spark 2.2.0. It seems to appear more frequently as the batch window is increased. SPARK-23663 was created for reference. Is disabling windowing the workaround for now?

> ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing
> -------------------------------------------------------------------------
>
>                 Key: SPARK-19185
>                 URL: https://issues.apache.org/jira/browse/SPARK-19185
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.2
>         Environment: Spark 2.0.2
>                      Spark Streaming Kafka 010
>                      Mesos 0.28.0 - client mode
>                      spark.executor.cores 1
>                      spark.mesos.extra.cores 1
>            Reporter: Kalvin Chau
>            Priority: Major
>              Labels: streaming, windowing
>
> We've been running into ConcurrentModificationExceptions ("KafkaConsumer is
> not safe for multi-threaded access") with the CachedKafkaConsumer. I've been
> debugging this issue, and after looking through some of the Spark source
> code I think this is a bug.
>
> Our setup is:
> Spark 2.0.2, running in Mesos 0.28.0-2 in client mode, using
> Spark-Streaming-Kafka-010
> spark.executor.cores 1
> spark.mesos.extra.cores 1
> Batch interval: 10s, window interval: 180s, and slide interval: 30s
>
> We see the exception when two task worker threads in one executor are
> assigned the same Topic+Partition but different sets of offsets. Both
> threads get the same CachedKafkaConsumer. Whichever task thread goes first
> seeks and polls for all of its records; at the same time the second thread
> tries to seek to its own offset and fails because it cannot acquire the
> lock.
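An inline note on the failure mode described above: the sketch below models, in plain Scala with no Kafka or Spark dependency, the kind of fail-fast ownership check that rejects a second thread instead of making it wait. `GuardedConsumer` and `GuardDemo` are hypothetical names made up for illustration; this is a simplified stand-in, not the Kafka or Spark source.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical stand-in for a cached consumer shared by task threads.
// It is NOT the Kafka or Spark class, only a model of a fail-fast ownership check.
final class GuardedConsumer {
  private val owner = new AtomicReference[Thread](null)

  // Run `body` while owning the consumer; throw instead of blocking
  // if another thread currently owns it.
  def use(body: => Unit): Unit = {
    if (!owner.compareAndSet(null, Thread.currentThread()))
      throw new ConcurrentModificationException(
        "consumer is not safe for multi-threaded access")
    try body
    finally owner.set(null)
  }
}

object GuardDemo {
  // Replays the two-task interleaving described above: task 0 is still
  // polling when task 1 tries to use the same consumer.
  // Returns true if task 1 was rejected by the ownership check.
  def secondTaskFails(): Boolean = {
    val consumer = new GuardedConsumer
    val task0Polling = new CountDownLatch(1)
    val task1Finished = new CountDownLatch(1)
    val rejected = new AtomicBoolean(false)

    val task0 = new Thread(() => consumer.use {
      task0Polling.countDown() // task 0 now owns the consumer
      task1Finished.await()    // keep owning it until task 1 has tried
    })
    val task1 = new Thread(() => {
      task0Polling.await()
      try consumer.use(())
      catch { case _: ConcurrentModificationException => rejected.set(true) }
      finally task1Finished.countDown()
    })

    task0.start(); task1.start()
    task0.join(); task1.join()
    rejected.get()
  }
}
```

The latches only force the overlap so that the outcome is deterministic; in a real executor the overlap depends on how tasks are scheduled, which would explain why the exception is intermittent.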
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
> Time1 E0 Task0 - Seeks and starts to poll
> Time1 E0 Task1 - Attempts to seek, but fails
>
> Here are some relevant logs:
> {code}
> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394204414 -> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394238058 -> 4394257712
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394204414
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: Initial fetch for spark-executor-consumer test-topic 2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Seeking to test-topic-2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting block rdd_199_2 failed due to an exception
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block rdd_199_2 could not be removed as it was not found on disk or in memory
> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in task 49.0 in stage 45.0 (TID 3201)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>     at org.apache.spark.scheduler.Task.run(Task.scala:86)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Polled [test-topic-2] 8237
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204415 requested 4394204415
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204416 requested 4394204416
> ...
> {code}
>
> It looks like when WindowedDStream does the getOrCompute call, it's
> computing all the sets of offsets it needs and tries to farm out the work in
> parallel, so each available worker task gets one of the sets of offsets that
> need to be read.
>
> After realizing what was going on I tested four states:
> * spark.executor.cores 1 and spark.mesos.extra.cores 0
> ** No Exceptions
> * spark.executor.cores 1 and spark.mesos.extra.cores 1
> ** ConcurrentModificationException
> * spark.executor.cores 2 and spark.mesos.extra.cores 0
> ** ConcurrentModificationException
> * spark.executor.cores 2 and spark.mesos.extra.cores 1
> ** ConcurrentModificationException
>
> Minimal set of code I was able to reproduce with:
> Streaming batch interval was set to 2 seconds. This increased the rate of
> exceptions I saw.
> {code}
> import io.confluent.kafka.serializers.KafkaAvroDeserializer
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>
> // brokers, groupId, schemaRegistryUrl, offset, kafkaTopic, ssc and logger
> // are defined elsewhere.
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> brokers,
>   "key.deserializer" -> classOf[KafkaAvroDeserializer],
>   "value.deserializer" -> classOf[KafkaAvroDeserializer],
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "group.id" -> groupId,
>   "schema.registry.url" -> schemaRegistryUrl,
>   "auto.offset.reset" -> offset
> )
>
> val inputStream = KafkaUtils.createDirectStream[Object, Object](
>   ssc,
>   PreferConsistent,
>   Subscribe[Object, Object](kafkaTopic, kafkaParams)
> )
>
> // 180s window sliding every 30s over the direct stream
> val windowStream = inputStream.map(_.toString).window(Seconds(180), Seconds(30))
>
> windowStream.foreachRDD { rdd =>
>   val filtered = rdd.filter(_.contains("idb"))
>   filtered.foreach { message =>
>     // i is re-initialised for every message, so each matching message is logged
>     var i = 0
>     if (i == 0) {
>       logger.info(message)
>       i = i + 1
>     }
>   }
> }
> {code}
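To put rough numbers on the fan-out that the description attributes to WindowedDStream, here is a small sketch of the arithmetic for the reported settings (10s batch interval, 180s window). `WindowFanOut` and its helpers are hypothetical names, not Spark APIs. The sketch assumes a window is assembled from one RDD per batch interval, which is consistent with the UnionRDD frame in the stack trace but is not verified here.

```scala
object WindowFanOut {
  // Number of batch intervals that a single window spans.
  def batchesPerWindow(windowSec: Int, batchSec: Int): Int = {
    require(batchSec > 0 && windowSec % batchSec == 0,
      "window must be a positive multiple of the batch interval")
    windowSec / batchSec
  }

  // Given the offset of one Kafka partition at each batch boundary,
  // return the (from, until) range that each batch would have to read.
  def offsetRanges(boundaries: Seq[Long]): Seq[(Long, Long)] =
    boundaries.zip(boundaries.drop(1))
}

object WindowFanOutExample extends App {
  // Settings from the report: 180s window over 10s batches.
  println(WindowFanOut.batchesPerWindow(180, 10))

  // Boundaries taken from the two KafkaRDD log lines for test-topic partition 2.
  WindowFanOut
    .offsetRanges(Seq(4394204414L, 4394238058L, 4394257712L))
    .foreach { case (from, until) => println(s"test-topic-2 $from -> $until") }
}
```

Under that assumption a single windowed job can contain up to 18 tasks for the same topic partition, each with its own offset range, and any two of them that land on the same executor at the same time would share one cached consumer. Blocks that are already cached would reduce that count.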