Nice question :) Ideally you should use the queueStream interface: push your RDDs into a queue and Spark Streaming will handle the rest. That said, why are you looking to convert an RDD to a DStream? Another workaround folks use is to source the DStream from a folder and move any files that need reprocessing back into that folder. It's a hack, but much less of a headache.
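Here is a minimal sketch of the queue approach, assuming a local master and a recent Spark Streaming release where `StreamingContext.queueStream` takes a `scala.collection.mutable.Queue` of RDDs. The names `QueueStreamSketch`, `chunk` and `replay` are made up for illustration and are not part of Spark. The idea is to cut the historical RDD into consecutive chunks up front and let the stream take one chunk per batch interval. As far as I know, queueStream does not support checkpointing, so treat this as a testing aid only.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Hypothetical helper object; none of these names come from Spark itself.
object QueueStreamSketch {

  // Cut an RDD into consecutive chunks of at most `chunkSize` elements.
  // Each chunk is a lazy filter over the same cached, indexed parent RDD.
  def chunk[T: ClassTag](rdd: RDD[T], chunkSize: Long): Seq[RDD[T]] = {
    val indexed = rdd.zipWithIndex().cache()
    val numChunks = ((indexed.count() + chunkSize - 1) / chunkSize).toInt
    (0 until numChunks).map { k =>
      indexed.filter { case (_, i) => i / chunkSize == k.toLong }.map(_._1)
    }
  }

  // Replay `data` as a DStream, one chunk per batch, and return the
  // non-empty batches in the order the driver observed them.
  def replay(data: Seq[Int], chunkSize: Long, batchMillis: Long): Seq[Seq[Int]] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamSketch")
    val ssc = new StreamingContext(conf, Milliseconds(batchMillis))
    val observed = mutable.ArrayBuffer.empty[Seq[Int]]
    try {
      val chunks = chunk(ssc.sparkContext.parallelize(data, 2), chunkSize)

      // The queue is the "source": every RDD pushed here becomes a batch.
      val queue = mutable.Queue[RDD[Int]]()
      queue ++= chunks

      // oneAtATime = true dequeues exactly one RDD per batch interval.
      val stream = ssc.queueStream(queue, oneAtATime = true)
      stream.foreachRDD { rdd =>
        val batch = rdd.collect().toSeq
        if (batch.nonEmpty) observed.synchronized { observed += batch }
      }

      ssc.start()
      // Leave a few extra intervals of slack for job startup.
      ssc.awaitTerminationOrTimeout(batchMillis * (chunks.size + 5))
    } finally {
      // A graceful stop lets already-generated batches finish.
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
    observed.synchronized { observed.toList }
  }

  def main(args: Array[String]): Unit =
    replay(1 to 10, chunkSize = 4L, batchMillis = 500L)
      .foreach(batch => println(batch.mkString(", ")))
}
```

Every RDD in the queue has to come from the same SparkContext as the StreamingContext. You can also keep pushing RDDs into the queue after `start()` if you want to feed the stream from a test driver.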
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>

On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <[email protected]> wrote:

> Hi everyone
>
> I haven't been receiving replies to my queries in the distribution list.
> Not pissed but I am actually curious to know if my messages are actually
> going through or not. Can someone please confirm that my msgs are getting
> delivered via this distribution list?
>
> Thanks,
> Aniket
>
>
> On 1 August 2014 13:55, Aniket Bhatnagar <[email protected]> wrote:
>
>> Sometimes it is useful to convert a RDD into a DStream for testing
>> purposes (generating DStreams from historical data, etc). Is there an easy
>> way to do this?
>>
>> I could come up with the following inefficient way but no sure if there
>> is a better way to achieve this. Thoughts?
>>
>> class RDDExtension[T](rdd: RDD[T]) {
>>
>>   def chunked(chunkSize: Int): RDD[Seq[T]] = {
>>     rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>>   }
>>
>>   def skipFirst(): RDD[T] = {
>>     rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>>   }
>>
>>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>>       slideDurationMilli: Option[Long] = None): DStream[T] = {
>>     new InputDStream[T](streamingContext) {
>>
>>       @volatile private var currentRDD: RDD[Seq[T]] =
>>         rdd.chunked(chunkSize)
>>
>>       override def start(): Unit = {}
>>
>>       override def stop(): Unit = {}
>>
>>       override def compute(validTime: Time): Option[RDD[T]] = {
>>         val chunk = currentRDD.take(1)
>>         currentRDD = currentRDD.skipFirst()
>>         Some(rdd.sparkContext.parallelize(chunk))
>>       }
>>
>>       override def slideDuration = {
>>         slideDurationMilli.map(duration => new Duration(duration)).
>>           getOrElse(super.slideDuration)
>>       }
>>     }
>>
>>   }
>>
>
>
