Hi everyone

I haven't been receiving replies to my queries in the distribution list.
Not pissed but I am actually curious to know if my messages are actually
going through or not. Can someone please confirm that my msgs are getting
delivered via this distribution list?

Thanks,
Aniket


On 1 August 2014 13:55, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:

> Sometimes it is useful to convert a RDD into a DStream for testing
> purposes (generating DStreams from historical data, etc). Is there an easy
> way to do this?
>
> I could come up with the following inefficient way but no sure if there is
> a better way to achieve this. Thoughts?
>
> class RDDExtension[T](rdd: RDD[T]) {
>
>   def chunked(chunkSize: Int): RDD[Seq[T]] = {
>     rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>   }
>
>   def skipFirst(): RDD[T] = {
>     rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>   }
>
>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
> slideDurationMilli: Option[Long] = None): DStream[T] = {
>     new InputDStream[T](streamingContext) {
>
>       @volatile private var currentRDD: RDD[Seq[T]] =
> rdd.chunked(chunkSize)
>
>       override def start(): Unit = {}
>
>       override def stop(): Unit = {}
>
>       override def compute(validTime: Time): Option[RDD[T]] = {
>         val chunk = currentRDD.take(1)
>         currentRDD = currentRDD.skipFirst()
>         Some(rdd.sparkContext.parallelize(chunk))
>       }
>
>       override def slideDuration = {
>         slideDurationMilli.map(duration => new Duration(duration)).
>           getOrElse(super.slideDuration)
>       }
>     }
>
> }
>

Reply via email to