Hello - I am facing an issue with the following snippet of code, which reads from Kafka and creates a DStream. I am using KafkaUtils.createDirectStream(..) with Kafka 0.10.1 and Spark 2.0.1.
    // get the data from kafka
    val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
      KafkaUtils.createDirectStream[Array[Byte], (String, String)](
        streamingContext,
        PreferConsistent,
        Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
      )

    // label and vectorize the value
    val projected: DStream[(String, Vector)] = stream.map { record =>
      val (label, value) = record.value
      val vector = Vectors.dense(value.split(",").map(_.toDouble))
      (label, vector)
    }.transform(projectToLowerDimension)

In the above snippet, if I have the call to transform in the last line, I get the following exception:

    Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
      at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
      at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
      at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
      at scala.collection.AbstractIterator.to(Iterator.scala:1336)
      at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
      at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
      ....

The transform method does a PCA and projects onto the top 2 principal components:

    private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
      if (rdd.isEmpty) rdd
      else {
        // reduce to 2 dimensions
        val pca = new PCA(2).fit(rdd.map(_._2))
        // Project vectors to the linear space spanned by the top 2 principal
        // components, keeping the label
        rdd.map(p => (p._1, pca.transform(p._2)))
      }
    }

However, if I remove the transform call, everything processes correctly. Any help will be most welcome.

regards.

-- 
Debasish Ghosh
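One observation, in case it is relevant: projectToLowerDimension makes two passes over the same KafkaRDD (PCA.fit triggers one job, and the subsequent map triggers another), and the stack trace shows the failure inside CachedKafkaConsumer, which is not safe for concurrent access. I wonder whether caching the RDD before fitting would avoid the second read from the Kafka consumer. A sketch of what I mean (an untested assumption on my part, not a verified fix):

    private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
      if (rdd.isEmpty) rdd
      else {
        // Persist the batch so that the two downstream actions (PCA.fit and
        // the projection map) read from cached blocks instead of both
        // re-reading the partition through the cached Kafka consumer.
        val cached = rdd.cache()
        val pca = new PCA(2).fit(cached.map(_._2))
        cached.map(p => (p._1, pca.transform(p._2)))
      }
    }

Does that sound like the right direction, or is the concurrent access coming from somewhere else?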