As a postscript, when running the example in precompiled form:

./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999


... I don't need to send a ^D to the netcat stream. It does print the
batches to stdout in the manner I'd expect. So is this more REPL weirdness
than Spark weirdness?

- Aaron


On Fri, Feb 28, 2014 at 8:46 PM, Aaron Kimball <akimbal...@gmail.com> wrote:

> Hi folks,
>
> I was trying to work through the streaming word count example at
> http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html
> and couldn't get the code as written to run. In fairness, I was trying to
> do this inside the REPL rather than compiling a separate project; would the
> types be different?
>
> In any case, here's the code I ran:
>
> $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
>
> scala> import org.apache.spark.streaming._
> scala> val ssc = new StreamingContext(sc, Seconds(2))
> scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
> scala> val words = lines.flatMap(_.split(" "))
>
> // *** The following code from the html page doesn't work
> // because pairs has type DStream[(String, Int)] and
> // there is no reduceByKey method on this type.
>
> // Count each word in each batch
> scala> val pairs = words.map(word => (word, 1))
> scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here: no reduceByKey()
>
> // Print a few of the counts to the console
> scala> wordCount.print()
> // ... and even if the above did work, 'wordCount' and 'wordCounts' are
> // different symbols ;) This couldn't compile as written.
>
>
> Instead, I got the following to run:
> scala> val wordCounts = words.countByValue()
> scala> wordCounts.print()
> scala> ssc.start()             // Start the computation
> scala> ssc.awaitTermination()
>
> This worked if I ran 'nc -lk 1234' in another terminal and typed some
> words into it, but the 'wordCounts.print()' statement would only emit
> things to stdout if I sent a ^D into the netcat stream. It seems to print
> the output for all 2-second windows at once after the ^D in the network
> stream. Is this an expected effect? I don't understand the semantics of
> ssc.start / awaitTermination well enough to know how it interacts with the
> print statement on wordCounts (which I think is a DStream of RDDs?)
>
> I set spark.cleaner.ttl to a relatively high value (I'm not sure what
> units those are: seconds or millis) because a lower value caused stderr to
> spam everywhere and make my terminal unreadable. Is that part of my issue?
> The Spark REPL said I had to set it, so I just picked a number.
>
> I kind of expected wordCounts.print() to be constantly emitting (word, N)
> pairs to my spark terminal as I typed into the netcat side of things.
>
> I'm using Spark built from GitHub source that I pulled earlier today.
>
> I am using the following as my 'origin':
>   Fetch URL: git://github.com/apache/incubator-spark.git
>
> ... and the most recent commit (master a.k.a. HEAD) is:
> commit 4d880304867b55a4f2138617b30600b7fa013b14
> Author: Bryn Keller <bryn.kel...@intel.com>
> Date:   Mon Feb 24 17:35:22 2014 -0800
>
>
> In any case, I'm happy to help update the docs (or the code) if this is a
> bug. I realize this is getting long-winded, but I think my questions
> really boil down to:
>
> 1) Should there be a reduceByKey() method on DStream? The documentation at
> http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html
> says so in the "Transformations" section, but the scaladoc at
> https://spark.incubator.apache.org/docs/latest/api/streaming/index.html#org.apache.spark.streaming.dstream.DStream
> doesn't list it. DStream.scala also doesn't have a definition for such a
> method...
>
> (and based on reading the source of NetworkWordCount.scala, I can't
> spot-identify why this *does* work there (i.e., reduceByKey compiles) but
> it doesn't do so in the terminal)
>
> 2) Why do I have to wait for the stream to "terminate" with a ^D before
> seeing any stdout in the repl from the wordCounts.print() statement?
>  Doesn't this defeat the point of "streaming"?
> 2a) how does the print() statement interact with ssc.start() and
> ssc.awaitTermination() ?
>
> 3) is the cleaner TTL something that, as a user, I should be adjusting to
> change my observed effects? i.e., would adjusting this change the frequency
> of emissions to stdout of prior window data?  Or is this just a background
> property that happens to affect the spamminess of my stderr that is routed
> to the same console?
>
> 4) Should I update the documentation to match my example (i.e., no
> reduceByKey, but use words.countByValue() instead)?
>
> 5) Now that Spark is a TLP, are my references to the incubator-spark.git
> and the http://spark.incubator.apache.org docs woefully out of date,
> making this entire exercise a goof? :)
>
> Thanks for the help!
>
> Cheers,
> - Aaron
>
>
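
On question 1, a likely explanation, offered as an assumption rather than a confirmed diagnosis: in Spark releases of this period, reduceByKey is not defined on DStream itself but on PairDStreamFunctions, and it is reached through an implicit conversion declared in the StreamingContext companion object. NetworkWordCount.scala has "import org.apache.spark.streaming.StreamingContext._", while the REPL session quoted above only imports "org.apache.spark.streaming._", which does not bring those implicits into scope. The sketch below reproduces the mechanism in plain Scala, without Spark. Batch, PairBatchOps, Context and ImplicitDemo are hypothetical stand-ins invented for illustration, not Spark classes.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for DStream[T]: one batch of elements.
final case class Batch[T](items: Seq[T]) {
  def map[U](f: T => U): Batch[U] = Batch(items.map(f))
}

// Hypothetical stand-in for PairDStreamFunctions: operations that only
// make sense when the element type is a (key, value) pair.
final class PairBatchOps[K, V](self: Batch[(K, V)]) {
  def reduceByKey(f: (V, V) => V): Map[K, V] =
    self.items.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
}

// Hypothetical stand-in for the StreamingContext companion object, which
// is where the implicit conversion lives.
object Context {
  implicit def toPairBatchOps[K, V](b: Batch[(K, V)]): PairBatchOps[K, V] =
    new PairBatchOps(b)
}

object ImplicitDemo {
  def wordCounts(line: String): Map[String, Int] = {
    // Without this import, the reduceByKey call below does not compile,
    // because Batch itself has no such method.
    import Context._
    val pairs = Batch(line.split(" ").toSeq).map(word => (word, 1))
    pairs.reduceByKey(_ + _)
  }

  def main(args: Array[String]): Unit =
    println(wordCounts("a b a"))
}
```

If the same thing holds in the REPL, adding the StreamingContext._ import before defining pairs should make pairs.reduceByKey(_ + _) compile there as well.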
