Sometimes it is useful to convert an RDD into a DStream for testing purposes (generating DStreams from historical data, etc.). Is there an easy way to do this?
I came up with the following inefficient approach, but I am not sure whether there is a better way to achieve this. Thoughts? class RDDExtension[T](rdd: RDD[T]) { def chunked(chunkSize: Int): RDD[Seq[T]] = { rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize)) } def skipFirst(): RDD[T] = { rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1) } def toStream(streamingContext: StreamingContext, chunkSize: Int, slideDurationMilli: Option[Long] = None): DStream[T] = { new InputDStream[T](streamingContext) { @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize) override def start(): Unit = {} override def stop(): Unit = {} override def compute(validTime: Time): Option[RDD[T]] = { val chunk = currentRDD.take(1) currentRDD = currentRDD.skipFirst() Some(rdd.sparkContext.parallelize(chunk)) } override def slideDuration = { slideDurationMilli.map(duration => new Duration(duration)). getOrElse(super.slideDuration) } } }