kaushik srinivas created SPARK-23663:
----------------------------------------
Summary: Spark Streaming Kafka 010 fails with "java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access"
Key: SPARK-23663
URL: https://issues.apache.org/jira/browse/SPARK-23663
Project: Spark
Issue Type: Bug
Components: DStreams
Affects Versions: 2.2.0
Environment: Spark 2.2.0, Spark Streaming Kafka 010
Reporter: kaushik srinivas

Test being tried:
Ten Kafka topics are created and fed Avro data from Kafka producers. org.apache.spark.streaming.kafka010 is used to create a single direct stream subscribed to all ten topics. On each RDD (batches of 50 seconds), the key of each Kafka consumer record is checked and a separate RDD is built per topic; each topic's records carry the topic name as the key and an Avro message as the value. The ten RDDs are then converted to DataFrames and registered as separate temp tables, after which a few SQL queries are run on those temp tables.

Exception seen:

18/03/12 11:58:34 WARN TaskSetManager: Lost task 23.0 in stage 4.0 (TID 269, 192.168.24.145, executor 7): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:80)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.1 in stage 4.0 (TID 828, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
18/03/12 11:58:34 INFO TaskSetManager: Lost task 23.1 in stage 4.0 (TID 828) on 192.168.24.145, executor 7: java.util.ConcurrentModificationException (KafkaConsumer is not safe for multi-threaded access) [duplicate 1]
18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.2 in stage 4.0 (TID 829, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
18/03/12 11:58:40 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 30, 192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has already been closed.
    at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/03/12 11:58:40 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 830, 192.168.24.147, executor 6, partition 0, PROCESS_LOCAL, 4758 bytes)
18/03/12 11:58:45 WARN TaskSetManager: Lost task 0.1 in stage 4.0 (TID 296, 192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has already been closed.
    at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Code snippet:

val stream = KafkaUtils.createDirectStream[Object, Object](
  ssc,
  PreferConsistent,
  Subscribe[Object, Object](topicsArr, kafkaParams)
)
val tbl = topicsArr(0).toString
stream.foreachRDD(rdd => {
  var ca = new Array[String](0)
  var ss = new Array[String](0)
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._
    rdd.foreach(record => {
      record.key() match {
        case "customer_address" =>
          ca = Array.concat(ca, Array(record.value().toString))
        case "store_sales" =>
          ss = Array.concat(ss, Array(record.value().toString))
        case _ => println("Invalid Key")
      }
    })
    //val topicValueStrings = rdd.map(record => (record.value()).toString)
    val df_ca = sqlContext.read.json(spark.sparkContext.parallelize(ca))
    val df_ss = sqlContext.read.json(spark.sparkContext.parallelize(ss))
    try {
      df_ca.registerTempTable("customer_address")
      df_ss.registerTempTable("store_sales")
    } catch {
      case e: Throwable => println(e.getStackTrace())
    }
    try {
      //spark.sql("show tables")
      println("======New Batch=======")
      spark.sql(s"select count(1) as cnt,'customer_address' as tableName from customer_address").show()
      spark.sql(s"select count(1) as cnt,'store_sales' as tableName from store_sales").show()
    } catch {
      case e: Throwable => println(e.getStackTrace())
    }
  }
})

Spark session is created with the below confs:

val spark = SparkSession.builder()
  .appName(appname)
  .config("hive.metastore.uris", hivemeta)
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.driver.allowMultipleContexts", "true")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.mb", "64")
  .config("spark.sql.tungsten.enabled", "true")
  .config("spark.app.id", appname)
  .config("spark.speculation", "false")
  .config("spark.sql.parquet.mergeSchema", "false")
  .getOrCreate()

Note: spark.streaming.kafka.consumer.cache.enabled is not set to false.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
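For reference, a minimal sketch of two workarounds commonly suggested for this exception. It reuses `stream`, `sqlContext`, and the topic keys from the snippet above; everything else is illustrative, and behavior should be verified against the actual job:

```scala
// 1) Disable the per-executor cached consumer, as described in the
//    Spark Streaming + Kafka 0-10 integration guide. Each task then opens
//    its own KafkaConsumer (safer under concurrent tasks, at the cost of
//    reconnect overhead):
val spark = SparkSession.builder()
  .config("spark.streaming.kafka.consumer.cache.enabled", "false")
  .getOrCreate()

// 2) Avoid mutating driver-side arrays inside rdd.foreach: that closure
//    runs on executors, so `ca`/`ss` in the snippet above never fill on
//    the driver. Splitting by key with filter keeps the work distributed,
//    and caching the RDD avoids re-reading the same Kafka partitions
//    (and re-acquiring the same cached consumer) on every extra pass:
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.cache()  // one read from Kafka, reused by both filters below
    val ca = rdd.filter(_.key() == "customer_address").map(_.value().toString)
    val ss = rdd.filter(_.key() == "store_sales").map(_.value().toString)
    sqlContext.read.json(ca).createOrReplaceTempView("customer_address")
    sqlContext.read.json(ss).createOrReplaceTempView("store_sales")
  }
}
```

`createOrReplaceTempView` is the Spark 2.x replacement for the deprecated `registerTempTable` used in the report.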