I'm trying to understand Spark Streaming and am hoping someone can help.
I've kinda-sorta got a version of Word Count running, and it looks like
this:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // implicit conversions for pair DStreams (reduceByKey)

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: StreamingWordCount <master> <hostname> <port>")
      System.exit(1)
    }
    val master = args(0)
    val hostname = args(1)
    val port = args(2).toInt

    // 2-second batch interval: each batch holds the lines received in that window
    val ssc = new StreamingContext(master, "Streaming Word Count", Seconds(2))

    // Connects to hostname:port and receives newline-delimited text
    val lines = ssc.socketTextStream(hostname, port)
    val words = lines.flatMap(line => line.split(" "))
    // Count the words within each batch
    val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
    wordCounts.print()

    ssc.start()            // start receiving and processing
    ssc.awaitTermination() // block until the context is stopped
  }
}
(I also have a small script that sends text to that port.)
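That script isn't reproduced here. Since socketTextStream connects to <hostname>:<port> as a client, the sender has to be the side that listens. A hypothetical stand-in in plain Scala (LineSender, sendLines, and the sample lines are made up for illustration; this is not my actual script) could look like this:

```scala
import java.io.PrintWriter
import java.net.ServerSocket

object LineSender {
  // Waits for one client (e.g. the Spark socket receiver) to connect, then writes
  // each line followed by a newline, pausing delayMs milliseconds between lines.
  def sendLines(server: ServerSocket, lines: Seq[String], delayMs: Long): Unit = {
    val client = server.accept()                             // blocks until a client connects
    val out = new PrintWriter(client.getOutputStream, true) // autoflush on println
    try {
      lines.foreach { line =>
        out.println(line)
        Thread.sleep(delayMs)
      }
    } finally {
      out.close()
      client.close()
    }
  }

  // Usage: LineSender <port>
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(args(0).toInt)
    try sendLines(server, Seq("hello world", "hello spark streaming"), delayMs = 500)
    finally server.close()
  }
}
```

Start it first (LineSender 9999, say), then run StreamingWordCount with the same host and port.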
*Question 1:*
When I run this, I don't get any output from wordCounts.print() as long
as my data is still streaming. I have to stop the script that sends the
data before my program displays the word counts.
Why is that? What if my stream never ends? Isn't the point of Streaming
that it processes the data in real time?
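What I expected to see every 2 seconds is the count for just the lines received in that interval. Sketched with ordinary Scala collections instead of DStreams (BatchWordCount and countWords are illustrative names, not Spark APIs), the per-batch computation I had in mind is:

```scala
object BatchWordCount {
  // Plain-collections equivalent of flatMap / map(x => (x, 1)) / reduceByKey(_ + _)
  // applied to the lines that arrived during a single batch interval.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" ")) // same tokenization as the streaming job
      .map(word => (word, 1))
      .groupBy(_._1)                    // group the (word, 1) pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val batch = Seq("the cat", "the dog")
    println(countWords(batch))
  }
}
```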
*Question 2:*
While this runs (and the stream is still sending), I get a continuous
series of warning messages like these:
14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623200 already exists on this machine; not re-adding it
14/03/26 10:57:03 WARN BlockManager: Block input-0-1395856623400 already exists on this machine; not re-adding it
What does that mean?
*Question 3:*
I tried replacing the wordCounts.print() line with
wordCounts.saveAsTextFiles("file:/my/path/outdir").
This creates a new outdir-timestamp output every two seconds, even when
no data arrived during that interval. Is there a way to tell it to save
only when there's data?
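What I have in mind is a guard that skips empty intervals. Sketched with plain Scala collections standing in for each batch (SaveNonEmptyBatches and save are hypothetical names, and this is not Spark code), the logic would be:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object SaveNonEmptyBatches {
  // Writes each batch's records to dir/<prefix>-<timeMs>, skipping batches with no records.
  // Returns the paths that were actually written.
  def save(dir: Path, prefix: String, batches: Seq[(Long, Seq[String])]): Seq[Path] =
    batches.collect {
      case (timeMs, records) if records.nonEmpty =>
        val target = dir.resolve(s"$prefix-$timeMs")
        Files.write(target, records.mkString("", "\n", "\n").getBytes(StandardCharsets.UTF_8))
        target
    }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("outdir")
    val written = save(dir, "outdir", Seq(1000L -> Seq("(hello,2)"), 3000L -> Seq.empty))
    println(written) // only the 1000 ms batch produces a file
  }
}
```

My untested guess is that in Spark the same check could go inside wordCounts.foreachRDD, testing something like rdd.take(1).nonEmpty before calling rdd.saveAsTextFile, but I haven't verified that.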
Thanks!