The use case for converting an RDD into a DStream is that I want to
simulate a stream from already persisted data in order to test analytics.
It is trivial to create an RDD from persisted data, but not so for a
DStream, hence the idea of creating a DStream from an RDD. For example,
let's say you are trying to implement analytics on time series data using
the Lambda architecture. This means you have to implement the same
analytics on streaming data (in streaming mode) as well as on persisted
data (in batch mode). The workflow would be to first implement the
analytics in batch mode using RDD operations and then simulate a stream to
test them in streaming mode. The simulated stream should produce elements
at a specified rate. So the solution may be to read the data into an RDD,
split (chunk) it into multiple RDDs, each holding the number of elements
that need to be streamed per time unit, and finally stream each RDD using
the compute function.
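
As a rough illustration of the chunking step described above, here is a
minimal sketch in plain Scala with no Spark dependency. The names
ChunkingSketch and chunksPerTick are hypothetical and only serve the
example. It shows how grouped splits a collection into batches of at most
chunkSize elements, one batch per simulated time unit.

```scala
object ChunkingSketch {
  // Split the records into batches of at most chunkSize elements.
  // The final batch may be smaller when the sizes do not divide evenly.
  def chunksPerTick[T](records: Seq[T], chunkSize: Int): List[Seq[T]] = {
    require(chunkSize > 0, "chunkSize must be positive")
    records.grouped(chunkSize).toList
  }

  def main(args: Array[String]): Unit = {
    // Ten records streamed three at a time need four ticks.
    val batches = chunksPerTick(1 to 10, 3)
    batches.zipWithIndex.foreach { case (batch, tick) =>
      println(s"tick $tick: ${batch.mkString(", ")}")
    }
  }
}
```

With ten records and a chunk size of three, the last tick carries only the
single remaining element.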

The problem with using QueueInputDStream is that it streams data at the
batch duration specified in the streaming context, and one cannot specify
a custom slide duration. Moreover, the class QueueInputDStream is private
to the streaming package, so I can't use or extend it from an external
package. Also, I could not find a good way to split an RDD into
equal-sized smaller RDDs that could be fed into an extended version of
QueueInputDStream.
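
For comparison, the queue-based approach is reachable from outside the
streaming package through StreamingContext.queueStream. The following is
only a sketch under assumptions: QueueStreamSketch, toChunkQueue and
toQueueStream are hypothetical names, and the chunking is done on the
driver via toLocalIterator, so each partition must fit in driver memory.
It still emits one queued RDD per batch interval, so it does not address
the custom slide duration.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object QueueStreamSketch {
  // Pull the source RDD through the driver and re-parallelize it in
  // chunks of at most chunkSize elements (hypothetical helper).
  def toChunkQueue[T: ClassTag](rdd: RDD[T], chunkSize: Int): mutable.Queue[RDD[T]] = {
    val queue = mutable.Queue.empty[RDD[T]]
    rdd.toLocalIterator.grouped(chunkSize).foreach { chunk =>
      queue.enqueue(rdd.sparkContext.parallelize(chunk))
    }
    queue
  }

  // With oneAtATime = true, one queued RDD is consumed per batch interval
  // of the streaming context.
  def toQueueStream[T: ClassTag](ssc: StreamingContext, rdd: RDD[T],
      chunkSize: Int): DStream[T] =
    ssc.queueStream(toChunkQueue(rdd, chunkSize), oneAtATime = true)
}
```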

Finally, here is what I came up with:

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

class RDDExtension[T: ClassTag](rdd: RDD[T]) {
  def toStream(streamingContext: StreamingContext, chunkSize: Int,
      slideDurationMilli: Option[Long] = None): DStream[T] = {
    new InputDStream[T](streamingContext) {

      // WARNING: each partition must fit in the RAM of the local machine.
      private val iterator = rdd.toLocalIterator
      private val grouped = iterator.grouped(chunkSize)

      override def start(): Unit = {}

      override def stop(): Unit = {}

      // Emit the next chunk as an RDD, or None once the data is exhausted.
      override def compute(validTime: Time): Option[RDD[T]] = {
        if (grouped.hasNext) {
          Some(rdd.sparkContext.parallelize(grouped.next()))
        } else {
          None
        }
      }

      override def slideDuration: Duration = {
        slideDurationMilli.map(duration => new Duration(duration)).
          getOrElse(super.slideDuration)
      }
    }
  }
}

This aims to stream chunkSize elements every slideDurationMilli
milliseconds (defaulting to the batch duration of the streaming context).
It's still not perfect (for example, the timing of the stream is not
precise), but given that this will only be used for testing purposes, I am
not looking for ways to optimize it further.
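
To pick values for chunkSize and slideDurationMilli, the nominal rate can
be worked out with simple arithmetic. A minimal sketch in plain Scala
follows; StreamRateSketch and its methods are hypothetical names, and the
figures are nominal only, since the actual timing is not precise.

```scala
object StreamRateSketch {
  // Nominal elements per second when chunkSize elements are emitted
  // once per slide duration.
  def elementsPerSecond(chunkSize: Int, slideDurationMilli: Long): Double =
    chunkSize * 1000.0 / slideDurationMilli

  // Number of batches needed to emit totalElements (the last may be partial).
  def batchCount(totalElements: Long, chunkSize: Int): Long =
    (totalElements + chunkSize - 1) / chunkSize

  // Nominal time in milliseconds to drain the whole data set.
  def drainTimeMilli(totalElements: Long, chunkSize: Int,
      slideDurationMilli: Long): Long =
    batchCount(totalElements, chunkSize) * slideDurationMilli
}
```

For example, 1050 elements with chunkSize = 100 and slideDurationMilli =
500 give a nominal 200 elements per second, 11 batches, and 5500 ms to
drain.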

Thanks,
Aniket



On 2 August 2014 04:07, Mayur Rustagi <mayur.rust...@gmail.com> wrote:

> Nice question :)
> Ideally you should use the queueStream interface to push RDDs into a
> queue & then Spark Streaming can handle the rest.
> Though why are you looking to convert an RDD to a DStream? Another
> workaround folks use is to source the DStream from folders & move files
> that they need reprocessed back into the folder. It's a hack but much
> less headache.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
> On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> Hi everyone
>>
>> I haven't been receiving replies to my queries in the distribution list.
>> Not pissed but I am actually curious to know if my messages are actually
>> going through or not. Can someone please confirm that my msgs are getting
>> delivered via this distribution list?
>>
>> Thanks,
>> Aniket
>>
>>
>> On 1 August 2014 13:55, Aniket Bhatnagar <aniket.bhatna...@gmail.com>
>> wrote:
>>
>>> Sometimes it is useful to convert an RDD into a DStream for testing
>>> purposes (generating DStreams from historical data, etc). Is there an easy
>>> way to do this?
>>>
>>> I could come up with the following inefficient way but am not sure if
>>> there is a better way to achieve this. Thoughts?
>>>
>>> class RDDExtension[T](rdd: RDD[T]) {
>>>
>>>   def chunked(chunkSize: Int): RDD[Seq[T]] = {
>>>     rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>>>   }
>>>
>>>   def skipFirst(): RDD[T] = {
>>>     rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>>>   }
>>>
>>>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>>> slideDurationMilli: Option[Long] = None): DStream[T] = {
>>>     new InputDStream[T](streamingContext) {
>>>
>>>       @volatile private var currentRDD: RDD[Seq[T]] =
>>> rdd.chunked(chunkSize)
>>>
>>>       override def start(): Unit = {}
>>>
>>>       override def stop(): Unit = {}
>>>
>>>       override def compute(validTime: Time): Option[RDD[T]] = {
>>>         val chunk = currentRDD.take(1)
>>>         currentRDD = currentRDD.skipFirst()
>>>         Some(rdd.sparkContext.parallelize(chunk))
>>>       }
>>>
>>>       override def slideDuration = {
>>>         slideDurationMilli.map(duration => new Duration(duration)).
>>>           getOrElse(super.slideDuration)
>>>       }
>>>     }
>>>
>>> }
>>>
>>
>>
>
