Hi group, I have just started working with the Confluent Platform and Spark Streaming, and was wondering if it is possible to access individual fields of an Avro object read from a Kafka topic through Spark Streaming. By default, *KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)* returns a *DStream[(Object, Object)]*, and there is no schema associated with *Object* (or I am unable to figure it out). This makes it impossible to perform some operations on this DStream, for example converting it to a Spark DataFrame.
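For instance, something like the following is what I would like to be able to do. This is only a sketch of the idea, assuming the decoded values are actually Avro *GenericRecord* instances at runtime (which I am not sure about); "some_field" is just a placeholder field name, not from a real schema:

import org.apache.avro.generic.GenericRecord
import org.apache.spark.streaming.dstream.DStream

// Given the (key, value) stream produced by the code below, cast each
// decoded value to GenericRecord and read one field by name.
// "some_field" is a hypothetical field name, not from a real schema.
def fieldValues(messages: DStream[(Object, Object)]): DStream[String] =
  messages.map { case (_, value) =>
    value.asInstanceOf[GenericRecord].get("some_field").toString
  }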
Since *KafkaAvroDecoder* doesn't allow any key/value class other than *Object*, I think I am going in the wrong direction. Any pointers/suggestions would be really helpful.

*Versions used:*
confluent-1.0.1
spark-1.6.0-bin-hadoop2.4
Scala code runner version 2.11.6

And this is the small piece of code I am using:

package org.myorg.scalaexamples

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.SparkContext
import org.apache.avro.mapred.AvroKey
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
import org.apache.avro.generic.GenericRecord
import org.apache.spark.streaming.dstream.DStream
import io.confluent.kafka.serializers.KafkaAvroDecoder
import org.apache.kafka.common.serialization.Deserializer

object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |""".stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")
    sparkConf.registerKryoClasses(Array(classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]]))

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> "consumer",
      "zookeeper.connect" -> "localhost:2181",
      "schema.registry.url" -> "http://localhost:8081")

    // Both keys and values are decoded by KafkaAvroDecoder, which only
    // exposes them as Object.
    val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
    messages.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Thank you so much for your valuable time!

Tariq, Mohammad
about.me/mti
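PS: For context, what I would eventually like to end up with is roughly the following. Again, this is only a sketch assuming the decoded values really are *GenericRecord* instances at runtime; "name" and "age" are placeholder field names, not from a real schema:

messages.foreachRDD { rdd =>
  // SQLContext.getOrCreate is available from Spark 1.5 onwards.
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // Cast each decoded value to GenericRecord, pull out two placeholder
  // fields, and build a DataFrame from the resulting tuples.
  val df = rdd.map { case (_, value) =>
    val record = value.asInstanceOf[GenericRecord]
    (record.get("name").toString, record.get("age").asInstanceOf[Int])
  }.toDF("name", "age")

  df.show()
}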