Hello,
I am trying to implement something like "process a stream for N
seconds, then return a result" with Spark Streaming (built from git
head). My approach (which is probably not very elegant) is
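    import scala.concurrent.future
    import scala.concurrent.ExecutionContext.Implicits.global

    val ssc = new StreamingContext(...)
    ssc.start()
    future {
      // Thread.sleep takes milliseconds as a Long, not a Spark Duration
      Thread.sleep(N * 1000L)
      ssc.stop(true)
    }
    ssc.awaitTermination()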
and in fact, this stops the stream processing. However, I get the
following error messages:
14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
14/05/12 18:41:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to localhost:9999
14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
14/05/12 18:41:50 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
(where localhost:9999 is the source I am reading the stream from).
This doesn't actually seem like the proper way to do it. Can anyone
point me to how to implement "stop after N seconds" without these
error messages?
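
For reference, here is a minimal sketch of a variant that avoids the extra
thread: it blocks the driver with the timeout overload of awaitTermination
and then requests a graceful stop. The object name, the local[2] master, the
1-second batch interval, the value of n, and the count().print() output
operation are all assumptions made only for illustration. Later Spark
releases replace awaitTermination(timeout) with awaitTerminationOrTimeout.
Whether this avoids the error messages above is not verified.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical driver program; names and settings are illustrative only.
object StopAfterNSeconds {
  def main(args: Array[String]): Unit = {
    val n = 10 // assumed run time in seconds

    val conf = new SparkConf()
      .setMaster("local[2]") // assumption: local mode, one core for the receiver
      .setAppName("StopAfterNSeconds")
    val ssc = new StreamingContext(conf, Seconds(1)) // assumed batch interval

    // Same source as in the question; the output operation is a placeholder.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc.start()

    // Block the driver thread for at most n seconds (timeout is in milliseconds).
    ssc.awaitTermination(n * 1000L)

    // Stop the streaming context and the SparkContext, letting data that has
    // already been received finish processing first.
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
```

The graceful flag is the main design choice here: it asks the context to
finish the batches it has already received before shutting down, instead of
stopping the receiver abruptly.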
Thanks
Tobias