Deepak created SPARK-23636:
------------------------------

             Summary: [SPARK 2.2] | Kafka Consumer | KafkaUtils.createRDD 
throws Exception - java.util.ConcurrentModificationException: KafkaConsumer is 
not safe for multi-threaded access
                 Key: SPARK-23636
                 URL: https://issues.apache.org/jira/browse/SPARK-23636
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.2.0, 2.1.1
            Reporter: Deepak


While using the KafkaUtils.createRDD API we receive the error listed below, specifically when a single executor with more than one core connects to a single Kafka topic-partition and fetches an Array of OffsetRanges against that partition.

 
h2. Error Faced

 
{noformat}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 17, host, executor 16): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1629)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1528)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1508)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:59)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.remove(CachedKafkaConsumer.scala:185)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:204)
at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:181)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
{noformat}
 
h2. Config Used to simulate the error

A session with:
 * Executors - 1
 * Cores - 2 or more
 * Kafka topic - only 1 partition
 * While fetching - more than one OffsetRange for that single partition, for example (a session-configuration sketch follows this example):
{noformat}
Array(OffsetRange("kafka_topic",0,608954201,608954202),
OffsetRange("kafka_topic",0,608954202,608954203)
){noformat}
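For reference, a minimal sketch of how such a session could be configured; the app name and master deployment are illustrative, not taken from this report, and only the instance/core counts come from the list above:

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

// One executor with two (or more) cores, so two tasks can run concurrently
// in the same JVM against the single topic-partition.
val conf = new SparkConf()
  .setAppName("kafka-createRDD-repro") // hypothetical app name
  .set("spark.executor.instances", "1")
  .set("spark.executor.cores", "2")

val sc = new SparkContext(conf)
{code}

With this layout, both tasks for the two offset ranges necessarily land on the same executor.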
 
h2. Why we fetch from Kafka this way

 

This gives us the ability to use one connection per core available in the Spark executor, so that each core fetches and processes its own set of messages (offset ranges) from Kafka.
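For illustration, the per-core ranges can be produced by slicing the partition's overall offset span into as many contiguous pieces as there are cores. The helper below is only a sketch of that idea; the splitRange name and the from/until parameters are ours, not part of this report:

{code:scala}
import org.apache.spark.streaming.kafka010.OffsetRange

// Sketch: split one partition's [from, until) offset span into `slices`
// contiguous OffsetRanges, one per executor core.
def splitRange(topic: String, partition: Int,
               from: Long, until: Long, slices: Int): Array[OffsetRange] = {
  val total = until - from
  (0 until slices).toArray.map { i =>
    OffsetRange(topic, partition,
      from + (total * i) / slices,
      from + (total * (i + 1)) / slices)
  }
}

// splitRange("kafka_topic", 0, 608954201, 608954203, 2) yields the two
// ranges shown in the example above.
{code}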

This pattern was working in Spark 1.6.2.

However, from Spark 2.1 onwards, the same pattern throws the exception shown above.
h2. Sample Code

 
{code:scala}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

// This forces two connections to the same broker for the partition specified below.
val parallelizedRanges = Array(
  OffsetRange("kafka_topic", 0, 1, 2), // fetches offset 1
  OffsetRange("kafka_topic", 0, 2, 3)  // fetches offset 2
)

// Placeholder consumer settings for illustration; the real broker address
// and group id come from the job configuration.
val kafkaParams1: java.util.Map[String, Object] = new java.util.HashMap()
kafkaParams1.put("bootstrap.servers", "broker:9092")
kafkaParams1.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams1.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams1.put("group.id", "kafka_topic_reader")

// hiveContext / sqlContext are assumed to already exist in the session.
val rDDConsumerRec: RDD[ConsumerRecord[String, String]] =
  KafkaUtils.createRDD[String, String](
    hiveContext.sparkContext, kafkaParams1, parallelizedRanges,
    LocationStrategies.PreferConsistent)

val data: RDD[Row] = rDDConsumerRec.map { x =>
  Row(x.topic(),
    x.partition().toString,
    x.offset().toString,
    x.timestamp().toString,
    x.value())
}

// value is StringType because the records are read with String deserializers.
val df = sqlContext.createDataFrame(data, StructType(Seq(
  StructField("topic", StringType),
  StructField("partition", StringType),
  StructField("offset", StringType),
  StructField("timestamp", StringType),
  StructField("value", StringType))))

df.cache()

df.registerTempTable("kafka_topic")

hiveContext.sql("""
  select *
  from kafka_topic
""").show
{code}
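For contrast, handing createRDD a single OffsetRange covering the same offsets keeps one task (and therefore one cached consumer) per topic-partition on the executor, which should avoid the multi-threaded access described above, at the cost of the per-core parallelism this report is about. A sketch, reusing kafkaParams1 from the sample:

{code:scala}
// One range per topic-partition: only one task touches partition 0 on the executor.
val singleRange = Array(OffsetRange("kafka_topic", 0, 1, 3))

val singleRangeRDD: RDD[ConsumerRecord[String, String]] =
  KafkaUtils.createRDD[String, String](
    hiveContext.sparkContext, kafkaParams1, singleRange,
    LocationStrategies.PreferConsistent)
{code}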
 
h2. Related Issue

 

A similar issue, reported for the DirectStream API, is:

https://issues.apache.org/jira/browse/SPARK-19185


