Why are you calling foreachRDD / collect in the first place?

Instead of using a custom decoder, you can simply do the following. This code is 
executed on the workers and lets the computation continue as a distributed 
pipeline. foreachRDD is an output operation, and calling collect inside it forces 
all of the data back onto the driver (assuming you don’t want that…)

val events = kafkaDStream.map { case (devId, byteArray) =>
  KafkaGenericEvent.parseFrom(byteArray)
}
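
If you still want to print the events for inspection, a minimal sketch that avoids 
collect (building on the events stream above) would be:

// foreach runs on the executors, so these printlns show up in the executor
// logs (or on the same console when running in local mode).
events.foreachRDD { rdd =>
  rdd.foreach(event => println(event))
}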

From: srungarapu vamsi
Date: Thursday, September 17, 2015 at 4:03 PM
To: user
Subject: Spark Streaming kafka directStream value decoder issue

I am using KafkaUtils.createDirectStream to read data from the Kafka bus.

On the producer end, I am generating messages in the following way:

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, KafkaGenericEvent](props)

    // Send some messages
    println("Sending message")
    val kafkaGenericEvent = new KafkaGenericEvent("event-id", EventType.site, "6", 1440500400000L)
    val message = new ProducerRecord[String, KafkaGenericEvent](topic, "myKey", kafkaGenericEvent)
    producer.send(message)
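
For comparison, here is a sketch of the same send with the value serialized to 
explicit bytes rather than through the String value serializer configured above 
(ByteArraySerializer and toByteArray are assumed from the standard Kafka client 
and protobuf message APIs, not taken from the code above):

    // Sketch only: serialize the protobuf event to bytes explicitly and let Kafka
    // treat the value as an opaque byte array. ByteArraySerializer ships with the
    // Kafka clients; toByteArray is the standard protobuf message method and is
    // assumed to exist on the ScalaBuff-generated KafkaGenericEvent.
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArraySerializer")
    val bytesProducer = new KafkaProducer[String, Array[Byte]](props)
    bytesProducer.send(new ProducerRecord[String, Array[Byte]](
      topic, "myKey", kafkaGenericEvent.toByteArray))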

I am connecting to Kafka using the console consumer script and am able to see 
proper data. The KafkaGenericEvent used in the above code is the class generated 
by ScalaBuff from a protobuf file.

On the consumer end,
if I read the value as a plain byte array and then convert it into a 
KafkaGenericEvent in the following way, I get proper data:

    val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaConf, Set(topics))

    kafkaDStream.foreachRDD(rdd => rdd.collect().map {
      case (devId, byteArray) =>
        println(KafkaGenericEvent.parseFrom(byteArray))
    })

But if I change the value type to KafkaGenericEvent and use a custom decoder like this:

class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends Decoder[KafkaGenericEvent] {
  override def fromBytes(bytes: Array[Byte]): KafkaGenericEvent = {
    KafkaGenericEvent.parseFrom(bytes)
  }
}

and in the consumer:

    val kafkaDStream = KafkaUtils.createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](
      ssc, kafkaConf, Set(topics))

    kafkaDStream.foreachRDD(rdd => rdd.collect().map {
      case (devId, genericEvent) =>
        println(genericEvent)
    })

then my value object KafkaGenericEvent is not built from the sent data; instead I 
get an empty KafkaGenericEvent populated with default values.

Even if I read the value as an array of bytes in createDirectStream and then 
apply a transformation in the following way, I get incorrect values:

    val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaConf, Set(topics))

    kafkaDStream.map {
      case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
    }.foreachRDD(rdd => rdd.collect().map {
      case (devId, genericEvent) =>
        println(genericEvent)
    })

I get the default KafkaGenericEvent object in the line println(genericEvent).
Does this mean that I can transform the values only on the driver and not on 
the executors?
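
For completeness, here is a small diagnostic sketch (using the byte-array 
kafkaDStream from the snippet just above) that takes a single record back to the 
driver and runs both the custom decoder and parseFrom on it by hand, so the 
decoder and the on-wire bytes can be checked separately:

    // Diagnostic sketch: take(1) brings one record to the driver, so these
    // printlns appear in the driver output. It shows whether the raw bytes on
    // the topic parse into a non-default event at all.
    kafkaDStream.foreachRDD { rdd =>
      val decoder = new KafkaGenericEventsDecoder()
      rdd.take(1).foreach { case (devId, byteArray) =>
        println("raw payload length: " + byteArray.length)
        println("via custom decoder: " + decoder.fromBytes(byteArray))
        println("via parseFrom     : " + KafkaGenericEvent.parseFrom(byteArray))
      }
    }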

I am completely confused here!
I am using:
 scala-2.10.4
 spark-1.3.1
 kafka_2.10-0.8.2.1

-
/Vamsi
