Sometimes it is useful to convert an RDD into a DStream for testing purposes
(for example, to generate DStreams from historical data). Is there an easy way
to do this?
I came up with the following approach, but it is inefficient and I am not sure
whether there is a better way to achieve this. Thoughts?
class RDDExtension[T](rdd: RDD[T]) {
/**
 * Groups the elements of this RDD into chunks of at most `chunkSize` elements.
 *
 * Grouping is done independently inside each partition, so a chunk never spans
 * a partition boundary: the last chunk of every partition may hold fewer than
 * `chunkSize` elements.
 *
 * @param chunkSize maximum number of elements per chunk; must be positive
 * @return an RDD whose elements are the chunks, in partition order
 * @throws IllegalArgumentException if `chunkSize` is not positive
 */
def chunked(chunkSize: Int): RDD[Seq[T]] = {
  // Fail fast on the driver. Without this check, `Iterator.grouped` would only
  // reject the size inside a task, once some action finally runs.
  require(chunkSize > 0, s"chunkSize must be positive, got $chunkSize")
  rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
}
/**
 * Returns this RDD without its first element, i.e. the element that
 * `zipWithIndex` assigns index 0 (first element of the first non-empty
 * partition).
 *
 * Note: `zipWithIndex` launches a Spark job to size the partitions when the
 * RDD has more than one partition, so calling this is not free.
 *
 * @return an RDD containing every element except the first
 */
def skipFirst(): RDD[T] =
  rdd
    .zipWithIndex()
    .collect { case (element, position) if position > 0L => element }
/**
 * Replays this RDD as an input DStream that emits one chunk of at most
 * `chunkSize` elements per batch.
 *
 * Chunks are produced by `chunked`, so they follow partition order and never
 * span partitions. After the last chunk has been emitted, every later batch
 * receives an empty RDD.
 *
 * @param streamingContext   context the input stream is created in
 * @param chunkSize          maximum number of elements per batch; must be positive
 * @param slideDurationMilli optional slide duration in milliseconds; when `None`
 *                           the InputDStream's default slide duration is used
 * @return a DStream replaying the contents of this RDD
 * @throws IllegalArgumentException if `chunkSize` is not positive
 */
def toStream(streamingContext: StreamingContext, chunkSize: Int,
    slideDurationMilli: Option[Long] = None): DStream[T] = {
  require(chunkSize > 0, s"chunkSize must be positive, got $chunkSize")
  new InputDStream[T](streamingContext) {
    // Chunks are pulled to the driver one partition at a time (one job per
    // partition, holding at most one partition's data in driver memory).
    // Repeatedly taking the first chunk and re-indexing the rest would instead
    // launch extra jobs every batch and lengthen the lineage each time.
    // Created lazily so nothing runs until the first batch is computed.
    private lazy val remainingChunks: Iterator[Seq[T]] =
      RDDExtension.this.chunked(chunkSize).toLocalIterator

    override def start(): Unit = {}
    override def stop(): Unit = {}

    override def compute(validTime: Time): Option[RDD[T]] = {
      // Parallelize the chunk's elements (a Seq[T]), not an array of chunks,
      // so the batch is an RDD[T] as the signature requires.
      val chunk: Seq[T] =
        if (remainingChunks.hasNext) remainingChunks.next() else Seq.empty
      Some(rdd.sparkContext.parallelize(chunk))
    }

    override def slideDuration: Duration =
      slideDurationMilli
        .map(duration => new Duration(duration))
        .getOrElse(super.slideDuration)
  }
}