Hi Saisai,

I understand it's non-trivial, but the requirement of simulating offline
data as stream is also fair. :)

I just wrote a prototype, however, I need to do a collect and a bunch of
parallelize...

  // RDD of (timestamp, value)
  def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
ssc: StreamingContext): DStream[T] = {
    val sc = ssc.sparkContext
    val d = data.groupBy(_._1 / timeWindow)
                .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
                .collect()
                .map(e => (e._1, sc.parallelize(e._2)))
                .sortBy(_._1)
    val queue = new mutable.SynchronizedQueue[RDD[T]]

    queue ++= d.map(_._2)

    ssc.queueStream(queue)
  }

Any way to get a list of RDDs sorted by group key just after groupBy?

Jianshi

On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

>  Hi Jianshi,
>
>
>
> For simulation purpose, I think you can try ConstantInputDStream and
> QueueInputDStream to convert one RDD or series of RDD into DStream, the
> first one output the same RDD in each batch duration, and the second one
> just output a RDD in a queue in each batch duration. You can take a look at
> it.
>
>
>
> For your case, I think TD’s comment are quite meaningful, it’s not trivial
> to do so, often requires a job to scan all the records, it’s also not the
> design purpose of Spark Streaming, I guess it’s hard to achieve what you
> want.
>
>
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 1:42 PM
> *To:* Tathagata Das
> *Cc:* Aniket Bhatnagar; user@spark.apache.org
> *Subject:* Re: RDD to DStream
>
>
>
> I have a similar requirement. But instead of grouping it by chunkSize, I
> would have the timeStamp be part of the data. So the function I want has
> the following signature:
>
>
>
>   // RDD of (timestamp, value)
>
>   def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit
> ssc: StreamingContext): DStream[T]
>
>
>
> And DStream should respect the timestamp part. This is important for
> simulation, right?
>
>
>
> Do you have any good solution for this?
>
>
>
> Jianshi
>
>
>
>
>
> On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>  Hey Aniket,
>
>
>
> Great thoughts! I understand the usecase. But as you have realized
> yourself it is not trivial to cleanly stream a RDD as a DStream. Since RDD
> operations are defined to be scan based, it is not efficient to define RDD
> based on slices of data within a partition of another RDD, using pure RDD
> transformations. What you have done is a decent, and probably the only
> feasible solution, with its limitations.
>
>
>
> Also the requirements of converting a batch of data to a stream of data
> can be pretty diverse. What rate, what # of events per batch, how many
> batches, is it efficient? Hence, it is not trivial to define a good, clean
> public API for that. If any one has any thoughts, ideas, etc on this, you
> are more than welcome to share them.
>
>
>
> TD
>
>
>
> On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>  The use case for converting RDD into DStream is that I want to simulate
> a stream from an already persisted data for testing analytics. It is
> trivial to create a RDD from any persisted data but not so much for
> DStream. Therefore, my idea to create DStream from RDD. For example, lets
> say you are trying to implement analytics on time series data using Lambda
> architecture. This means you would have to implement the same analytics on
> streaming data (in streaming mode) as well as persisted data (in batch
> mode). The workflow for implementing the anlytics would be to first
> implement it in batch mode using RDD operations and then simulate stream to
> test the analytics in stream mode. The simulated stream should produce the
> elements at a specified rate. So the solution maybe to read data in a RDD,
> split (chunk) it into multiple RDDs with each RDD having the size of
> elements that need to be streamed per time unit and then finally stream
> each RDD using the compute function.
>
>
>
> The problem with using QueueInputDStream is that it will stream data as
> per the batch duration specified in the streaming context and one cannot
> specify a custom slide duration. Moreover, the class QueueInputDStream is
> private to streaming package, so I can't really use it/extend it from an
> external package. Also, I could not find a good solution split a RDD into
> equal sized smaller RDDs that can be fed into an extended version of
> QueueInputDStream.
>
>
>
> Finally, here is what I came up with:
>
>
>
> class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>
>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
> slideDurationMilli: Option[Long] = None): DStream[T] = {
>
>     new InputDStream[T](streamingContext) {
>
>
>
>       private val iterator = rdd.toLocalIterator // WARNING: each
> partition much fit in RAM of local machine.
>
>       private val grouped = iterator.grouped(chunkSize)
>
>
>
>       override def start(): Unit = {}
>
>
>
>       override def stop(): Unit = {}
>
>
>
>       override def compute(validTime: Time): Option[RDD[T]] = {
>
>         if (grouped.hasNext) {
>
>           Some(rdd.sparkContext.parallelize(grouped.next()))
>
>         } else {
>
>           None
>
>         }
>
>       }
>
>
>
>       override def slideDuration = {
>
>         slideDurationMilli.map(duration => new Duration(duration)).
>
>           getOrElse(super.slideDuration)
>
>       }
>
>     }
>
> }
>
>
>
> This aims to stream chunkSize elements every slideDurationMilli
> milliseconds (defaults to batch size in streaming context). It's still not
> perfect (for example, the streaming is not precise) but given that this
> will only be used for testing purposes, I don't look for ways to further
> optimize it.
>
>
>
> Thanks,
>
> Aniket
>
>
>
>
>
> On 2 August 2014 04:07, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>
>  Nice question :)
>
> Ideally you should use a queuestream interface to push RDD into a queue &
> then spark streaming can handle the rest.
>
> Though why are you looking to convert RDD to DStream, another workaround
> folks use is to source DStream from folders & move files that they need
> reprocessed back into the folder, its a hack but much less headache .
>
>
>   Mayur Rustagi
> Ph: +1 (760) 203 3257
>
> http://www.sigmoidanalytics.com
>
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
>
>
> On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>  Hi everyone
>
>
>
> I haven't been receiving replies to my queries in the distribution list.
> Not pissed but I am actually curious to know if my messages are actually
> going through or not. Can someone please confirm that my msgs are getting
> delivered via this distribution list?
>
>
>
> Thanks,
>
> Aniket
>
>
>
> On 1 August 2014 13:55, Aniket Bhatnagar <aniket.bhatna...@gmail.com>
> wrote:
>
>  Sometimes it is useful to convert a RDD into a DStream for testing
> purposes (generating DStreams from historical data, etc). Is there an easy
> way to do this?
>
>
>
> I could come up with the following inefficient way but no sure if there is
> a better way to achieve this. Thoughts?
>
>
>
> class RDDExtension[T](rdd: RDD[T]) {
>
>
>
>   def chunked(chunkSize: Int): RDD[Seq[T]] = {
>
>     rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>
>   }
>
>
>
>   def skipFirst(): RDD[T] = {
>
>     rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>
>   }
>
>
>
>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
> slideDurationMilli: Option[Long] = None): DStream[T] = {
>
>     new InputDStream[T](streamingContext) {
>
>
>
>       @volatile private var currentRDD: RDD[Seq[T]] =
> rdd.chunked(chunkSize)
>
>
>
>       override def start(): Unit = {}
>
>
>
>       override def stop(): Unit = {}
>
>
>
>       override def compute(validTime: Time): Option[RDD[T]] = {
>
>         val chunk = currentRDD.take(1)
>
>         currentRDD = currentRDD.skipFirst()
>
>         Some(rdd.sparkContext.parallelize(chunk))
>
>       }
>
>
>
>       override def slideDuration = {
>
>         slideDurationMilli.map(duration => new Duration(duration)).
>
>           getOrElse(super.slideDuration)
>
>       }
>
>     }
>
>
>
> }
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Reply via email to