kaushik srinivas created SPARK-23663:
----------------------------------------

             Summary: Spark Streaming Kafka 010 fails with 
"java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access"
                 Key: SPARK-23663
                 URL: https://issues.apache.org/jira/browse/SPARK-23663
             Project: Spark
          Issue Type: Bug
          Components: DStreams
    Affects Versions: 2.2.0
         Environment: Spark 2.2.0 

Spark Streaming Kafka 010

 
            Reporter: kaushik srinivas


Test being tried:

Ten Kafka topics are created and streamed with Avro data from Kafka producers.

org.apache.spark.streaming.kafka010 is used to create a direct stream to Kafka.

A single direct stream is created for all ten topics.

On each RDD (batch interval of 50 seconds), the key of each Kafka consumer record is checked and separate RDDs are created for the separate topics.

Each topic has records whose key is the topic name and whose value is an Avro message.

Finally, the ten RDDs are converted to DataFrames and registered as separate temp tables.

Once all the temp tables are registered, a few SQL queries are run on these temp tables (see the code snippet below).

 

Exception seen:

18/03/12 11:58:34 WARN TaskSetManager: Lost task 23.0 in stage 4.0 (TID 269, 
192.168.24.145, executor 7): java.util.ConcurrentModificationException: 
KafkaConsumer is not safe for multi-threaded access
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
 at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
 at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:80)
 at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
 at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893)
 at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
 at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
 at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
 at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
 at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:108)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.1 in stage 4.0 (TID 
828, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
18/03/12 11:58:34 INFO TaskSetManager: Lost task 23.1 in stage 4.0 (TID 828) on 
192.168.24.145, executor 7: java.util.ConcurrentModificationException 
(KafkaConsumer is not safe for multi-threaded access) [duplicate 1]
18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.2 in stage 4.0 (TID 
829, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
18/03/12 11:58:40 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 30, 
192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has 
already been closed.
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
 at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
 at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
 at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
 at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893)
 at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
 at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
 at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
 at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
 at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:108)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

18/03/12 11:58:40 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 830, 
192.168.24.147, executor 6, partition 0, PROCESS_LOCAL, 4758 bytes)
18/03/12 11:58:45 WARN TaskSetManager: Lost task 0.1 in stage 4.0 (TID 296, 
192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has 
already been closed.
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
 at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
 at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
 at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
 at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893)
 at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
 at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
 at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
 at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
 at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:108)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 

Code snippet:

val stream = KafkaUtils.createDirectStream[Object, Object](ssc,
  PreferConsistent,
  Subscribe[Object, Object](topicsArr, kafkaParams)
)
val tbl = topicsArr(0).toString
stream.foreachRDD(rdd => {
  var ca = new Array[String](0)
  var ss = new Array[String](0)
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._
    rdd.foreach(record => {
      record.key() match {
        case "customer_address" => ca = Array.concat(ca, Array(record.value().toString))
        case "store_sales"      => ss = Array.concat(ss, Array(record.value().toString))
        case _                  => println("Invalid Key")
      }
    })
    //val topicValueStrings = rdd.map(record => (record.value()).toString)
    val df_ca = sqlContext.read.json(spark.sparkContext.parallelize(ca))
    val df_ss = sqlContext.read.json(spark.sparkContext.parallelize(ss))
    try {
      df_ca.registerTempTable("customer_address")
      df_ss.registerTempTable("store_sales")
    }
    catch {
      case e: Throwable => println(e.getStackTrace())
    }
    try {
      //spark.sql("show tables")
      println("======New Batch=======")
      spark.sql(s"select count(1) as cnt,'customer_address' as tableName from customer_address").show()
      spark.sql(s"select count(1) as cnt,'store_sales' as tableName from store_sales").show()
    }
    catch {
      case e: Throwable => println(e.getStackTrace())
    }
  }
})
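
The snippet above references topicsArr and kafkaParams without showing how they are defined. A minimal sketch of what those definitions might look like follows; the broker address, group id, topic list and deserializer choices are assumptions for illustration, not taken from this report.

// Hypothetical definitions for the values referenced by the snippet above.
// Broker address, group id, topic names and deserializers are assumed.
import org.apache.kafka.common.serialization.StringDeserializer

val topicsArr = Array("customer_address", "store_sales") // plus the remaining eight topics

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "kafka-broker:9092",          // assumed broker address
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "spark-streaming-test",       // assumed consumer group
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)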

 

The Spark session is created with the configs below:

val spark = SparkSession.builder()
 .appName(appname)
 .config("hive.metastore.uris", hivemeta)
 .enableHiveSupport()
 .config("hive.exec.dynamic.partition", "true")
 .config("hive.exec.dynamic.partition.mode", "nonstrict")
 .config("spark.driver.allowMultipleContexts", "true")
 .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 .config("spark.kryoserializer.buffer.mb", "64")
 .config("spark.sql.tungsten.enabled", "true")
 .config("spark.app.id", appname)
 .config("spark.speculation","false")
 .config("spark.sql.parquet.mergeSchema", "false")
 .getOrCreate()

Note: spark.streaming.kafka.consumer.cache.enabled has not been set to false (the consumer cache is left at its default, enabled).
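
For reference, a minimal sketch of how the consumer cache could be disabled if that is tried as a workaround; the flag is the one named in the note above, and setting it on the session builder here is only an illustration (it can equally be passed via --conf on spark-submit):

// Illustrative only: turn off the cached KafkaConsumer instances used by
// spark-streaming-kafka-0-10 so that each task creates its own consumer.
val sparkNoCache = SparkSession.builder()
  .appName(appname)
  .config("spark.streaming.kafka.consumer.cache.enabled", "false")
  .getOrCreate()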

 


