As a post-script: when running the example in precompiled form:

$ /bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999

... I don't need to send a ^D to the netcat stream. It does print the batches
to stdout in the manner I'd expect. So is this more REPL weirdness than Spark
weirdness?
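(A guess I haven't verified: run-example above passes local[2] as the master,
while I launched spark-shell without setting a master at all. If the shell
defaults to a single-threaded local master, the socket receiver could be
occupying the only available thread, leaving nothing free to process the
queued batches until the stream closes — which would match the all-at-once
output after ^D. If so, something like

$ MASTER=local[2] bin/spark-shell

might make the REPL behave like the compiled example.)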
- Aaron

On Fri, Feb 28, 2014 at 8:46 PM, Aaron Kimball <akimbal...@gmail.com> wrote:

> Hi folks,
>
> I was trying to work through the streaming word count example at
> http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html
> and couldn't get the code as-written to run. In fairness, I was trying to do
> this inside the REPL rather than compiling a separate project; would the
> types be different?
>
> In any case, here's the code I ran:
>
> $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
>
> scala> import org.apache.spark.streaming._
> scala> val ssc = new StreamingContext(sc, Seconds(2))
> scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
> scala> val words = lines.flatMap(_.split(" "))
>
> // *** The following code from the html page doesn't work
> // because pairs has type DStream[(String, Int)] and
> // there is no reduceByKey method on this type.
>
> // Count each word in each batch
> scala> val pairs = words.map(word => (word, 1))
> scala> val wordCounts = pairs.reduceByKey(_ + _) // <-- error here: no reduceByKey()
>
> // Print a few of the counts to the console
> scala> wordCount.print() // ... and even if the above did work, 'wordCount'
> // and 'wordCounts' are different symbols ;) This couldn't compile as written.
>
> Instead, I got the following to run:
>
> scala> val wordCounts = words.countByValue()
> scala> wordCounts.print()
> scala> ssc.start() // Start the computation
> scala> ssc.awaitTermination()
>
> This worked if I ran 'nc -lk 1234' in another terminal and typed some words
> into it... but the 'wordCounts.print()' statement would only emit things to
> stdout if I sent a ^D into the netcat stream. It seems to print the output
> for all 2-second windows all at once after the ^D in the network stream. Is
> this an expected effect? I don't understand the semantics of ssc.start /
> ssc.awaitTermination well enough to know how they interact with the print
> statement on wordCounts (which I think is a DStream of RDDs?)
>
> I set spark.cleaner.ttl to a relatively high value (I'm not sure what units
> those are... seconds or millis) because a lower value caused stderr to spam
> everywhere and make my terminal unreadable. Is that part of my issue? The
> spark repl said I had to set it, so I just picked a number.
>
> I kind of expected wordCounts.print() to be constantly emitting (word, N)
> pairs to my spark terminal as I typed into the netcat side of things.
>
> I'm using Spark built from github source that I pulled earlier today.
>
> I am using the following as my 'origin':
> Fetch URL: git://github.com/apache/incubator-spark.git
>
> ... and the most recent commit (master a.k.a. HEAD) is:
> commit 4d880304867b55a4f2138617b30600b7fa013b14
> Author: Bryn Keller <bryn.kel...@intel.com>
> Date: Mon Feb 24 17:35:22 2014 -0800
>
> In any case, I'm happy to help update the docs (or the code) if this is a
> bug. I realize this is getting long-winded, but I think my questions really
> boil down to:
>
> 1) Should there be a reduceByKey() method on DStream? The documentation at
> http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html
> says so in the "Transformations" section, but the scaladoc at
> https://spark.incubator.apache.org/docs/latest/api/streaming/index.html#org.apache.spark.streaming.dstream.DStream
> doesn't list it. DStream.scala also doesn't have a definition for such a
> method...
>
> (and based on reading the source of NetworkWordCount.scala, I can't
> spot-identify why this *does* work there (i.e., reduceByKey compiles) but it
> doesn't do so in the terminal)
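> (One guess I haven't confirmed: NetworkWordCount.scala also imports
> org.apache.spark.streaming.StreamingContext._, which I believe pulls in the
> implicit conversion that wraps a DStream[(String, Int)] in the pair-operations
> class providing reduceByKey. If that conversion is what my shell session was
> missing, the guide's snippet would become:
>
> scala> import org.apache.spark.streaming.StreamingContext._ // implicit pair-DStream ops
> scala> val pairs = words.map(word => (word, 1))  // DStream[(String, Int)]
> scala> val wordCounts = pairs.reduceByKey(_ + _) // resolves via the implicit conversion
> scala> wordCounts.print()
>
> Treat that as a sketch; I haven't re-run it in the REPL.)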
> 2) Why do I have to wait for the stream to "terminate" with a ^D before
> seeing any stdout in the repl from the wordCounts.print() statement? Doesn't
> this defeat the point of "streaming"?
>
> 2a) How does the print() statement interact with ssc.start() and
> ssc.awaitTermination()?
>
> 3) Is the cleaner TTL something that, as a user, I should be adjusting to
> change my observed effects? i.e., would adjusting this change the frequency
> of emissions to stdout of prior window data? Or is this just a background
> property that happens to affect the spamminess of my stderr that is routed
> to the same console?
>
> 4) Should I update the documentation to match my example (i.e., no
> reduceByKey, but use words.countByValue() instead)?
>
> 5) Now that Spark is a TLP, are my references to incubator-spark.git and the
> http://spark.incubator.apache.org docs woefully out of date, making this
> entire exercise a goof? :)
>
> Thanks for the help!
>
> Cheers,
> - Aaron