Very good questions! Responses inline.

TD

On Thu, Mar 27, 2014 at 8:02 AM, Diana Carroll <dcarr...@cloudera.com> wrote:

> I'm working with spark streaming using spark-shell, and hoping folks could
> answer a few questions I have.
>
> I'm doing WordCount on a socket stream:
>
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.streaming.Seconds
> var ssc = new StreamingContext(sc,Seconds(5))
> var mystream = ssc.socketTextStream("localhost",4444)
> var words = mystream.flatMap(line => line.split(" "))
> var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
> wordCounts.print()
> ssc.start()
>
> 1. I'm assuming that using spark shell is an edge case, and that spark
> streaming is really intended mostly for batch use. True?

Yes. Currently the spark-shell is not the intended execution mode for Spark Streaming, although it can be used for quick testing.

> 2. I notice that once I start ssc.start(), my stream starts processing and
> continues indefinitely...even if I close the socket on the server end (I'm
> using unix command "nc" to mimic a server as explained in the streaming
> programming guide.) Can I tell my stream to detect if it's lost a
> connection and therefore stop executing? (Or even better, to attempt to
> re-establish the connection?)

Not yet. I am aware of this, and the behavior will be improved in the future.

> 3. I tried entering ssc.stop which resulted in an error:
>
> Exception in thread "Thread-43" org.apache.spark.SparkException: Job
> cancelled because SparkContext was shut down
> 14/03/27 07:36:13 ERROR ConnectionManager: Corresponding
> SendingConnectionManagerId not found
>
> But it did stop the DStream execution.

Ah, that happens sometimes. The existing behavior of ssc.stop() is to stop everything immediately. I just opened a pull request for a more graceful shutdown of the Spark Streaming program:
https://github.com/apache/spark/pull/247

> 4. Then I tried restarting the ssc again (ssc.start) and got another error:
> org.apache.spark.SparkException: JobScheduler already started
> Is restarting an ssc supported?

Restarting is not meant to be supported. However, the behavior was not explicitly checked. The pull request above makes the behavior more explicit by throwing the right warnings and exceptions.

> 5. When I perform an operation like wordCounts.print(), that operation will
> execute on each batch, every n seconds. Is there a way I can undo that
> operation? That is, I want it to *stop* executing that print every n
> seconds...without having to stop the stream.
>
> What I'm really asking is...can I explore DStreams interactively the way I
> can explore my data in regular Spark. In regular Spark, I might perform
> various operations on an RDD to see what happens. So at first, I might have
> used split(" ") to tokenize my input text, but now I want to try using
> split(",") instead, after the stream has already started running. Can I do
> that?
>
> I did find out that if I add a new operation to an existing dstream (say,
> words.print()) after the ssc.start it works. It *will* add the second
> print() call to the execution list every n seconds.
>
> but if I try to add new dstreams, e.g.
> ...
>
> ssc.start()
>
> var testpairs = words.map(x => (x, "TEST"))
> testpairs.print()
>
> I get an error:
>
> 14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time
> 1395932270000 ms
> java.lang.Exception:
> org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been
> initialized
>
> Is this sort of interactive use just not supported?

Modifying the DStream operations after the context has started is not officially supported. However, the computation can be changed dynamically using DStream.transform() or DStream.foreachRDD(). Both of these operations allow you to do arbitrary RDD operations on each RDD.
So you can dynamically modify which RDD operations are used within the DStream transform / foreachRDD (you are not changing the DStream operations themselves, only what is inside them). But to use this truly interactively, you have to write a bit of additional code that lets the user specify the function applied to each RDD.

> Thanks!
>
> Diana
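
A rough sketch of the transform() approach described above, for the split(" ") versus split(",") case from question 5. It assumes the spark-shell's existing `sc` and the same nc server on localhost:4444. The `separator` holder is a hypothetical name introduced only for illustration and is not part of Spark. Treat this as an untested starting point rather than a supported pattern.

```scala
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

// Hypothetical holder for the current separator (not a Spark API).
// It can be updated from the shell prompt while the stream is running.
val separator = new AtomicReference[String](" ")

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 4444)

// The function given to transform() is called on the driver once per batch,
// so each batch picks up whatever separator is set at that moment.
val words = lines.transform { (rdd: RDD[String]) =>
  val sep = separator.get()   // local copy, so the RDD closure captures only a String
  rdd.flatMap(_.split(sep))   // note: String.split treats sep as a regex
}

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()

// Later, without stopping the stream: subsequent batches split on commas.
separator.set(",")
```

The DStream graph itself never changes after ssc.start(); only the value read inside the transform function does, which is why this avoids the "has not been initialized" error shown in the question.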