I have a similar requirement, but instead of grouping by chunkSize, I would make the timestamp part of the data, so the function I want has the following signature:

// RDD of (timestamp, value)
def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: StreamingContext): DStream[T]

The DStream should respect the timestamp part of the data, which is important for simulation.
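Roughly, here is what I have in mind: an untested sketch that buckets records by ts / timeWindow and replays one bucket per batch. It assumes that timestamps and timeWindow are in milliseconds, that timeWindow is a multiple of the batch interval, and that each bucket fits in driver memory; empty windows are simply skipped:

import scala.reflect.ClassTag

import org.apache.spark.SparkContext._ // pair-RDD implicits (needed on older Spark)
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long)
                             (implicit ssc: StreamingContext): DStream[T] = {
  // Bucket each record by its time window and replay the buckets in timestamp order.
  val buckets = data
    .map { case (ts, value) => (ts / timeWindow, value) }
    .groupByKey()
    .sortByKey()
    .toLocalIterator // WARNING: each bucket must fit in driver memory

  new InputDStream[T](ssc) {

    override def start(): Unit = {}

    override def stop(): Unit = {}

    override def compute(validTime: Time): Option[RDD[T]] = {
      if (buckets.hasNext) {
        Some(data.sparkContext.parallelize(buckets.next()._2.toSeq))
      } else {
        None
      }
    }

    // One window's worth of data is emitted every timeWindow milliseconds.
    override def slideDuration: Duration = new Duration(timeWindow)
  }
}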
Do you have any good solution for this?

Jianshi

On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Hey Aniket,

Great thoughts! I understand the use case. But as you have realized yourself, it is not trivial to cleanly stream an RDD as a DStream. Since RDD operations are defined to be scan-based, it is not efficient to define an RDD based on slices of data within a partition of another RDD using pure RDD transformations. What you have done is a decent, and probably the only feasible, solution, with its limitations.

Also, the requirements for converting a batch of data into a stream of data can be pretty diverse: what rate, how many events per batch, how many batches, is it efficient? Hence, it is not trivial to define a good, clean public API for that. If anyone has any thoughts or ideas on this, you are more than welcome to share them.

TD

On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:

The use case for converting an RDD into a DStream is that I want to simulate a stream from already persisted data in order to test analytics. It is trivial to create an RDD from any persisted data, but not so for a DStream; hence my idea to create a DStream from an RDD. For example, let's say you are trying to implement analytics on time series data using the Lambda architecture. This means you would have to implement the same analytics on streaming data (in streaming mode) as well as on persisted data (in batch mode). The workflow for implementing the analytics would be to first implement them in batch mode using RDD operations and then simulate a stream to test them in streaming mode. The simulated stream should produce the elements at a specified rate. So the solution may be to read the data into an RDD, split (chunk) it into multiple RDDs, each holding the number of elements that need to be streamed per time unit, and then finally stream each RDD using the compute function.

The problem with using QueueInputDStream is that it streams data at the batch duration specified in the streaming context, and one cannot specify a custom slide duration. Moreover, the class QueueInputDStream is private to the streaming package, so I can't really use or extend it from an external package. Also, I could not find a good way to split an RDD into equal-sized smaller RDDs that could be fed into an extended version of QueueInputDStream.

Finally, here is what I came up with:

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

class RDDExtension[T: ClassTag](rdd: RDD[T]) {

  def toStream(streamingContext: StreamingContext, chunkSize: Int,
      slideDurationMilli: Option[Long] = None): DStream[T] = {
    new InputDStream[T](streamingContext) {

      // WARNING: each partition must fit in the RAM of the local machine.
      private val iterator = rdd.toLocalIterator
      private val grouped = iterator.grouped(chunkSize)

      override def start(): Unit = {}

      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        if (grouped.hasNext) {
          Some(rdd.sparkContext.parallelize(grouped.next()))
        } else {
          None
        }
      }

      override def slideDuration = {
        slideDurationMilli.map(duration => new Duration(duration))
          .getOrElse(super.slideDuration)
      }
    }
  }
}

This aims to stream chunkSize elements every slideDurationMilli milliseconds (defaulting to the batch duration of the streaming context). It's still not perfect (for example, the timing is not precise), but given that this will only be used for testing purposes, I am not looking for ways to optimize it further.

Thanks,
Aniket

On 2 August 2014 04:07, Mayur Rustagi <mayur.rust...@gmail.com> wrote:

Nice question :)
Ideally you should use a queueStream interface to push RDDs into a queue; Spark Streaming can then handle the rest.
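For example, something along these lines; a rough sketch in which the replayAsStream helper is made up, and randomSplit is used only for illustration, since it preserves neither element order nor exact chunk sizes:

import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

def replayAsStream[T: ClassTag](rdd: RDD[T], numBatches: Int)
                               (ssc: StreamingContext): DStream[T] = {
  val queue = new mutable.Queue[RDD[T]]()
  // Break the RDD into roughly equal pieces, one per batch interval.
  queue ++= rdd.randomSplit(Array.fill(numBatches)(1.0))
  // oneAtATime = true dequeues exactly one RDD per batch interval.
  ssc.queueStream(queue, oneAtATime = true)
}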
Though why are you looking to convert an RDD to a DStream? Another workaround folks use is to source the DStream from a folder and move the files they need reprocessed back into the folder. It's a hack, but much less of a headache.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>

On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:

Hi everyone,

I haven't been receiving replies to my queries on the distribution list. Not pissed, but I am actually curious to know whether my messages are actually going through. Can someone please confirm that my messages are getting delivered via this distribution list?

Thanks,
Aniket

On 1 August 2014 13:55, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:

Sometimes it is useful to convert an RDD into a DStream for testing purposes (generating DStreams from historical data, etc.). Is there an easy way to do this?

I could come up with the following inefficient way, but I am not sure whether there is a better way to achieve this. Thoughts?

// Same imports as in the snippet earlier in the thread. Note that an
// implicit class must be defined inside an object or trait.
implicit class RDDExtension[T: ClassTag](rdd: RDD[T]) {

  def chunked(chunkSize: Int): RDD[Seq[T]] = {
    rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
  }

  def skipFirst(): RDD[T] = {
    rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
  }

  def toStream(streamingContext: StreamingContext, chunkSize: Int,
      slideDurationMilli: Option[Long] = None): DStream[T] = {
    new InputDStream[T](streamingContext) {

      @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize)

      override def start(): Unit = {}

      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        val chunk = currentRDD.take(1)
        currentRDD = currentRDD.skipFirst()
        // Flatten the Seq[T] chunk back into individual elements.
        Some(rdd.sparkContext.parallelize(chunk.flatten))
      }

      override def slideDuration = {
        slideDurationMilli.map(duration => new Duration(duration))
          .getOrElse(super.slideDuration)
      }
    }
  }
}

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/