It stops working at sqlContext.read.json(rdd.map(_._2)). Topics without
partitions work fine. Do I need to set any other configs?
val kafkaParams = Map[String,String](
  "bootstrap.servers" -> "localhost:9093,localhost:9092",
  "group.id" -> "xyz",
  "auto.offset.reset" -> "smallest")
Spark version is 1.6.2

kafkaStream.foreachRDD(
  rdd => {
    rdd.foreach(println)
    val dataFrame = sqlContext.read.json(rdd.map(_._2))
    dataFrame.foreach(println)
  }
)
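
For reference, a minimal sketch of the same foreachRDD body with an explicit
schema supplied to read.json (the field names below are hypothetical
placeholders); without a schema, read.json makes an extra pass over every
batch just to infer one, which may be worth ruling out:

import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Hypothetical schema -- replace the fields with the real JSON structure.
val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("payload", StringType, nullable = true)))

kafkaStream.foreachRDD(
  rdd => {
    // Supplying the schema up front skips the per-batch schema-inference job.
    val dataFrame = sqlContext.read.schema(schema).json(rdd.map(_._2))
    dataFrame.foreach(println)
  }
)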

On Wed, Aug 10, 2016 at 9:05 AM, Cody Koeninger <c...@koeninger.org> wrote:

> No, you don't need a conditional.  read.json on an empty rdd will
> return an empty dataframe.  Foreach on an empty dataframe or an empty
> rdd won't do anything (a task will still get run, but it won't do
> anything).
>
> Leave the conditional out.  Add one thing at a time to the working
> rdd.foreach example and see when it stops working, then take a closer
> look at the logs.
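>
> A minimal sketch of that behaviour, assuming the sqlContext and ssc from
> the code further below:
>
> // read.json on an empty RDD[String] just yields an empty DataFrame,
> // and foreach on either one is effectively a no-op.
> val emptyRdd = ssc.sparkContext.emptyRDD[String]
> val emptyDf = sqlContext.read.json(emptyRdd)
> emptyDf.foreach(println)   // a task still runs, nothing gets printed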
>
>
> On Tue, Aug 9, 2016 at 10:20 PM, Diwakar Dhanuskodi
> <diwakar.dhanusk...@gmail.com> wrote:
> > Hi Cody,
> >
> > Without the conditional it runs fine, but any processing inside the
> > conditional just seems to hang or wait indefinitely.
> > I'm facing this issue with partitioned topics. I need the conditional to
> > skip processing when the batch is empty.
> > kafkaStream.foreachRDD(
> >   rdd => {
> >     val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >     /*if (dataFrame.count() > 0) {
> >       dataFrame.foreach(println)
> >     } else {
> >       println("Empty DStream")
> >     }*/
> > })
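> >
> > A hedged sketch of a cheaper guard, if one is still wanted: rdd.isEmpty
> > only has to find a single element, while dataFrame.count() scans the
> > whole batch.
> >
> > kafkaStream.foreachRDD(
> >   rdd => {
> >     if (!rdd.isEmpty()) {
> >       val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >       dataFrame.foreach(println)
> >     } else {
> >       println("Empty DStream")
> >     }
> > })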
> >
> > On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> Take out the conditional and the sqlContext and just do
> >>
> >> rdd => {
> >>   rdd.foreach(println)
> >> }
> >>
> >> as a baseline to see if you're reading the data you expect.
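> >>
> >> One hedged way to do that check is to count what each Kafka partition
> >> actually delivers per batch, before any SQL is involved:
> >>
> >> kafkaStream.foreachRDD { rdd =>
> >>   // One tuple per Kafka partition: (partition index, record count)
> >>   val counts = rdd.mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size))).collect()
> >>   counts.foreach { case (i, n) => println(s"partition $i: $n records") }
> >> }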
> >>
> >> On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
> >> <diwakar.dhanusk...@gmail.com> wrote:
> >> > Hi,
> >> >
> >> > I am reading JSON messages from Kafka. The topic has 2 partitions. When
> >> > running the streaming job with spark-submit, I can see that
> >> > val dataFrame = sqlContext.read.json(rdd.map(_._2)) runs indefinitely.
> >> > Am I doing something wrong here? The code is below. This environment is
> >> > the Cloudera sandbox; the same issue occurs in the Hadoop production
> >> > cluster, but access there is restricted, which is why I tried to
> >> > reproduce it in the sandbox. Kafka 0.10 and Spark 1.4.
> >> >
> >> > val kafkaParams = Map[String,String](
> >> >   "bootstrap.servers" -> "localhost:9093,localhost:9092",
> >> >   "group.id" -> "xyz",
> >> >   "auto.offset.reset" -> "smallest")
> >> > val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> >> > val ssc = new StreamingContext(conf, Seconds(1))
> >> >
> >> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> >> >
> >> > val topics = Set("gpp.minf")
> >> > val kafkaStream = KafkaUtils.createDirectStream[String, String,
> >> >   StringDecoder, StringDecoder](ssc, kafkaParams, topics)
> >> >
> >> > kafkaStream.foreachRDD(
> >> >   rdd => {
> >> >     if (rdd.count > 0) {
> >> >       val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >> >       dataFrame.printSchema()
> >> >       //dataFrame.foreach(println)
> >> >     }
> >> >   }
> >> > )
> >
> >
>
