@Saisai Shao, thanks for the pointer. It turned out to be a serialization
issue. I was using ScalaBuff to generate my "KafkaGenericEvent" class, but
when I went through the generated class code, I found that it is not
serializable.
Now I am generating my classes with ScalaPB (
https://github.com/trueaccord/ScalaPB) and my problem is solved.
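
For anyone who hits the same thing, a quick sanity check I could have run up
front (a minimal sketch; the helper is my own, not from any library):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
      ObjectInputStream, ObjectOutputStream}
    import scala.util.Try

    // Round-trip an object through Java serialization, which is what
    // Spark's default serializer does when rdd.collect() ships results
    // back to the driver. Per the above, this is false for the
    // ScalaBuff-generated class and true for the ScalaPB-generated one.
    def roundTrips(obj: AnyRef): Boolean = Try {
      val buffer = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buffer)
      out.writeObject(obj)
      out.close()
      new ObjectInputStream(
        new ByteArrayInputStream(buffer.toByteArray)).readObject()
    }.isSuccess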

Thanks

On Thu, Sep 17, 2015 at 10:43 PM, Saisai Shao <sai.sai.s...@gmail.com>
wrote:

> Is your "KafkaGenericEvent" serializable? Since you call rdd.collect() to
> fetch the data to the local driver, the KafkaGenericEvent needs to be
> serialized and deserialized through the Java or Kryo serializer (depending
> on your configuration). I am not sure whether that is why you always get a
> default object.
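>
> As a quick sketch (assuming the Spark 1.3.x configuration keys), you could
> switch to Kryo and register the event class explicitly before creating the
> StreamingContext:
>
>     val sparkConf = new SparkConf()
>       .setAppName("kafka-events") // name is illustrative
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .registerKryoClasses(Array(classOf[KafkaGenericEvent]))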
>
> Also, would you provide the implementation of `parseFrom`, so we can
> better understand the details of how you do the deserialization?
>
> Thanks
> Saisai
>
> On Thu, Sep 17, 2015 at 9:49 AM, srungarapu vamsi <
> srungarapu1...@gmail.com> wrote:
>
>> If I understand correctly, I guess you are suggesting that I do this:
>>
>> val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte],
>>   StringDecoder, DefaultDecoder](ssc, kafkaConf, Set(topics))
>>
>> kafkaDStream.map {
>>   case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
>> }.foreachRDD(rdd => rdd.collect().foreach {
>>   case (devId, genericEvent) => println(genericEvent)
>> })
>>
>> I read from Kafka as a byte array, applied a transformation from the byte
>> array to my custom class, and printed the custom class for debugging
>> purposes.
>>
>> But this is not helping me, i.e. I am getting an empty object with default
>> values when I print "genericEvent".
>>
>> Please correct me if I did not get what you are suggesting I try.
>>
>>
>> On Thu, Sep 17, 2015 at 9:30 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>
>>> I guess what I'm asking is why not start with a byte array, as in the
>>> example that works (using the DefaultDecoder), then map over it and do
>>> the decoding manually as I'm suggesting below.
>>>
>>> Have you tried this approach? We have the same workflow (kafka =>
>>> protobuf => custom class) and it works.
>>> If you expect invalid messages, you can use flatMap instead and wrap
>>> .parseFrom in a Try { ... }.toOption.
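>>>
>>> Roughly like this (a sketch; it keeps the (devId, event) pairing and
>>> silently drops anything that fails to parse):
>>>
>>>     import scala.util.Try
>>>
>>>     val events = kafkaDStream.flatMap { case (devId, bytes) =>
>>>       Try(KafkaGenericEvent.parseFrom(bytes)).toOption.map(event => (devId, event))
>>>     }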
>>>
>>> Sent from my iPhone
>>>
>>> On 17 Sep 2015, at 18:23, srungarapu vamsi <srungarapu1...@gmail.com>
>>> wrote:
>>>
>>> @Adrian,
>>> I am doing the collect only for debugging. But I have to use foreachRDD
>>> so that I can operate on the RDD and eventually save to the DB, roughly
>>> as sketched below.
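>>>
>>> Something like this, where DbClient is a hypothetical stand-in for the
>>> real store and decodedStream is the already-decoded (devId, event)
>>> stream:
>>>
>>>     decodedStream.foreachRDD { rdd =>
>>>       rdd.foreachPartition { events =>
>>>         // one connection per partition, opened on the executors
>>>         val db = DbClient.connect() // hypothetical client
>>>         events.foreach { case (devId, event) => db.save(devId, event) }
>>>         db.close()
>>>       }
>>>     }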
>>>
>>> But my actual problem here is properly converting Array[Byte] to my
>>> custom object.
>>>
>>> On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase <atan...@adobe.com>
>>> wrote:
>>>
>>>> Why are you calling foreachRDD / collect in the first place?
>>>>
>>>> Instead of using a custom decoder, you should simply do the following:
>>>> this is code executed on the workers and allows the computation to
>>>> continue. foreachRDD and collect are output operations and force the
>>>> data to be collected on the driver (assuming you don't want that...)
>>>>
>>>> val events = kafkaDStream.map { case (devId, byteArray) =>
>>>>   KafkaGenericEvent.parseFrom(byteArray)
>>>> }
>>>>
>>>> From: srungarapu vamsi
>>>> Date: Thursday, September 17, 2015 at 4:03 PM
>>>> To: user
>>>> Subject: Spark Streaming kafka directStream value decoder issue
>>>>
>>>> I am using KafkaUtils.createDirectStream to read data from the Kafka
>>>> bus.
>>>>
>>>> On the producer end, I am generating messages in the following way:
>>>>
>>>> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>>>> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>>>   "org.apache.kafka.common.serialization.StringSerializer")
>>>> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>>>   "org.apache.kafka.common.serialization.StringSerializer")
>>>> val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>>>>
>>>> // Send a message
>>>> println("Sending message")
>>>> val kafkaGenericEvent =
>>>>   new KafkaGenericEvent("event-id", EventType.site, "6", 1440500400000L)
>>>> val message = new ProducerRecord[String, KafkaGenericEvent](
>>>>   topic, "myKey", kafkaGenericEvent)
>>>> producer.send(message)
>>>>
>>>> I am connecting to Kafka with the console consumer script and am able
>>>> to see proper data. The KafkaGenericEvent used in the above code is the
>>>> class generated by ScalaBuff from a protobuf file.
>>>>
>>>> On the consumer end, if I read the value as a plain byte array and then
>>>> convert it into a KafkaGenericEvent in the following way, I get proper
>>>> data:
>>>>
>>>> val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte],
>>>>   StringDecoder, DefaultDecoder](ssc, kafkaConf, Set(topics))
>>>>
>>>> kafkaDStream.foreachRDD(rdd => rdd.collect().foreach {
>>>>   case (devId, byteArray) => println(KafkaGenericEvent.parseFrom(byteArray))
>>>> })
>>>>
>>>> But if I change the value type to KafkaGenericEvent and use a custom
>>>> decoder like this:
>>>>
>>>> // Kafka 0.8 instantiates decoders reflectively, so the constructor
>>>> // must accept a VerifiableProperties argument.
>>>> class KafkaGenericEventsDecoder(props: VerifiableProperties = null)
>>>>     extends Decoder[KafkaGenericEvent] {
>>>>   override def fromBytes(bytes: Array[Byte]): KafkaGenericEvent =
>>>>     KafkaGenericEvent.parseFrom(bytes)
>>>> }
>>>>
>>>> and in the consumer:
>>>>
>>>> val kafkaDStream = KafkaUtils.createDirectStream[String,
>>>>   KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](ssc,
>>>>   kafkaConf, Set(topics))
>>>>
>>>> kafkaDStream.foreachRDD(rdd => rdd.collect().foreach {
>>>>   case (devId, genericEvent) => println(genericEvent)
>>>> })
>>>>
>>>> Now my value object KafkaGenericEvent is not created from the sent
>>>> data; instead I get an empty KafkaGenericEvent with default values.
>>>>
>>>> Even if I read the value as an array of bytes in createDirectStream
>>>> and then apply a transformation in the following way, I get incorrect
>>>> values:
>>>>
>>>> val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte],
>>>>   StringDecoder, DefaultDecoder](ssc, kafkaConf, Set(topics))
>>>>
>>>> kafkaDStream.map {
>>>>   case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
>>>> }.foreachRDD(rdd => rdd.collect().foreach {
>>>>   case (devId, genericEvent) => println(genericEvent)
>>>> })
>>>>
>>>> I get the default KafkaGenericEvent object at the
>>>> println(genericEvent) line.
>>>> Does this mean that I can transform the values only on the driver and
>>>> not on the executors?
>>>>
>>>> I am completely confused here!
>>>> I am using :
>>>>  scala-2.10.4
>>>>  spark-1.3.1
>>>>  kafka_2.10-0.8.2.1
>>>>
>>>> -
>>>> /Vamsi
>>>>
>>>
>>>
>>>
>>> --
>>> /Vamsi
>>>
>>>
>>
>>
>> --
>> /Vamsi
>>
>
>


-- 
/Vamsi
