It stops working at sqlContext.read.json(rdd.map(_._2)). Topics with a single partition work fine. Do I need to set any other configs?

val kafkaParams = Map[String,String](
  "bootstrap.servers" -> "localhost:9093,localhost:9092",
  "group.id" -> "xyz",
  "auto.offset.reset" -> "smallest")

Spark version is 1.6.2.
kafkaStream.foreachRDD(
  rdd => {
    rdd.foreach(println)
    val dataFrame = sqlContext.read.json(rdd.map(_._2))
    dataFrame.foreach(println)
  }
)

On Wed, Aug 10, 2016 at 9:05 AM, Cody Koeninger <c...@koeninger.org> wrote:
> No, you don't need a conditional. read.json on an empty rdd will
> return an empty dataframe. Foreach on an empty dataframe or an empty
> rdd won't do anything (a task will still get run, but it won't do
> anything).
>
> Leave the conditional out. Add one thing at a time to the working
> rdd.foreach example and see when it stops working, then take a closer
> look at the logs.
>
>
> On Tue, Aug 9, 2016 at 10:20 PM, Diwakar Dhanuskodi
> <diwakar.dhanusk...@gmail.com> wrote:
> > Hi Cody,
> >
> > Without the conditional it is running fine, but any processing inside
> > the conditional gets stuck waiting (or something). I am facing this
> > issue with partitioned topics. I would need the conditional to skip
> > processing when the batch is empty.
> >
> > kafkaStream.foreachRDD(
> >   rdd => {
> >     val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >     /*if (dataFrame.count() > 0) {
> >       dataFrame.foreach(println)
> >     }
> >     else {
> >       println("Empty DStream")
> >     }*/
> >   })
> >
> > On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> Take out the conditional and the sqlContext and just do
> >>
> >>   rdd => {
> >>     rdd.foreach(println)
> >>
> >> as a baseline to see if you're reading the data you expect.
> >>
> >> On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
> >> <diwakar.dhanusk...@gmail.com> wrote:
> >> > Hi,
> >> >
> >> > I am reading json messages from Kafka. The topic has 2 partitions.
> >> > When running the streaming job using spark-submit, I can see that
> >> > val dataFrame = sqlContext.read.json(rdd.map(_._2)) executes
> >> > indefinitely. Am I doing something wrong here? Below is the code.
> >> > This environment is a Cloudera sandbox env. The same issue occurs in
> >> > Hadoop production cluster mode, except that it is restricted; that's
> >> > why I tried to reproduce the issue in the Cloudera sandbox. Kafka
> >> > 0.10 and Spark 1.4.
> >> >
> >> > val kafkaParams = Map[String,String](
> >> >   "bootstrap.servers" -> "localhost:9093,localhost:9092",
> >> >   "group.id" -> "xyz",
> >> >   "auto.offset.reset" -> "smallest")
> >> > val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> >> > val ssc = new StreamingContext(conf, Seconds(1))
> >> >
> >> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> >> >
> >> > val topics = Set("gpp.minf")
> >> > val kafkaStream = KafkaUtils.createDirectStream[String, String,
> >> >   StringDecoder, StringDecoder](ssc, kafkaParams, topics)
> >> >
> >> > kafkaStream.foreachRDD(
> >> >   rdd => {
> >> >     if (rdd.count > 0) {
> >> >       val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >> >       dataFrame.printSchema()
> >> >       //dataFrame.foreach(println)
> >> >     }
> >> >   })
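For reference, here is a minimal end-to-end sketch of the pattern Cody recommends: the direct stream plus read.json with no conditional, since read.json on an empty RDD returns an empty DataFrame. The object name KafkaJsonStream and the app name are placeholders; broker addresses, group id and topic are taken from the code above, and it assumes Spark 1.6.x with the spark-streaming-kafka (Kafka 0.8 API) artifact on the classpath.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder object name; broker list, group id and topic match the thread.
object KafkaJsonStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("kafka-json")
    val ssc = new StreamingContext(conf, Seconds(1))
    val sqlContext = new SQLContext(ssc.sparkContext)

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "localhost:9093,localhost:9092",
      "group.id" -> "xyz",
      "auto.offset.reset" -> "smallest")
    val topics = Set("gpp.minf")

    // Decoder-based direct stream from the Kafka 0.8 API; each record is a
    // (key, value) pair of strings.
    val kafkaStream = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    kafkaStream.foreachRDD { rdd =>
      // No conditional needed: read.json on an empty RDD returns an empty
      // DataFrame, and foreach on an empty DataFrame does no work.
      val dataFrame = sqlContext.read.json(rdd.map(_._2))
      dataFrame.foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}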
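And if skipping empty batches is genuinely required (for example, to avoid log noise), a guard on rdd.isEmpty is a cheaper sketch than rdd.count > 0, since isEmpty stops as soon as it sees one element while count scans every partition of the batch. Reusing kafkaStream and sqlContext from the sketch above:

kafkaStream.foreachRDD { rdd =>
  // isEmpty inspects at most one element; count materializes the whole batch
  // just to produce a number that is immediately discarded.
  if (!rdd.isEmpty()) {
    val dataFrame = sqlContext.read.json(rdd.map(_._2))
    dataFrame.printSchema()
  }
}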