Hi Jianshi,

For simulation purposes, I think you can try ConstantInputDStream and 
QueueInputDStream to convert one RDD or a series of RDDs into a DStream: the 
first one outputs the same RDD in every batch duration, and the second one 
outputs one RDD from a queue in each batch duration. You can take a look at them.
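
A rough sketch of how the two can be used (ssc.queueStream is the public way to 
get a QueueInputDStream; sc is an assumed existing SparkContext, and the data 
and batch interval are just illustrative):

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(sc, Seconds(1))
val rdd: RDD[Int] = sc.parallelize(1 to 100)

// ConstantInputDStream: emits the same RDD in every batch interval.
val constantStream = new ConstantInputDStream(ssc, rdd)

// queueStream: emits one queued RDD per batch interval.
val queue = mutable.Queue(rdd, rdd.map(_ * 2))
val queuedStream = ssc.queueStream(queue)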

For your case, I think TD's comment is quite meaningful: it's not trivial to do 
this, it often requires a job to scan all the records, and it's also not what 
Spark Streaming was designed for. I suspect it's hard to achieve exactly what 
you want.


Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 1:42 PM
To: Tathagata Das
Cc: Aniket Bhatnagar; user@spark.apache.org
Subject: Re: RDD to DStream

I have a similar requirement, but instead of grouping by chunkSize, I would 
have the timestamp be part of the data. So the function I want has the 
following signature:

  // RDD of (timestamp, value)
  def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)
                     (implicit ssc: StreamingContext): DStream[T]

And the DStream should respect the timestamp part. This is important for 
simulation, right?

Do you have any good solution for this?
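
The rough shape I have in mind is something like the following untested sketch 
(the ClassTag bound is needed by InputDStream, and it rescans the RDD on every 
batch to pick out the records whose timestamps fall into that batch's window, 
so it has exactly the scan cost TD mentions below):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long)
                             (implicit ssc: StreamingContext): DStream[T] = {
  new InputDStream[T](ssc) {
    // Maps the data's own timestamps onto the wall-clock batch times.
    private var offset: Option[Long] = None

    override def start(): Unit = {}
    override def stop(): Unit = {}
    override def slideDuration: Duration = new Duration(timeWindow)

    override def compute(validTime: Time): Option[RDD[T]] = {
      val off = offset.getOrElse {
        // One extra job up front to find the earliest timestamp.
        val o = validTime.milliseconds - data.map(_._1).min()
        offset = Some(o)
        o
      }
      val windowStart = validTime.milliseconds - off
      val windowEnd = windowStart + timeWindow
      // Full scan of the RDD for every batch.
      Some(data.filter { case (ts, _) => ts >= windowStart && ts < windowEnd }
               .map(_._2))
    }
  }
}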

Jianshi


On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das 
<tathagata.das1...@gmail.com> wrote:
Hey Aniket,

Great thoughts! I understand the use case. But as you have realized yourself, 
it is not trivial to cleanly stream an RDD as a DStream. Since RDD operations 
are defined to be scan-based, it is not efficient to define an RDD based on 
slices of data within a partition of another RDD using pure RDD transformations. 
What you have done is a decent, and probably the only feasible, solution, with 
its limitations.

Also, the requirements for converting a batch of data into a stream of data can 
be pretty diverse: what rate, how many events per batch, how many batches, and 
is it efficient? Hence, it is not trivial to define a good, clean public API for 
that. If anyone has any thoughts or ideas on this, you are more than welcome to 
share them.

TD

On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar 
<aniket.bhatna...@gmail.com> wrote:
The use case for converting an RDD into a DStream is that I want to simulate a 
stream from already persisted data in order to test analytics. It is trivial to 
create an RDD from any persisted data, but not so much to create a DStream; 
hence my idea to create a DStream from an RDD. For example, let's say you are 
trying to implement analytics on time-series data using the Lambda architecture. 
This means you would have to implement the same analytics on streaming data (in 
streaming mode) as well as on persisted data (in batch mode). The workflow for 
implementing the analytics would be to first implement it in batch mode using 
RDD operations and then simulate a stream to test the analytics in streaming 
mode. The simulated stream should produce the elements at a specified rate. So 
the solution may be to read the data into an RDD, split (chunk) it into multiple 
RDDs, with each RDD holding the number of elements that need to be streamed per 
time unit, and then finally stream each RDD using the compute function.

The problem with using QueueInputDStream is that it streams data as per the 
batch duration specified in the streaming context, and one cannot specify a 
custom slide duration. Moreover, the class QueueInputDStream is private to the 
streaming package, so I can't really use it or extend it from an external 
package. Also, I could not find a good way to split an RDD into equal-sized 
smaller RDDs that could be fed into an extended version of QueueInputDStream.
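
One way such a split could be done, sketched purely for illustration (the names 
are made up, and each returned piece still re-filters the indexed parent, so it 
does not really solve the efficiency problem TD describes):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Split an RDD into pieces of roughly chunkSize elements each by keying every
// element with a global index. Only reasonable for small test datasets.
def splitBySize[T: ClassTag](rdd: RDD[T], chunkSize: Int): Seq[RDD[T]] = {
  val indexed = rdd.zipWithIndex().cache()
  val total = indexed.count()
  val numChunks = ((total + chunkSize - 1) / chunkSize).toInt
  (0 until numChunks).map { i =>
    indexed.filter { case (_, idx) => idx / chunkSize == i }.map(_._1)
  }
}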

Finally, here is what I came up with:

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

class RDDExtension[T: ClassTag](rdd: RDD[T]) {

  def toStream(streamingContext: StreamingContext, chunkSize: Int,
               slideDurationMilli: Option[Long] = None): DStream[T] = {
    new InputDStream[T](streamingContext) {

      // WARNING: toLocalIterator pulls partitions to the driver one at a time;
      // each partition must fit in the driver's memory.
      private val iterator = rdd.toLocalIterator
      private val grouped = iterator.grouped(chunkSize)

      override def start(): Unit = {}

      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        if (grouped.hasNext) {
          Some(rdd.sparkContext.parallelize(grouped.next()))
        } else {
          None
        }
      }

      override def slideDuration = {
        slideDurationMilli.map(duration => new Duration(duration))
          .getOrElse(super.slideDuration)
      }
    }
  }
}

This aims to stream chunkSize elements every slideDurationMilli milliseconds 
(defaulting to the batch duration of the streaming context). It's still not 
perfect (for example, the timing is not precise), but given that this will only 
be used for testing purposes, I'm not looking for ways to optimize it further.
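
For reference, usage would look roughly like this (this assumes an implicit 
conversion from RDD to RDDExtension and an existing SparkContext named sc; both 
are assumptions, not part of the code above):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

implicit def toRDDExtension[T: ClassTag](rdd: RDD[T]): RDDExtension[T] =
  new RDDExtension(rdd)

val ssc = new StreamingContext(sc, Seconds(1))
// Stream 100 elements every 500 ms from a pre-built RDD.
val simulated = sc.parallelize(1 to 10000)
  .toStream(ssc, chunkSize = 100, slideDurationMilli = Some(500L))
simulated.foreachRDD(batch => println(s"batch size: ${batch.count()}"))
ssc.start()
ssc.awaitTermination()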

Thanks,
Aniket


On 2 August 2014 04:07, Mayur Rustagi 
<mayur.rust...@gmail.com> wrote:
Nice question :)
Ideally you should use the queueStream interface to push RDDs into a queue, and 
then Spark Streaming can handle the rest.
That said, rather than converting an RDD to a DStream, another workaround folks 
use is to source the DStream from a folder and move the files they need 
reprocessed back into that folder. It's a hack, but much less of a headache.
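
A rough sketch of that queue-based approach (the function name and the use of 
randomSplit are just illustrative, and the pieces are only approximately equal 
in size):

import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

// Pre-split the RDD into one piece per batch and let Spark Streaming dequeue
// one piece per batch interval.
def simulateStream[T: ClassTag](ssc: StreamingContext, rdd: RDD[T],
                                numBatches: Int): InputDStream[T] = {
  val queue = new mutable.Queue[RDD[T]]()
  rdd.randomSplit(Array.fill(numBatches)(1.0)).foreach(piece => queue.enqueue(piece))
  ssc.queueStream(queue, oneAtATime = true)
}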

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi (https://twitter.com/mayur_rustagi)


On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar 
<aniket.bhatna...@gmail.com> wrote:
Hi everyone

I haven't been receiving replies to my queries on this distribution list. I'm 
not annoyed, but I am curious to know whether my messages are actually going 
through. Can someone please confirm that my messages are getting delivered via 
this distribution list?

Thanks,
Aniket

On 1 August 2014 13:55, Aniket Bhatnagar 
<aniket.bhatna...@gmail.com> wrote:
Sometimes it is useful to convert an RDD into a DStream for testing purposes 
(generating DStreams from historical data, etc.). Is there an easy way to do 
this?

I could come up with the following inefficient way, but I'm not sure if there 
is a better way to achieve this. Thoughts?

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

class RDDExtension[T: ClassTag](rdd: RDD[T]) {

  // Group the elements of each partition into chunks of up to chunkSize elements.
  def chunked(chunkSize: Int): RDD[Seq[T]] = {
    rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
  }

  // Drop the first element of the RDD.
  def skipFirst(): RDD[T] = {
    rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
  }

  def toStream(streamingContext: StreamingContext, chunkSize: Int,
               slideDurationMilli: Option[Long] = None): DStream[T] = {
    new InputDStream[T](streamingContext) {

      @volatile private var currentRDD: RDD[Seq[T]] = chunked(chunkSize)

      override def start(): Unit = {}

      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        // take(1) launches a job on every batch, which makes this inefficient.
        val chunk = currentRDD.take(1)
        currentRDD = new RDDExtension(currentRDD).skipFirst()
        chunk.headOption.map(seq => rdd.sparkContext.parallelize(seq))
      }

      override def slideDuration = {
        slideDurationMilli.map(duration => new Duration(duration))
          .getOrElse(super.slideDuration)
      }
    }
  }
}







--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
