I'd say unzip your actual assembly jar and verify whether the kafka
consumer classes are 0.10.1 or 0.10.0.  We've seen reports of odd
behavior with 0.10.1 classes.  Possibly unrelated, but good to
eliminate.

On Fri, Dec 9, 2016 at 10:38 AM, Debasish Ghosh
<ghosh.debas...@gmail.com> wrote:
> oops .. it's 0.10.0 .. sorry for the confusion ..
>
> On Fri, Dec 9, 2016 at 10:07 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>>
>> My assembly contains the 0.10.1 classes .. Here are the dependencies
>> related to kafka & spark that my assembly has ..
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.kafka"      %   "kafka-streams"                  %
>> "0.10.0.0",
>>   "org.apache.spark"     %%   "spark-streaming-kafka-0-10"     % spark,
>>   "org.apache.spark"     %%   "spark-core"                     % spark %
>> "provided",
>>   "org.apache.spark"     %%   "spark-streaming"                % spark %
>> "provided",
>>   "org.apache.spark"     %%   "spark-mllib"                    % spark %
>> "provided",
>>   "org.apache.spark"     %%   "spark-sql"                      % spark %
>> "provided"
>> )
>>
>> regards.
>>
>> On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>>
>>> When you say 0.10.1 do you mean broker version only, or does your
>>> assembly contain classes from the 0.10.1 kafka consumer?
>>>
>>> On Fri, Dec 9, 2016 at 10:19 AM, debasishg <ghosh.debas...@gmail.com>
>>> wrote:
>>> > Hello -
>>> >
>>> > I am facing some issues with the following snippet of code that reads
>>> > from
>>> > Kafka and creates DStream. I am using KafkaUtils.createDirectStream(..)
>>> > with
>>> > Kafka 0.10.1 and Spark 2.0.1.
>>> >
>>> > // get the data from kafka
>>> > val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
>>> >   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
>>> >     streamingContext,
>>> >     PreferConsistent,
>>> >     Subscribe[Array[Byte], (String, String)](topicToReadFrom,
>>> > kafkaParams)
>>> >   )
>>> >
>>> > // label and vectorize the value
>>> > val projected: DStream[(String, Vector)] = stream.map { record =>
>>> >   val (label, value) = record.value
>>> >   val vector = Vectors.dense(value.split(",").map(_.toDouble))
>>> >   (label, vector)
>>> > }.transform(projectToLowerDimension)
>>> >
>>> > In the above snippet if I have the call to transform in the last line,
>>> > I get
>>> > the following exception ..
>>> >
>>> > Caused by: java.util.ConcurrentModificationException: KafkaConsumer is
>>> > not
>>> > safe for multi-threaded access
>>> >     at
>>> >
>>> > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>>> >     at
>>> >
>>> > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>>> >     at
>>> >
>>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>>> >     at
>>> >
>>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>>> >     at
>>> >
>>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>>> >     at
>>> >
>>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>>> >     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>>> >     at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
>>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>> >     at
>>> >
>>> > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>>> >     at
>>> >
>>> > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>>> >     at
>>> >
>>> > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>>> >     at
>>> > scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>>> >     at scala.collection.AbstractIterator.to(Iterator.scala:1336)
>>> >     at
>>> >
>>> > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>>> >     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
>>> >     ....
>>> >
>>> > The transform method does a PCA and gives the top 2 principal
>>> > components ..
>>> >
>>> > private def projectToLowerDimension: RDD[(String, Vector)] =>
>>> > RDD[(String,
>>> > Vector)] = { rdd =>
>>> >   if (rdd.isEmpty) rdd else {
>>> >     // reduce to 2 dimensions
>>> >     val pca = new PCA(2).fit(rdd.map(_._2))
>>> >
>>> >     // Project vectors to the linear space spanned by the top 2
>>> > principal
>>> >     // components, keeping the label
>>> >     rdd.map(p => (p._1, pca.transform(p._2)))
>>> >   }
>>> > }
>>> >
>>> > However if I remove the transform call, I can process everything
>>> > correctly.
>>> >
>>> > Any help will be most welcome ..
>>> >
>>> > regards.
>>> > - Debasish
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> > http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-kafka-createDirectStream-tp28190.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> > Nabble.com.
>>> >
>>> > ---------------------------------------------------------------------
>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >
>>
>>
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to