Hi Diana, I'll answer Q3:
You can check if an RDD is empty in several ways. Someone here mentioned that using an iterator was safer:

    val isEmpty = rdd.mapPartitions(iter => Iterator(! iter.hasNext)).reduce(_&&_)

You can also check with a fold or rdd.count

    rdd.reduce(_ + _)   // can't handle empty RDD
    rdd.fold(0)(_ + _)  // no problem with empty RDD

A

From: Diana Carroll [mailto:dcarr...@cloudera.com]
Sent: March-26-14 2:09 PM
To: user
Subject: streaming questions

I'm trying to understand Spark streaming, hoping someone can help.

I've kinda-sorta got a version of Word Count running, and it looks like this:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    object StreamingWordCount {

      def main(args: Array[String]) {
        if (args.length < 3) {
          System.err.println("Usage: StreamingWordCount <master> <hostname> <port>")
          System.exit(1)
        }

        val master = args(0)
        val hostname = args(1)
        val port = args(2).toInt

        val ssc = new StreamingContext(master, "Streaming Word Count",Seconds(2))
        val lines = ssc.socketTextStream(hostname, port)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

(I also have a small script that sends text to that port.)

Question 1: When I run this, I don't get any output from the wordCounts.print as long as my data is still streaming. I have to stop my streaming data script before my program will display the word counts. Why is that? What if my stream is indefinite? I thought the point of Streaming was that it would process it in real time?

Question 2: While I run this (and the stream is still sending) I get continuous warning messages like this:

    14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623200 already exists on this machine; not re-adding it
    14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623400 already exists on this machine; not re-adding it

What does that mean?
Question 3: I tried replacing the wordCounts.print() line with wordCounts.saveAsTextFiles("file:/my/path/outdir"). This results in a new outdir-timestamp file being created every two seconds, even if there's no data during that time period. Is there a way to tell it to save only if there's data?

Thanks!
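
Coming back to Q3: one possible way to write output only for batches that contain data is to use foreachRDD in place of saveAsTextFiles and test each RDD before saving it. What follows is a minimal sketch, not something taken from the thread. The object name SaveNonEmpty and the helpers nonEmpty and saveNonEmptyBatches are hypothetical, it assumes a Spark version where DStream.foreachRDD is available, and the output naming (prefix, a dash, then the batch time in milliseconds) is an assumption meant to resemble the outdir-timestamp pattern described in the question. The emptiness check uses take(1), which is one more alternative to the iterator, fold, and count checks mentioned earlier.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; none of these names come from Spark itself.
object SaveNonEmpty {

  // True when the RDD holds at least one element. take(1) only scans
  // as many partitions as it needs to find a single element.
  def nonEmpty[T](rdd: RDD[T]): Boolean = rdd.take(1).nonEmpty

  // Save each batch under "<prefix>-<batch time in ms>", skipping empty batches.
  // The naming scheme is an assumption made for this sketch.
  def saveNonEmptyBatches[T](stream: DStream[T], prefix: String): Unit = {
    stream.foreachRDD { (rdd: RDD[T], time: Time) =>
      if (nonEmpty(rdd)) {
        rdd.saveAsTextFile(s"$prefix-${time.milliseconds}")
      }
    }
  }
}
```

In the word count program, the saveAsTextFiles line would then become something like SaveNonEmpty.saveNonEmptyBatches(wordCounts, "file:/my/path/outdir"). Note that the check runs a small extra job per batch, which is usually cheap compared with writing an output directory.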