No, you don't need a conditional. read.json on an empty RDD returns an empty DataFrame, and foreach on an empty DataFrame or an empty RDD won't do anything (a task still gets run, but it's a no-op).
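A minimal sketch of the point above in plain Scala (no Spark needed): foreach over an empty collection is a no-op, just like foreach over an empty RDD, so the emptiness guard adds nothing. The object name EmptyForeachDemo is made up for illustration.

```scala
// foreach on an empty collection prints nothing and throws nothing --
// the same holds for an empty RDD, so no conditional is required.
object EmptyForeachDemo {
  def main(args: Array[String]): Unit = {
    val records = Seq.empty[String] // stands in for an empty batch
    records.foreach(println)        // no output, no error
    println(s"batch handled, ${records.size} records")
  }
}
```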
Leave the conditional out. Add one thing at a time to the working rdd.foreach example, see when it stops working, then take a closer look at the logs.

On Tue, Aug 9, 2016 at 10:20 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:
> Hi Cody,
>
> Without the conditional it runs fine, but any processing inside the
> conditional gets stuck waiting (or something). I'm facing this issue
> with partitioned topics. I need the conditional to skip processing
> when the batch is empty.
>
> kafkaStream.foreachRDD(
>   rdd => {
>     val dataFrame = sqlContext.read.json(rdd.map(_._2))
>     /*if (dataFrame.count() > 0) {
>       dataFrame.foreach(println)
>     }
>     else {
>       println("Empty DStream ")
>     }*/
>   })
>
> On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Take out the conditional and the sqlContext and just do
>>
>>   rdd => {
>>     rdd.foreach(println)
>>   }
>>
>> as a baseline to see if you're reading the data you expect.
>>
>> On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
>> <diwakar.dhanusk...@gmail.com> wrote:
>> > Hi,
>> >
>> > I am reading JSON messages from Kafka. The topic has 2 partitions.
>> > When running the streaming job using spark-submit, I can see that
>> > val dataFrame = sqlContext.read.json(rdd.map(_._2)) executes
>> > indefinitely. Am I doing something wrong here? The code is below.
>> > This environment is the Cloudera sandbox; the same issue occurs in
>> > the Hadoop production cluster, but that cluster is restricted, which
>> > is why I tried to reproduce the issue in the sandbox. Kafka 0.10 and
>> > Spark 1.4.
>> >
>> > val kafkaParams = Map[String, String](
>> >   "bootstrap.servers" -> "localhost:9093,localhost:9092",
>> >   "group.id" -> "xyz",
>> >   "auto.offset.reset" -> "smallest")
>> > val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
>> > val ssc = new StreamingContext(conf, Seconds(1))
>> >
>> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>> >
>> > val topics = Set("gpp.minf")
>> > val kafkaStream = KafkaUtils.createDirectStream[String, String,
>> >   StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>> >
>> > kafkaStream.foreachRDD(
>> >   rdd => {
>> >     if (rdd.count > 0) {
>> >       val dataFrame = sqlContext.read.json(rdd.map(_._2))
>> >       dataFrame.printSchema()
>> >       //dataFrame.foreach(println)
>> >     }
>> >   })

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org