[ https://issues.apache.org/jira/browse/SPARK-23663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16595035#comment-16595035 ]
Peter Simon commented on SPARK-23663:
-------------------------------------

[~gsomogyi] isn't this the same as SPARK-19185?
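If it is indeed the same root cause as SPARK-19185 (several tasks touching one cached KafkaConsumer), the workaround usually mentioned on that ticket is to turn the consumer cache off so each task builds its own consumer. A minimal sketch only, not a confirmed fix; the property name is the one referenced in the note at the end of the report, the app name and local master are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Sketch: disable the cached KafkaConsumer used by spark-streaming-kafka-0-10,
    // so tasks on the same executor stop sharing one consumer instance.
    // "spark-23663-repro" and local[*] are placeholders, not from the report.
    val conf = new SparkConf()
      .setAppName("spark-23663-repro")
      .setMaster("local[*]")
      .set("spark.streaming.kafka.consumer.cache.enabled", "false")
    val ssc = new StreamingContext(conf, Seconds(50)) // 50-second batches, as in the report

This trades the cache's efficiency for not sharing consumers while the issue is being investigated.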
> Spark Streaming Kafka 010, fails with
> "java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access"
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23663
>                 URL: https://issues.apache.org/jira/browse/SPARK-23663
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.2.0
>         Environment: Spark 2.2.0
>                      Spark streaming kafka 010
>            Reporter: kaushik srinivas
>            Priority: Major
>
> Test being tried:
> Ten Kafka topics are created and streamed with Avro data from Kafka producers.
> org.apache.spark.streaming.kafka010 is used to create a direct stream to Kafka;
> a single direct stream is created for all ten topics.
> On each RDD (batch of 50 seconds), the key of each Kafka consumer record is checked
> and separate RDDs are created for the separate topics. Each topic has records with
> the topic name as key and an Avro message as value.
> Finally the ten RDDs are converted to DataFrames and registered as separate temp
> tables. Once all the temp tables are registered, a few SQL queries are run on these
> temp tables.
>
> Exception seen:
> 18/03/12 11:58:34 WARN TaskSetManager: Lost task 23.0 in stage 4.0 (TID 269, 192.168.24.145, executor 7): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:80)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
>   at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
>   at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.1 in stage 4.0 (TID 828, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
> 18/03/12 11:58:34 INFO TaskSetManager: Lost task 23.1 in stage 4.0 (TID 828) on 192.168.24.145, executor 7: java.util.ConcurrentModificationException (KafkaConsumer is not safe for multi-threaded access) [duplicate 1]
> 18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.2 in stage 4.0 (TID 829, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
> 18/03/12 11:58:40 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 30, 192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has already been closed.
>   at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
>   at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
>   at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 18/03/12 11:58:40 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 830, 192.168.24.147, executor 6, partition 0, PROCESS_LOCAL, 4758 bytes)
> 18/03/12 11:58:45 WARN TaskSetManager: Lost task 0.1 in stage 4.0 (TID 296, 192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has already been closed.
>   at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
>   at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
>   at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
> Code snippet:
> val stream = KafkaUtils.createDirectStream[Object, Object](ssc,
>   PreferConsistent,
>   Subscribe[Object, Object](topicsArr, kafkaParams)
> )
> val tbl = topicsArr(0).toString
> stream.foreachRDD(rdd => {
>   var ca = new Array[String](0)
>   var ss = new Array[String](0)
>   if (!rdd.isEmpty()) {
>     val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
>     import sqlContext.implicits._
>     rdd.foreach(record => {
>       record.key() match {
>         case "customer_address" => ca = Array.concat(ca, Array(record.value().toString))
>         case "store_sales"      => ss = Array.concat(ss, Array(record.value().toString))
>         case _                  => println("Invalid Key")
>       }
>     })
>     //val topicValueStrings = rdd.map(record => (record.value()).toString)
>     val df_ca = sqlContext.read.json(spark.sparkContext.parallelize(ca))
>     val df_ss = sqlContext.read.json(spark.sparkContext.parallelize(ss))
>     try {
>       df_ca.registerTempTable("customer_address")
>       df_ss.registerTempTable("store_sales")
>     } catch {
>       case e: Throwable => println(e.getStackTrace())
>     }
>     try {
>       //spark.sql("show tables")
>       println("======New Batch=======")
>       spark.sql(s"select count(1) as cnt, 'customer_address' as tableName from customer_address").show()
>       spark.sql(s"select count(1) as cnt, 'store_sales' as tableName from store_sales").show()
>     } catch {
>       case e: Throwable => println(e.getStackTrace())
>     }
>   }
> })
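One editorial aside on the quoted snippet rather than part of the report: ca and ss are mutated inside rdd.foreach, which runs on the executors, so the driver-side arrays that are parallelized afterwards never see those updates. A minimal sketch of keeping the per-topic split on the cluster with filter() instead, reusing the stream, topic keys and SQLContext from the snippet above; whether this also avoids the KafkaConsumer error is a separate question:

    import org.apache.spark.sql.SQLContext

    // Sketch assuming the `stream` defined in the quoted snippet above.
    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        // One RDD of JSON strings per topic, filtered on the executors by record key.
        val ca = rdd.filter(_.key().toString == "customer_address").map(_.value().toString)
        val ss = rdd.filter(_.key().toString == "store_sales").map(_.value().toString)
        sqlContext.read.json(ca).registerTempTable("customer_address")
        sqlContext.read.json(ss).registerTempTable("store_sales")
      }
    }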
.config("spark.sql.parquet.mergeSchema", "false") > .getOrCreate() > Note: spark.streaming.kafka.consumer.cache.enabled is not made false. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org