[ https://issues.apache.org/jira/browse/SPARK-23636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527204#comment-16527204 ]

Ted Yu commented on SPARK-23636:
--------------------------------

It seems that in KafkaDataConsumer#close:
{code}
def close(): Unit = consumer.close()
{code}
the code should catch ConcurrentModificationException and try closing the consumer again.
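As a rough sketch of that suggestion (the single retry with a short back-off is an assumption on my part, not an agreed design; the retry can itself still throw if another thread keeps hold of the consumer):
{code}
import java.util.ConcurrentModificationException

// Hypothetical defensive close() for the cached consumer; `consumer`
// is the existing KafkaConsumer field of the enclosing class.
def close(): Unit = {
  try {
    consumer.close()
  } catch {
    case _: ConcurrentModificationException =>
      // Another thread currently holds the consumer's lock;
      // back off briefly, then try closing once more.
      Thread.sleep(100)
      consumer.close()
  }
}
{code}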
> [SPARK 2.2] | Kafka Consumer | KafkaUtils.createRDD throws Exception -
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-23636
>                 URL: https://issues.apache.org/jira/browse/SPARK-23636
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.1, 2.2.0
>            Reporter: Deepak
>            Priority: Major
>              Labels: performance
>
> h2. Summary
>
> While using the KafkaUtils.createRDD API, we receive the error listed
> below, specifically when one executor with more than one core connects to
> a single Kafka topic-partition and fetches an Array of OffsetRanges.
>
> _I've tagged this issue to "Structured Streaming" as I could not find a
> more appropriate component._
>
> ----
> h2. Error Faced
>
> {noformat}
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access{noformat}
>
> Stack trace:
> {noformat}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost
> task 5.3 in stage 1.0 (TID 17, host, executor 16):
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1629)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1528)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1508)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:59)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.remove(CachedKafkaConsumer.scala:185)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:204)
>   at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:181)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323){noformat}
>
> ----
> h2. Config Used to simulate the error
>
> A session with:
> * Executors - 1
> * Cores - 2 or more
> * Kafka topic - has only 1 partition
> * While fetching - more than one OffsetRange in the array, for example:
> {noformat}
> Array(OffsetRange("kafka_topic",0,608954201,608954202),
>       OffsetRange("kafka_topic",0,608954202,608954203)
> ){noformat}
>
> ----
> h2. Was this approach working before?
>
> This was working in Spark 1.6.2. However, from Spark 2.1 onwards the same
> approach throws the exception above.
>
> ----
> h2. Why are we fetching from Kafka as mentioned above?
>
> This gives us the capability to establish a connection to the Kafka broker
> from every Spark executor core, so each core can fetch and process its own
> set of messages based on the specified offset ranges. A sample launch
> command for such a session is sketched below.
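> For reference, a launch command matching the session described above could
> look along these lines (the master, class name, and jar are placeholder
> values, and --num-executors applies to YARN deployments):
> {noformat}
> spark-submit \
>   --master yarn \
>   --num-executors 1 \
>   --executor-cores 2 \
>   --class com.example.KafkaOffsetFetch \
>   kafka-offset-fetch.jar{noformat}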
> ----
> h2. Sample Code
>
> Scala snippet, on Spark 2.2.0 or 2.1.0 (assumes sparkContext and
> sqlContext are already in scope, e.g. in spark-shell):
> {code}
> // Imports
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.{DataFrame, Row, SQLContext}
> import org.apache.spark.sql.types.{StringType, StructField, StructType}
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.KafkaUtils._
>
> // This forces two connections - from a single executor - to
> // topic-partition <kafka_topic-0>. With 2 cores assigned to 1 executor,
> // each core runs a task pulling its respective range:
> // OffsetRange("kafka_topic",0,1,2) and OffsetRange("kafka_topic",0,2,3).
> val parallelizedRanges = Array(
>   OffsetRange("kafka_topic", 0, 1, 2), // sample record at offset 1
>   OffsetRange("kafka_topic", 0, 2, 3)  // sample record at offset 2
> )
>
> // Initiate Kafka properties
> val kafkaParams1: java.util.Map[String, Object] = new java.util.HashMap()
> // kafkaParams1.put("key", "val") - add all the parameters such as broker,
> // topic, deserializers, group id, ... Not listing every property here.
>
> // Create the RDD
> val rDDConsumerRec: RDD[ConsumerRecord[String, String]] =
>   createRDD[String, String](sparkContext, kafkaParams1,
>     parallelizedRanges, LocationStrategies.PreferConsistent)
>
> // Map function
> val data: RDD[Row] = rDDConsumerRec.map { x =>
>   Row(x.topic().toString, x.partition().toString, x.offset().toString,
>     x.timestamp().toString, x.value())
> }
>
> // Create a DataFrame; "value" is StringType here because the RDD was
> // created with String key/value types.
> val df: DataFrame = sqlContext.createDataFrame(data, StructType(Seq(
>   StructField("topic", StringType),
>   StructField("partition", StringType),
>   StructField("offset", StringType),
>   StructField("timestamp", StringType),
>   StructField("value", StringType)
> )))
>
> df.show() // You will see the error reported.
> {code}
>
> ----
> h2. Similar Issue reported earlier, but on a different API
>
> A similar issue reported for DirectStream is
> https://issues.apache.org/jira/browse/SPARK-19185
>
> ----
> h2. What is the impact if a fix is not available for this problem?
>
> We have a lot of Spark applications running in production that make
> parallel connections to one topic-partition from each executor, so
> parallelism is directly proportional to the number of cores in each
> executor.
> From Spark 2.1 onwards we are not allowed to make concurrent connections
> from one executor to one topic-partition. The only workaround is to start
> our applications with executor-cores = 1 and dynamic resource allocation
> enabled (a configuration sketch follows below). With that configuration,
> a new executor is spawned to run the fetch task for every offset range we
> ask Kafka for.
>
> Downside of the workaround: this approach does not let us leverage more
> than one core per executor, and asking for an executor for each offset
> range is costly in terms of scheduling and allocation.
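> The workaround configuration described above would look something like
> this (the property names are standard Spark settings; the values are
> illustrative, and the external shuffle service is the usual prerequisite
> for dynamic allocation on YARN):
> {noformat}
> spark.executor.cores              1
> spark.dynamicAllocation.enabled   true
> spark.shuffle.service.enabled     true{noformat}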