I am using `KafkaUtils.createDirectStream` to read data from a Kafka bus. On the producer end, I am generating messages in the following way:
```scala
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, KafkaGenericEvent](props)

// Send some messages
println("Sending message")
val kafkaGenericEvent = new KafkaGenericEvent("event-id", EventType.site, "6", 1440500400000L)
val message = new ProducerRecord[String, KafkaGenericEvent](topic, "myKey", kafkaGenericEvent)
producer.send(message)
```

When I connect to Kafka with the console consumer script, I can see the data correctly. The `KafkaGenericEvent` used above is the class generated by ScalaBuff from a protobuf file.

On the consumer end, if I read the value as a plain byte array and then convert it into a `KafkaGenericEvent` in the following way, I get the correct data:

```scala
val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaConf, Set(topics))

kafkaDStream.foreachRDD(rdd => rdd.collect().map {
  case (devId, byteArray) =>
    println(KafkaGenericEvent.parseFrom(byteArray))
})
```

But if I change the value type to `KafkaGenericEvent` and use a custom decoder like this:

```scala
class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends Decoder[KafkaGenericEvent] {
  override def fromBytes(bytes: Array[Byte]): KafkaGenericEvent =
    KafkaGenericEvent.parseFrom(bytes)
}
```

and in the consumer:

```scala
val kafkaDStream = KafkaUtils.createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](
  ssc, kafkaConf, Set(topics))

kafkaDStream.foreachRDD(rdd => rdd.collect().map {
  case (devId, genericEvent) =>
    println(genericEvent)
})
```

then my value object is not created from the sent data; instead I get an empty `KafkaGenericEvent` populated only with default values.
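As a sanity check, the decoder logic can be exercised in isolation, outside Kafka and Spark. The sketch below is hypothetical: it uses a stand-in `Event` case class with a naive text encoding in place of the ScalaBuff-generated `KafkaGenericEvent`, and a simplified `Decoder` trait in place of `kafka.serializer.Decoder`, just to show the round trip a `fromBytes` implementation should survive:

```scala
// Stand-in for kafka.serializer.Decoder (hypothetical, no Kafka dependency).
trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }

// Simplified stand-in for the protobuf-generated event class.
case class Event(id: String, devId: String, timestamp: Long) {
  // Stand-in for protobuf's toByteArray: a naive pipe-separated encoding.
  def toByteArray: Array[Byte] = s"$id|$devId|$timestamp".getBytes("UTF-8")
}

object Event {
  // Stand-in for the generated parseFrom.
  def parseFrom(bytes: Array[Byte]): Event = {
    val Array(id, devId, ts) = new String(bytes, "UTF-8").split('|')
    Event(id, devId, ts.toLong)
  }
}

class EventDecoder extends Decoder[Event] {
  override def fromBytes(bytes: Array[Byte]): Event = Event.parseFrom(bytes)
}

val sent = Event("event-id", "6", 1440500400000L)
val decoded = new EventDecoder().fromBytes(sent.toByteArray)
println(decoded == sent) // prints true when the decoder round-trips cleanly
```

If a check like this passes for the real `KafkaGenericEvent` but the same `parseFrom` yields defaults inside the streaming job, the problem is likely in how the bytes reach the decoder, not in the decoder logic itself.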
Even if I read the value as an array of bytes in `createDirectStream` and then apply a transformation in the following way, I get incorrect values:

```scala
val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaConf, Set(topics))

kafkaDStream.map {
  case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
}.foreachRDD(rdd => rdd.collect().map {
  case (devId, genericEvent) =>
    println(genericEvent)
})
```

I still get the default `KafkaGenericEvent` object on the `println(genericEvent)` line. Does this mean that I can transform the values only on the driver and not on the executors? I am completely confused here!

I am using:

- scala-2.10.4
- spark-1.3.1
- kafka_2.10-0.8.2.1

\- /Vamsi
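On the driver-versus-executor question: a `map` closure is serialized and shipped to executors, and the byte-array values it receives travel through serialization unchanged. A minimal stdlib-only sketch (no Spark; the `roundTrip` helper is hypothetical) illustrating that a byte payload survives this kind of shipping intact:

```scala
import java.io._

// Hypothetical helper: push a byte array through Java serialization and
// back, roughly mimicking how a (devId, byteArray) pair would be shipped
// from the driver to an executor.
def roundTrip(bytes: Array[Byte]): Array[Byte] = {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(bytes)
  out.close()
  new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    .readObject().asInstanceOf[Array[Byte]]
}

val original = "some protobuf payload".getBytes("UTF-8")
println(java.util.Arrays.equals(original, roundTrip(original))) // prints true
```

If the bytes themselves arrive intact, a default-valued object is more likely caused by what was serialized on the producer side or by how the decoder is instantiated than by where the transformation runs, though I have not been able to confirm which.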