@Adrian,
I am doing collect only for debugging purposes. But I do have to use
foreachRDD so that I can operate on the RDD and eventually save it to a DB.

My actual problem here is properly converting the Array[Byte] into my custom
object.
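
For reference, this is roughly the shape I am trying to end up with; saveToDb
below is only a placeholder for my actual DB write, not a real API:

    val events = kafkaDStream.map { case (devId, byteArray) =>
      (devId, KafkaGenericEvent.parseFrom(byteArray)) // should run on the executors
    }

    events.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>             // still on the executors
        partition.foreach { case (devId, event) =>
          saveToDb(devId, event)                      // placeholder for the DB write
        }
      }
    }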

On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase <atan...@adobe.com> wrote:

> Why are you calling foreachRdd / collect in the first place?
>
> Instead of using a custom decoder, you can simply do the following – this
> is code executed on the workers and allows the computation to continue.
> foreachRDD and collect are output operations and force the data to be
> collected on the driver (assuming you don’t want that…)
>
> val events = kafkaDStream.map { case (devId, byteArray) =>
>   KafkaGenericEvent.parseFrom(byteArray)
> }
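>
> To be explicit about where each piece runs (and what collect forces) – a
> minimal sketch:
>
> kafkaDStream
>   .map { case (devId, bytes) => KafkaGenericEvent.parseFrom(bytes) } // runs on the executors
>   .foreachRDD { rdd =>
>     // this closure runs on the driver; rdd.collect() ships every element to it
>     rdd.collect().foreach(println)
>   }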
>
> From: srungarapu vamsi
> Date: Thursday, September 17, 2015 at 4:03 PM
> To: user
> Subject: Spark Streaming kafka directStream value decoder issue
>
> I am using KafkaUtils.createDirectStream to read the data from kafka bus.
>
> On the producer end, I am generating the messages in the following way:
>
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>     val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>
>     // Send some messages
>     println("Sending message")
>     val kafkaGenericEvent =
>       new KafkaGenericEvent("event-id", EventType.site, "6", 1440500400000L)
>     val message =
>       new ProducerRecord[String, KafkaGenericEvent](topic, "myKey", kafkaGenericEvent)
>     producer.send(message)
>
> I am connecting to Kafka using the console consumer script and am able to
> see proper data. The KafkaGenericEvent used in the above code is the class
> generated by ScalaBuff from a protobuf file.
>
> On the consumer end,
> if I read the value as a normal byte array and then convert it into a
> KafkaGenericEvent in the following way, I get proper data:
>
>     val kafkaDStream =
>       KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>         ssc, kafkaConf, Set(topics))
>
>     kafkaDStream.foreachRDD(rdd => rdd.collect().map {
>       case (devId, byteArray) =>
>         println(KafkaGenericEvent.parseFrom(byteArray))
>     })
>
> But if I change the value type to KafkaGenericEvent and use a custom
> decoder like this:
>
> import kafka.serializer.Decoder
> import kafka.utils.VerifiableProperties
>
> class KafkaGenericEventsDecoder(props: VerifiableProperties = null)
>     extends Decoder[KafkaGenericEvent] {
>   override def fromBytes(bytes: Array[Byte]): KafkaGenericEvent =
>     KafkaGenericEvent.parseFrom(bytes)
> }
>
> and in the consumer:
>
>     val kafkaDStream =
>       KafkaUtils.createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](
>         ssc, kafkaConf, Set(topics))
>
>     kafkaDStream.foreachRDD(rdd => rdd.collect().map {
>       case (devId, genericEvent) =>
>         println(genericEvent)
>     })
>
> Now my value object KafkaGenericEvent is not created from the sent data;
> instead I get an empty KafkaGenericEvent object with default values.
>
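> As a sanity check, the decoder can also be exercised directly outside
> Spark, to confirm that fromBytes itself works (a sketch only;
> someSerializedBytes stands in for a payload captured from the producer
> side):
>
>     val decoder = new KafkaGenericEventsDecoder()
>     println(decoder.fromBytes(someSerializedBytes)) // someSerializedBytes: Array[Byte]
>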
> Even if I read the value as an array of bytes in createDirectStream and
> then apply a transformation in the following way, I am getting incorrect
> values:
>
>     val kafkaDStream =
>       KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>         ssc, kafkaConf, Set(topics))
>
>     kafkaDStream.map {
>       case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
>     }.foreachRDD(rdd => rdd.collect().map {
>       case (devId, genericEvent) =>
>         println(genericEvent)
>     })
>
> I get the default KafkaGenericEvent object at the println(genericEvent)
> line.
> Does this mean that I can transform the values only on the driver and not
> on the executors?
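>
> For what it is worth, one way to check what the executors actually receive
> would be to log the raw payload inside the map before parsing (a sketch
> only; note that println inside map goes to the executor logs, not to the
> driver console):
>
>     kafkaDStream.map { case (devId, byteArray) =>
>       println(s"devId=$devId, payload=${byteArray.length} bytes") // executor-side logging
>       (devId, KafkaGenericEvent.parseFrom(byteArray))
>     }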
>
> I am completely confused here!
> I am using:
>  scala-2.10.4
>  spark-1.3.1
>  kafka_2.10-0.8.2.1
>
> -
> /Vamsi
>



-- 
/Vamsi
