Hi,

Here is a working example I did.
HTH

Regards,

Sivakumaran S

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val topics = "test"
val brokers = "localhost:9092"
val topicsSet = topics.split(",").toSet

val sparkConf = new SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") // or spark://localhost:7077
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(60))

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

messages.foreachRDD(rdd => {
  // Bail out if the batch is empty so we don't try to infer a schema from nothing
  if (rdd.isEmpty()) {
    println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
    System.exit(-1)
  }
  val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
  val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
  // Process your DF as required here on
})

> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:
>
> Hi,
>
> I am reading json messages from Kafka. The topic has 2 partitions. When running the
> streaming job using spark-submit, I could see that val dataFrame =
> sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing something wrong
> here? Below is the code. This environment is a Cloudera sandbox env. Same issue in
> Hadoop production cluster mode, except that it is restricted; that's why I tried to
> reproduce the issue in the Cloudera sandbox. Kafka 0.10 and Spark 1.4.
>
> val kafkaParams = Map[String, String]("bootstrap.servers" -> "localhost:9093,localhost:9092",
>   "group.id" -> "xyz", "auto.offset.reset" -> "smallest")
> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> val ssc = new StreamingContext(conf, Seconds(1))
>
> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>
> val topics = Set("gpp.minf")
> val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>
> kafkaStream.foreachRDD(
>   rdd => {
>     if (rdd.count > 0) {
>       val dataFrame = sqlContext.read.json(rdd.map(_._2))
>       dataFrame.printSchema()
>       //dataFrame.foreach(println)
>     }
>   }
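
Following up on the "// Process your DF as required here on" comment in the example above, here is a rough sketch of one way to carry on: register the per-batch DataFrame as a temp table and query it with Spark SQL (Spark 1.x API), then start the streaming context so the batches actually run. The table and column names ("weather_readings", "temperature") are made-up placeholders for illustration, not from the original mail.

// Inside foreachRDD, after building weatherDF (names below are assumed, illustration only):
weatherDF.registerTempTable("weather_readings")
val hot = sqlContext.sql("SELECT * FROM weather_readings WHERE temperature > 30.0")
hot.show()

// After wiring up foreachRDD: start the streaming context and block until it is stopped,
// otherwise none of the per-batch code above ever executes.
ssc.start()
ssc.awaitTermination()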