Dear Spark developers,

I am working with Spark Streaming 1.6.1. The task is to get an RDD from each 
time window and pass it to some external analytics. This external function 
accepts an RDD, so I cannot pass it a DStream. I learned that 
DStream.window(...).compute(time) returns Option[RDD], and I am trying to use 
it in the following code, derived from the example in the programming guide:
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)

val rdd = lines.window(Seconds(5), Seconds(3))
  .compute(Time(System.currentTimeMillis())) // this does not seem to be a proper way to set the time

ssc.start()

ssc.awaitTermination()

At the line that computes rdd I get the following exception:

Exception in thread "main" org.apache.spark.SparkException: 
org.apache.spark.streaming.dstream.SocketInputDStream@2264e43c has not been 
initialized.

The other option to get an RDD from a DStream is the "slice" function. 
However, it is not clear how to use it, and I get the same exception with the 
following call:

val rdd = lines.slice(Time(System.currentTimeMillis() - 100), 
Time(System.currentTimeMillis())) // does not seem correct
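For reference, my (possibly wrong) understanding is that slice expects Time 
values aligned to the stream's slide duration, and can only be called after 
the context has started. Below is a sketch of the alignment arithmetic I 
would expect; floorToSlide is my own hypothetical helper, not a Spark API:

```scala
// Hypothetical helper (my own, not a Spark API): round a wall-clock
// timestamp down to the nearest slide boundary, since slice() appears
// to require Times that fall on the stream's slide interval.
def floorToSlide(timeMs: Long, slideMs: Long): Long =
  (timeMs / slideMs) * slideMs

val slideMs  = 3000L  // slide duration passed to window()
val windowMs = 5000L  // window duration passed to window()

val to   = floorToSlide(System.currentTimeMillis(), slideMs)
val from = to - windowMs  // start of the most recent full window

// After ssc.start(), something like (untested):
//   val rdds = lines.slice(Time(from), Time(to))
println(s"would slice from $from to $to")
```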

Could you suggest the proper use of the "compute" or "slice" functions on a 
DStream, or another way to get RDDs out of a DStream?
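In case it clarifies what I am after: conceptually I want something like the 
sketch below, where Spark itself hands each window's RDD to the external 
function via foreachRDD once the context is running. externalAnalytics is a 
placeholder for the real external function, and the whole thing is untested:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedAnalytics {
  // Window/slide durations in seconds, matching the original example.
  val WindowSeconds = 5
  val SlideSeconds  = 3

  // Placeholder for the real external analytics function that accepts an RDD.
  def externalAnalytics(rdd: RDD[String]): Unit =
    println(s"window contains ${rdd.count()} lines")

  def main(args: Array[String]): Unit = {
    val conf  = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc   = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)

    // foreachRDD delivers each window's data as a plain RDD after
    // ssc.start(), instead of calling compute()/slice() by hand.
    lines.window(Seconds(WindowSeconds), Seconds(SlideSeconds)).foreachRDD { rdd =>
      externalAnalytics(rdd)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```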

Best regards, Alexander

P.S. I have found the following example, which slices the stream inside a 
loop, but it looks hacky:
https://github.com/chlam4/spark-exercises/blob/master/using-dstream-slice.scala
