I am using KafkaUtils.createDirectStream to read data from a Kafka bus.

On the producer end, I am generating messages in the following way:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, KafkaGenericEvent](props)

    // Send some messages
    println("Sending message")
    val kafkaGenericEvent =
      new KafkaGenericEvent("event-id", EventType.site, "6", 1440500400000L)
    val message =
      new ProducerRecord[String, KafkaGenericEvent](topic, "myKey", kafkaGenericEvent)
    producer.send(message)

I am connecting to Kafka with the console consumer script and can see proper
data. The KafkaGenericEvent used in the code above is the class generated by
ScalaBuff from a protobuf file.
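To make the encode/decode contract concrete: whatever bytes the producer's value serializer puts on the wire are exactly the bytes the consumer-side decoder receives, so the two must agree on the encoding. A stdlib-only sketch (`Event` and its `toByteArray` are stand-ins I made up to mimic the ScalaBuff-generated class, not the real one):

```scala
import java.nio.ByteBuffer
import java.util.Arrays

// Stand-in for a protobuf-style message; toByteArray mimics a binary encoding.
case class Event(timestamp: Long) {
  def toByteArray: Array[Byte] = ByteBuffer.allocate(8).putLong(timestamp).array()
}

object WireBytes {
  def main(args: Array[String]): Unit = {
    val event = Event(1440500400000L)
    val binaryBytes = event.toByteArray                // what a protobuf encoder sends
    val stringBytes = event.toString.getBytes("UTF-8") // what a String-based encoding would send
    // The two encodings produce different bytes, so the consumer-side decoder
    // must invert the producer's actual serializer, not just any serializer.
    println(Arrays.equals(binaryBytes, stringBytes)) // false
  }
}
```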

On the consumer end, if I read the value as a plain byte array and then
convert it into a KafkaGenericEvent in the following way, I get proper data:

    val kafkaDStream =
      KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
        ssc, kafkaConf, Set(topics))

    kafkaDStream.foreachRDD(rdd => rdd.collect().map {
      case (devId, byteArray) =>
        println(KafkaGenericEvent.parseFrom(byteArray))
    })

But if I change the value type to KafkaGenericEvent and use a custom decoder
like this:

    class KafkaGenericEventsDecoder(props: VerifiableProperties = null)
        extends Decoder[KafkaGenericEvent] {
      override def fromBytes(bytes: Array[Byte]): KafkaGenericEvent =
        KafkaGenericEvent.parseFrom(bytes)
    }

and in the consumer:

    val kafkaDStream =
      KafkaUtils.createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](
        ssc, kafkaConf, Set(topics))

    kafkaDStream.foreachRDD(rdd => rdd.collect().map {
      case (devId, genericEvent) =>
        println(genericEvent)
    })
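One check I can at least do is exercise `fromBytes` in isolation against bytes produced by `toByteArray`. A self-contained sketch (the `Decoder` trait and `StubEvent` below are local stand-ins I wrote for `kafka.serializer.Decoder` and the generated `KafkaGenericEvent`, so it runs without Kafka or ScalaBuff on the classpath):

```scala
// Local stand-ins so the check runs without Kafka or ScalaBuff dependencies.
trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }

case class StubEvent(id: String) {
  def toByteArray: Array[Byte] = id.getBytes("UTF-8")
}
object StubEvent {
  def parseFrom(bytes: Array[Byte]): StubEvent = StubEvent(new String(bytes, "UTF-8"))
}

class StubEventDecoder extends Decoder[StubEvent] {
  override def fromBytes(bytes: Array[Byte]): StubEvent = StubEvent.parseFrom(bytes)
}

object DecoderRoundTrip {
  def main(args: Array[String]): Unit = {
    val sent = StubEvent("event-id")
    // fromBytes must exactly invert toByteArray for the decoded event to match.
    val received = new StubEventDecoder().fromBytes(sent.toByteArray)
    println(received == sent) // true
  }
}
```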

Now my value object KafkaGenericEvent is not built from the sent data;
instead I get an empty KafkaGenericEvent with default values.
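For what it's worth, my understanding is that a protobuf-style parseFrom generally "succeeds" on zero input bytes and just returns an all-defaults message, since every missing field falls back to its default. A stand-in sketch of that behaviour (no protobuf dependency; `Stub` is a made-up mimic, not the generated class):

```scala
object EmptyParse {
  // Stand-in mimicking protobuf semantics: fields missing from the input take
  // their default values, so zero input bytes yield an all-defaults message.
  case class Stub(id: String)
  object Stub {
    val defaultInstance = Stub("")
    def parseFrom(bytes: Array[Byte]): Stub =
      if (bytes.isEmpty) defaultInstance
      else Stub(new String(bytes, "UTF-8"))
  }

  def main(args: Array[String]): Unit = {
    // Parsing no bytes at all does not fail -- it just returns defaults.
    println(Stub.parseFrom(Array.emptyByteArray)) // Stub()
  }
}
```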

Even if I read the value as an array of bytes in createDirectStream and then
apply a transformation in the following way, I get incorrect values:

    val kafkaDStream =
      KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
        ssc, kafkaConf, Set(topics))

    kafkaDStream.map {
      case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
    }.foreachRDD(rdd => rdd.collect().map {
      case (devId, genericEvent) =>
        println(genericEvent)
    })

I get the default KafkaGenericEvent object in the println(genericEvent) line.
Does this mean that I can transform the values only on the driver and not on
the executors?
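My understanding of the mechanics, for context: Spark serializes the map closure on the driver and ships it to the executors, which deserialize and apply it there, so transformations are not driver-only. A Spark-free illustration of that serialize/ship/apply step using plain Java serialization (`shipAcross` and the string `parse` function are stand-ins I made up for the task-shipping machinery and for KafkaGenericEvent.parseFrom):

```scala
import java.io._

object ClosureShipping {
  // Serialize a value to bytes and read it back -- roughly what Spark does
  // when it sends a task's closure from the driver to an executor.
  def shipAcross[T](value: T): T = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for byteArray => KafkaGenericEvent.parseFrom(byteArray).
    val parse: Array[Byte] => String = bytes => new String(bytes, "UTF-8")
    // The "executor" receives a deserialized copy of the closure and applies
    // it to the raw bytes it pulled from Kafka.
    val shippedParse = shipAcross(parse)
    println(shippedParse("event-id".getBytes("UTF-8")))
  }
}
```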

I am completely confused here!
I am using:
 scala-2.10.4
 spark-1.3.1
 kafka_2.10-0.8.2.1

-
/Vamsi
