Hi Diana,

I'll answer Q3:

To save only when a batch has data, you first need to check whether that batch's RDD is empty. You can do that in several ways.
Someone here mentioned that using an iterator was safer:
  val isEmpty = rdd.mapPartitions(iter => Iterator(!iter.hasNext)).reduce(_ && _)
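
As a rough sketch of how a check like this could gate the save from Q3, something along these lines might work. The object name, the helper saveIfNonEmpty, and the prefix parameter are hypothetical, and the "prefix-milliseconds" directory name is only an assumption meant to resemble what saveAsTextFiles produces. I have also swapped reduce for fold(true) so that an RDD with no partitions does not throw; that is my variation, not what was originally suggested.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

object SaveNonEmptyBatches {
  // Hypothetical helper: writes one output directory per non-empty batch.
  def saveIfNonEmpty(stream: DStream[(String, Int)], prefix: String): Unit = {
    stream.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      // Each partition reports whether it is empty; fold(true) combines the
      // flags and also copes with an RDD that has no partitions at all.
      val isEmpty = rdd
        .mapPartitions(iter => Iterator(!iter.hasNext))
        .fold(true)(_ && _)

      if (!isEmpty) {
        // Assumed naming scheme: <prefix>-<batch time in milliseconds>
        rdd.saveAsTextFile(prefix + "-" + time.milliseconds)
      }
    }
  }
}
```

In your program this would replace the wordCounts.saveAsTextFiles(...) line, e.g. SaveNonEmptyBatches.saveIfNonEmpty(wordCounts, "file:/my/path/outdir"). Note that the emptiness check runs a small extra job on every batch.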

You can also check with rdd.count, or use a fold, which (unlike reduce) does not fail on an empty RDD:
  rdd.reduce(_ + _)   // throws an exception on an empty RDD
  rdd.fold(0)(_ + _)  // returns the zero value (0) on an empty RDD
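
To see the difference without a cluster, here is a minimal sketch using plain Scala collections rather than RDDs. The object and method names are made up for illustration, and I am assuming the collection behaviour is a fair stand-in for the RDD case described above.

```scala
import scala.util.Try

object ReduceVsFold {
  // reduce has no starting value, so an empty input throws;
  // Try captures that as a Failure instead of crashing.
  def sumWithReduce(xs: Seq[Int]): Try[Int] = Try(xs.reduce(_ + _))

  // fold starts from the zero value 0, which is returned for an empty input.
  def sumWithFold(xs: Seq[Int]): Int = xs.fold(0)(_ + _)

  def main(args: Array[String]): Unit = {
    println(sumWithReduce(Seq(1, 2, 3)))              // Success(6)
    println(sumWithFold(Seq(1, 2, 3)))                // 6
    println(sumWithReduce(Seq.empty[Int]).isFailure)  // true
    println(sumWithFold(Seq.empty[Int]))              // 0
  }
}
```

Keep in mind that a fold result of 0 does not by itself tell you the input was empty, since a non-empty input can also sum to 0.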

A
From: Diana Carroll [mailto:dcarr...@cloudera.com]
Sent: March-26-14 2:09 PM
To: user
Subject: streaming questions

I'm trying to understand Spark streaming, hoping someone can help.

I've kinda-sorta got a version of Word Count running, and it looks like this:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StreamingWordCount {

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: StreamingWordCount <master> <hostname> <port>")
      System.exit(1)
    }

    val master = args(0)
    val hostname = args(1)
    val port = args(2).toInt

    val ssc = new StreamingContext(master, "Streaming Word Count",Seconds(2))
    val lines = ssc.socketTextStream(hostname, port)
    val words = lines.flatMap(line => line.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

(I also have a small script that sends text to that port.)

Question 1:
When I run this, I don't get any output from the wordCounts.print as long as my 
data is still streaming.  I have to stop my streaming data script before my 
program will display the word counts.

Why is that?  What if my stream is indefinite?  I thought the point of 
Streaming was that it would process it in real time?

Question 2:
While I run this (and the stream is still sending) I get continuous warning 
messages like this:
14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623200 already exists on this machine; not re-adding it
14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623400 already exists on this machine; not re-adding it

What does that mean?

Question 3:
I tried replacing the wordCounts.print() line with 
wordCounts.saveAsTextFiles("file:/my/path/outdir").
This results in a new outdir-timestamp file being created every 
two seconds, even if there's no data during that time period.  Is there a way 
to tell it to save only if there's data?

Thanks!
