Actually, the Time header was always printed out.

Is there a better way to debug the problem? I want to modify the spark/mqtt 
code and rebuild it to debug further.


Thanks,

Jared


________________________________
From: Saisai Shao <sai.sai.s...@gmail.com>
Sent: Wednesday, July 6, 2016 9:24 PM
To: Yu Wei
Cc: Sean Owen; Rabin Banerjee; user@spark.apache.org
Subject: Re: It seemed JavaDStream.print() did not work when launching via yarn 
on a single node

DStream.print() collects some of the data to the driver and displays it; please 
see the implementation of DStream.print().

RDD.take() collects some of the data to the driver.

Normally the behavior should be consistent between cluster and local mode. 
Please find the root cause of this problem, such as the MQTT connection or 
something else.


def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
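
To make the control flow above concrete, here is a minimal plain-Java sketch of the same per-batch logic, with no Spark dependency. The class name PrintSketch and the helper renderBatch are hypothetical, and a List stands in for the RDD; this is only an illustration of the logic, not Spark's code. It shows that the Time header is emitted unconditionally, so an empty batch still produces the header with no records under it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PrintSketch {
    // Hypothetical helper (not a Spark API): mirrors the per-batch logic of
    // DStream.print(num), using a plain List in place of an RDD.
    static List<String> renderBatch(List<String> batch, String time, int num) {
        // Stands in for rdd.take(num + 1): one extra element is fetched
        // only to detect whether the batch holds more than num records.
        List<String> firstNum = batch.subList(0, Math.min(num + 1, batch.size()));

        List<String> out = new ArrayList<>();
        // The header lines are added regardless of the batch contents.
        out.add("-------------------------------------------");
        out.add("Time: " + time);
        out.add("-------------------------------------------");
        // At most num records are shown.
        out.addAll(firstNum.subList(0, Math.min(num, firstNum.size())));
        // An ellipsis marks that more records exist than were shown.
        if (firstNum.size() > num) {
            out.add("...");
        }
        out.add("");
        return out;
    }

    public static void main(String[] args) {
        // Empty batch: only the header and a trailing blank line.
        renderBatch(Collections.emptyList(), "1000 ms", 10)
                .forEach(System.out::println);
        // Three records with num = 2: two records, then "...".
        renderBatch(Arrays.asList("a", "b", "c"), "2000 ms", 2)
                .forEach(System.out::println);
    }
}
```

If the real job prints the Time header but no records, the print path itself ran on the driver and the batch it saw was empty, which points the investigation at the input side rather than at print().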

On Wed, Jul 6, 2016 at 9:17 PM, Yu Wei <yu20...@hotmail.com> wrote:

How about DStream.print()?

Does it invoke collect before printing on the driver?

________________________________
From: Sean Owen <so...@cloudera.com>
Sent: Wednesday, July 6, 2016 8:20:36 PM
To: Rabin Banerjee
Cc: Yu Wei; user@spark.apache.org
Subject: Re: It seemed JavaDStream.print() did not work when launching via yarn 
on a single node

dstream.foreachRDD(_.collect.foreach(println))

On Wed, Jul 6, 2016 at 1:19 PM, Rabin Banerjee
<dev.rabin.baner...@gmail.com> wrote:
> Collect will help then. Maybe something like this:
> foreachRDD( rdd => { for(item <- rdd.collect().toArray) { println(item); } })
>
