Sometimes it is useful to convert an RDD into a DStream for testing purposes
(generating DStreams from historical data, etc.). Is there an easy way to do
this?

I came up with the following approach, but it is inefficient and I am not sure
whether there is a better way to achieve this. Thoughts?

/**
 * Test-oriented helpers on top of an [[RDD]].
 *
 * @tparam T element type; a `ClassTag` is required because `InputDStream` and
 *           `RDD.flatMap` need one to build the RDDs they produce.
 */
class RDDExtension[T: scala.reflect.ClassTag](rdd: RDD[T]) {

  /**
   * Groups the elements of each partition into consecutive chunks of at most
   * `chunkSize` elements. Chunks never span partition boundaries, so the last
   * chunk of a partition may be shorter than `chunkSize`.
   *
   * @param chunkSize maximum number of elements per chunk; must be positive
   * @throws IllegalArgumentException if `chunkSize` is not positive
   */
  def chunked(chunkSize: Int): RDD[Seq[T]] = {
    // Fail fast on the driver instead of inside a task when the job finally runs.
    require(chunkSize > 0, s"chunkSize must be positive, got $chunkSize")
    rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
  }

  /**
   * Returns an RDD with the first element (in partition order) removed.
   */
  def skipFirst(): RDD[T] =
    rdd.zipWithIndex().filter { case (_, idx) => idx > 0 }.map { case (item, _) => item }

  /**
   * Replays this RDD as a [[DStream]]. Batch k contains the elements of the
   * k-th chunk produced by [[chunked]] (0-based, partition order). Once every
   * chunk has been emitted, each further batch is an empty RDD.
   *
   * The chunks are indexed once and cached. Each batch is then a single filter
   * pass over the cached chunks, rather than a lineage that grows by one
   * `zipWithIndex` per batch. Note: `zipWithIndex` may launch a Spark job when
   * this method is called (per the Spark docs, when the RDD has more than one
   * partition).
   *
   * @param streamingContext   context the stream is created in
   * @param chunkSize          maximum number of elements per batch; must be positive
   * @param slideDurationMilli optional slide interval in milliseconds; when absent,
   *                           the inherited `slideDuration` is used
   */
  def toStream(streamingContext: StreamingContext, chunkSize: Int,
      slideDurationMilli: Option[Long] = None): DStream[T] = {
    val indexedChunks: RDD[(Seq[T], Long)] = chunked(chunkSize).zipWithIndex().cache()

    new InputDStream[T](streamingContext) {

      // Index of the chunk to emit on the next call to compute.
      @volatile private var nextChunk: Long = 0L

      override def start(): Unit = {}

      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        // Copy into a local so the Spark closure below captures only a Long.
        val wanted = nextChunk
        nextChunk = wanted + 1
        val batch = indexedChunks
          .filter { case (_, idx) => idx == wanted }
          .flatMap { case (chunk, _) => chunk }
        Some(batch)
      }

      override def slideDuration: Duration =
        slideDurationMilli.map(millis => new Duration(millis)).getOrElse(super.slideDuration)
    }
  }
}

Reply via email to