You should be able to cast the Object to its real underlying type:
GenericRecord if the decoder runs in generic mode (the default), or the
actual generated class if it runs in specific mode. The underlying
implementation of KafkaAvroDecoder seems to use one or the other
depending on a config switch.

Once you have the right underlying class, extracting fields should be
simple and direct, and should not need an intermediate conversion to JSON.
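
To make the cast concrete, here is a minimal sketch, assuming the decoder
hands back a GenericRecord (the generic case). The schema and the "amount"
field are hypothetical and only there for illustration; "COL_NAME" is
borrowed from the snippet quoted below. Decoded Avro strings usually come
back as Utf8 rather than String, hence the toString.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

object GenericRecordFields {
  // Hypothetical schema, for illustration only; in the real job the schema
  // travels with the record the decoder returns.
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Example", "fields": [
      |  {"name": "COL_NAME", "type": "string"},
      |  {"name": "amount", "type": "long"}
      |]}""".stripMargin)

  // Cast the decoded Object to GenericRecord and read fields by name.
  // Anything that is not a GenericRecord yields None instead of throwing.
  def extract(decoded: Object): Option[(String, Long)] = decoded match {
    case record: GenericRecord =>
      val name = record.get("COL_NAME").toString          // Utf8 or String
      val amount = record.get("amount").asInstanceOf[Long] // unboxes java.lang.Long
      Some((name, amount))
    case _ => None
  }

  // Builds a record by hand, standing in for what the decoder would return.
  def sampleRecord(): GenericRecord = {
    val record = new GenericData.Record(schema)
    record.put("COL_NAME", "a")
    record.put("amount", java.lang.Long.valueOf(3L))
    record
  }
}
```

In the streaming job, something like
messages.map { case (_, value) => GenericRecordFields.extract(value) }
would presumably apply the same cast to each message value; I have not run
that part.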

On Fri, 26 Feb 2016

> I got it working by using jsonRDD. This is what I had to do to make it
> work:
>       val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
>         ssc, kafkaParams, topicsSet)
>       val lines =
>       lines.foreachRDD(jsonRDD => {
>         val sqlContext = SQLContextSingleton.getInstance(jsonRDD.sparkContext)
>         val data =
>         data.printSchema()
>         data.groupBy("COL_NAME").count().show()
>       })
> Not sure though if it's the best way to achieve this.
On Fri, Feb 26, 2016
>> wrote:
>> You can use `` to transform objects to anything you want.
On Thu, Feb 25, 2016
>> wrote:
>>> Hi group,
>>> I have just started working with Confluent Platform and Spark Streaming,
>>> and was wondering whether it is possible to access individual fields of
>>> an Avro object read from a Kafka topic through Spark Streaming. By
>>> default, *KafkaUtils.createDirectStream[Object, Object,
>>> KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)* returns
>>> a *DStream[(Object, Object)]*, and there is no schema associated with
>>> *Object* (or I am unable to figure it out). This makes it impossible to
>>> perform some operations on this DStream, for example converting it to a
>>> Spark DataFrame.
>>> Since *KafkaAvroDecoder* doesn't allow us to use any class other than
>>> *Object*, I think I am going in the wrong direction. Any
>>> pointers/suggestions would be really helpful.
>>> *Versions used:*
>>> confluent-1.0.1
>>> spark-1.6.0-bin-hadoop2.4
>>> Scala code runner version - 2.11.6
>>> And this is the small piece of code I am using:
>>> package org.myorg.scalaexamples
>>> import org.apache.spark.rdd.RDD
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.streaming._
>>> import org.apache.spark.SparkContext
>>> import org.apache.avro.mapred.AvroKey
>>> import org.apache.spark.sql.SQLContext
>>> //import org.apache.avro.mapred.AvroValue
>>> import org.apache.spark.streaming.kafka._
>>> import
>>> import org.apache.avro.generic.GenericRecord
>>> import org.apache.spark.streaming.dstream.DStream
>>> import io.confluent.kafka.serializers.KafkaAvroDecoder
>>> //import
>>> //import org.apache.spark.streaming.dstream.ForEachDStream
>>> import org.apache.spark.sql.SQLContext
>>> import org.apache.kafka.common.serialization.Deserializer
>>> object DirectKafkaWordCount {
>>>   def main(args: Array[String]) {
>>>     if (args.length < 2) {
>>>       System.err.println(s"""
>>>         |Usage: DirectKafkaWordCount <brokers> <topics>
>>>         |  <brokers> is a list of one or more Kafka brokers
>>>         |  <topics> is a list of one or more kafka topics to consume from
>>>         |
>>>         """.stripMargin)
>>>       System.exit(1)
>>>     }
>>>     val Array(brokers, topics) = args
>>>     val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")
>>>     sparkConf.registerKryoClasses(Array(classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]]))
>>>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>>>     val topicsSet = topics.split(",").toSet
>>>     val kafkaParams = Map[String, String]("" -> brokers, "" -> "consumer",
>>>       "zookeeper.connect" -> "localhost:2181",
>>>       "schema.registry.url" -> "http://localhost:8081")
>>>     val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
>>>       ssc, kafkaParams, topicsSet)
>>>     messages.print()
>>>     ssc.start()
>>>     ssc.awaitTermination()
>>>   }
>>> }
>>> Thank you so much for your valuable time!
