I am testing with one partition for now, using Kafka 0.9 and Spark 1.6.1 (Scala 2.11). Start with one topic first and then add more. I am not partitioning the topic.
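In case it matters, this is roughly how a single-partition topic gets created with the stock Kafka 0.9 tooling (a minimal sketch, assuming a local ZooKeeper at localhost:2181; "test" is a placeholder topic name):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test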
HTH,

Regards,

Sivakumaran

> On 10-Aug-2016, at 5:56 AM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:
>
> Hi Siva,
>
> Does the topic have partitions? Which version of Spark are you using?
>
> On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com> wrote:
> Hi,
>
> Here is a working example I did.
>
> HTH
>
> Regards,
>
> Sivakumaran S
>
> val topics = "test"
> val brokers = "localhost:9092"
> val topicsSet = topics.split(",").toSet
> val sparkConf = new SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") // spark://localhost:7077
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(60))
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> messages.foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>     System.exit(-1)
>   }
>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>   // Process your DF as required from here on
> })
> ssc.start()
> ssc.awaitTermination()
>
>> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am reading JSON messages from Kafka. The topic has 2 partitions. When running the streaming job using spark-submit, I can see that val dataFrame = sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing something wrong here? The code is below. This environment is a Cloudera sandbox; the same issue occurs in the Hadoop production cluster, but access there is restricted, which is why I tried to reproduce it in the Cloudera sandbox. Kafka 0.10 and Spark 1.4.
>>
>> val kafkaParams = Map[String, String](
>>   "bootstrap.servers" -> "localhost:9093,localhost:9092",
>>   "group.id" -> "xyz",
>>   "auto.offset.reset" -> "smallest")
>> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
>> val ssc = new StreamingContext(conf, Seconds(1))
>>
>> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>>
>> val topics = Set("gpp.minf")
>> val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>>
>> kafkaStream.foreachRDD(rdd => {
>>   if (rdd.count > 0) {
>>     val dataFrame = sqlContext.read.json(rdd.map(_._2))
>>     dataFrame.printSchema()
>>     // dataFrame.foreach(println)
>>   }
>> })
>> ssc.start()
>> ssc.awaitTermination()
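P.S. Putting the working example together as one self-contained program, this is roughly what I run (a minimal sketch, assuming Spark 1.6.x with the spark-streaming-kafka 0.8 direct connector on Scala 2.11, a broker at localhost:9092, and "test" as a placeholder topic; adjust the batch interval and app name to taste):

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaJsonStream {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KafkaJsonStream").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(60))

    // Direct stream: one RDD partition per Kafka partition, no receiver thread.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("test"))

    messages.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Reuse one SQLContext across batches instead of creating a new one each time.
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        // read.json makes an extra pass over the RDD to infer the schema,
        // so every batch pays that cost.
        val df = sqlContext.read.json(rdd.map(_._2))
        df.printSchema()
        // Process the DataFrame as required from here on.
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Build it against the spark-streaming-kafka_2.11 artifact matching your Spark version; the 0.8 direct connector also talks to 0.9 brokers, which is the combination I am using.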