Hi group,

I have just started working with the Confluent Platform and Spark Streaming,
and was wondering if it is possible to access individual fields from an
Avro object read from a Kafka topic through Spark Streaming. By
default, *KafkaUtils.createDirectStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)*
returns a *DStream[(Object, Object)]*, and there is no schema associated
with *Object* (or I am unable to figure it out). This makes it impossible
to perform some operations on this DStream, for example, converting it to
a Spark DataFrame.

Since *KafkaAvroDecoder* doesn't allow us to use any class other than
*Object*, I think I may be going in the wrong direction. Any
pointers/suggestions would be really helpful.

*Versions used:*
confluent-1.0.1
spark-1.6.0-bin-hadoop2.4
Scala code runner version - 2.11.6

And this is the small piece of code I am using:

package org.myorg.scalaexamples

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.SparkContext
import org.apache.avro.mapred.AvroKey
import org.apache.spark.sql.SQLContext
//import org.apache.avro.mapred.AvroValue
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
import org.apache.avro.generic.GenericRecord
import org.apache.spark.streaming.dstream.DStream
import io.confluent.kafka.serializers.KafkaAvroDecoder
//import org.apache.hadoop.io.serializer.avro.AvroRecord
//import org.apache.spark.streaming.dstream.ForEachDStream
import org.apache.kafka.common.serialization.Deserializer

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }
    val Array(brokers, topics) = args
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")
    sparkConf.registerKryoClasses(Array(classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]]))
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> "consumer",
      "zookeeper.connect" -> "localhost:2181",
      "schema.registry.url" -> "http://localhost:8081")
    val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
      ssc, kafkaParams, topicsSet)
    messages.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
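
To make the question more concrete, here is a rough sketch of the kind of field access I am after. It assumes (I have not verified this) that the decoder returns an Avro *GenericRecord* whenever the writer schema is a record. The helper *AvroFieldAccess.fieldAsString*, the *User* schema and the *name* field are hypothetical names used only for illustration; only the Avro library is needed to run it.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

object AvroFieldAccess {
  // Returns the named field as a String when the decoded value is a
  // GenericRecord whose schema contains that field; None otherwise
  // (including when the field value itself is null).
  def fieldAsString(decoded: Any, fieldName: String): Option[String] = decoded match {
    case record: GenericRecord if record.getSchema.getField(fieldName) != null =>
      Option(record.get(fieldName)).map(_.toString)
    case _ => None
  }
}

object AvroFieldAccessDemo extends App {
  // Hypothetical schema standing in for whatever the topic actually carries.
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}""")

  val record = new GenericData.Record(schema)
  record.put("name", "alice")

  println(AvroFieldAccess.fieldAsString(record, "name"))
  println(AvroFieldAccess.fieldAsString("not a record", "name"))
}
```

If that assumption holds, something like messages.map { case (_, value) => AvroFieldAccess.fieldAsString(value, "name") } might be a starting point on the stream above, though I am not sure this is the intended approach.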

Thank you so much for your valuable time!


Tariq, Mohammad
about.me/mti