Hi,

Here is a working example I did.

HTH

Regards,

Sivakumaran S

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val topics = "test"
val brokers = "localhost:9092"
val topicsSet = topics.split(",").toSet
val sparkConf = new SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") // or spark://localhost:7077
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(60))
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
messages.foreachRDD(rdd => {
  if (rdd.isEmpty()) {
    println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
    System.exit(-1)
  }
  // Get (or create) the singleton SQLContext for this batch's SparkContext;
  // read.json already returns a DataFrame, so no toDF() is needed
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  val weatherDF = sqlContext.read.json(rdd.map(_._2))
  // Process your DF as required from here on
})
ssc.start()
ssc.awaitTermination()
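If it helps, here is a minimal sketch of what the processing step (the "Process your DF" comment inside foreachRDD) could look like. The table name "weather" and the "temperature" field are placeholders for whatever your JSON actually contains:

// Hypothetical example only: "weather" and "temperature" are placeholder
// names; substitute the fields present in your own JSON messages.
weatherDF.printSchema()
weatherDF.registerTempTable("weather")
val hot = sqlContext.sql("SELECT * FROM weather WHERE temperature > 30")
hot.show()

registerTempTable scopes the table to the SQLContext, so the query runs per batch inside foreachRDD.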



> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> 
> wrote:
> 
> Hi,
> 
> I am reading JSON messages from Kafka. The topic has 2 partitions. When
> running the streaming job using spark-submit, I can see that val dataFrame =
> sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing
> something wrong here? Below is the code. This environment is a Cloudera
> sandbox. The same issue occurs in Hadoop production cluster mode, but that
> environment is restricted, which is why I tried to reproduce the issue in the
> Cloudera sandbox. Kafka 0.10 and Spark 1.4.
> 
> val kafkaParams =
> Map[String,String]("bootstrap.servers" -> "localhost:9093,localhost:9092",
> "group.id" -> "xyz", "auto.offset.reset" -> "smallest")
> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> val ssc = new StreamingContext(conf, Seconds(1))
> 
> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> 
> val topics = Set("gpp.minf")
> val kafkaStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder,StringDecoder](ssc, kafkaParams, topics)
> 
> kafkaStream.foreachRDD(
>   rdd => {
>     if (rdd.count > 0) {
>       val dataFrame = sqlContext.read.json(rdd.map(_._2))
>       dataFrame.printSchema()
>       //dataFrame.foreach(println)
>     }
>   }
> )
