@Adrian, I am doing collect for debugging purposes, but I have to use foreachRDD so that I can operate on the RDD and eventually save the results to the DB.
My actual problem here is how to properly convert the Array[Byte] into my custom object.
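For context, here is a minimal sketch of the shape I am aiming for, assuming the byte-array decoding works: the decode happens on the workers via map (as you suggested), and foreachRDD / foreachPartition is only used as the output operation. saveEventsToDB is a hypothetical helper standing in for the actual DB write.

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaDStream =
  KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc, kafkaConf, Set(topics))

// decode on the workers, not on the driver
val events = kafkaDStream.map { case (devId, byteArray) =>
  (devId, KafkaGenericEvent.parseFrom(byteArray))
}

// output operation: write each partition to the DB instead of collecting to the driver
events.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // saveEventsToDB is a hypothetical helper that opens a connection and writes the batch
    saveEventsToDB(partition.toSeq)
  }
}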
On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase <atan...@adobe.com> wrote:

> Why are you calling foreachRdd / collect in the first place?
>
> Instead of using a custom decoder, you should simply do this – it is code
> executed on the workers and allows the computation to continue. ForeachRdd
> and collect are output operations and force the data to be collected on the
> driver (assuming you don’t want that…)
>
> val events = kafkaDStream.map { case (devId, byteArray) =>
>   KafkaGenericEvent.parseFrom(byteArray)
> }
>
> From: srungarapu vamsi
> Date: Thursday, September 17, 2015 at 4:03 PM
> To: user
> Subject: Spark Streaming kafka directStream value decoder issue
>
> I am using KafkaUtils.createDirectStream to read the data from the kafka bus.
>
> On the producer end, I am generating messages in the following way:
>
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>
> // Send some messages
> println("Sending message")
> val kafkaGenericEvent =
>   new KafkaGenericEvent("event-id", EventType.site, "6", 1440500400000L)
> val message =
>   new ProducerRecord[String, KafkaGenericEvent](topic, "myKey", kafkaGenericEvent)
> producer.send(message)
>
> I am connecting to kafka using the console consumer script and am able to
> see proper data. The KafkaGenericEvent used in the above code is the class
> generated using ScalaBuff from a protobuf file.
>
> On the consumer end, if I read the value as a plain byte array and then
> convert it into a KafkaGenericEvent in the following way, I get proper data:
>
> val kafkaDStream =
>   KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>     ssc, kafkaConf, Set(topics))
>
> kafkaDStream.foreachRDD(rdd => rdd.collect().map {
>   case (devId, byteArray) =>
>     println(KafkaGenericEvent.parseFrom(byteArray))
> })
>
> But if I change the value type to KafkaGenericEvent and use a custom decoder
> like this:
>
> class KafkaGenericEventsDecoder(props: VerifiableProperties = null)
>     extends Decoder[KafkaGenericEvent] {
>   override def fromBytes(bytes: Array[Byte]): KafkaGenericEvent =
>     KafkaGenericEvent.parseFrom(bytes)
> }
>
> and in the consumer:
>
> val kafkaDStream =
>   KafkaUtils.createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](
>     ssc, kafkaConf, Set(topics))
>
> kafkaDStream.foreachRDD(rdd => rdd.collect().map {
>   case (devId, genericEvent) =>
>     println(genericEvent)
> })
>
> then my value object KafkaGenericEvent is not built from the sent data;
> instead an empty KafkaGenericEvent with default values is created.
>
> Even if I read the value as an array of bytes in createDirectStream and then
> apply a transformation in the following way, I still get incorrect values:
>
> val kafkaDStream =
>   KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>     ssc, kafkaConf, Set(topics))
>
> kafkaDStream.map {
>   case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
> }.foreachRDD(rdd => rdd.collect().map {
>   case (devId, genericEvent) =>
>     println(genericEvent)
> })
>
> I still get the default KafkaGenericEvent object in the println(genericEvent) line.
> Does this mean that I can transform the values only on the driver and not
> on the executors?
>
> I am completely confused here!
> I am using:
> scala-2.10.4
> spark-1.3.1
> kafka_2.10-0.8.2.1
>
> -
> /Vamsi

--
/Vamsi