I have a similar requirement, but instead of grouping by chunkSize I would
like the timestamp to be part of the data. So the function I want has the
following signature:

  // RDD of (timestamp, value)
  def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)
                     (implicit ssc: StreamingContext): DStream[T]

The resulting DStream should respect the timestamps. This is important for
simulation, right?
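
The timestamp-based grouping can be sketched on plain Scala collections,
without Spark, just to make the intended semantics concrete. This is only an
illustration under stated assumptions: WindowBuckets and bucketByWindow are
hypothetical names (not Spark APIs), timestamps and timeWindow are assumed to
be in the same unit (e.g. milliseconds), and windows are assumed to be aligned
to multiples of timeWindow.

```scala
// Hypothetical helper, not part of Spark: groups (timestamp, value) pairs
// into consecutive windows of `timeWindow` time units.
object WindowBuckets {
  def bucketByWindow[T](data: Seq[(Long, T)], timeWindow: Long): Seq[(Long, Seq[T])] = {
    require(timeWindow > 0, "timeWindow must be positive")
    data
      // Window index; floorDiv keeps negative timestamps in the right window.
      .groupBy { case (ts, _) => Math.floorDiv(ts, timeWindow) }
      .toSeq
      .sortBy { case (window, _) => window }
      .map { case (window, pairs) =>
        // Key each batch by its window start; order values by timestamp.
        (window * timeWindow, pairs.sortBy(_._1).map(_._2))
      }
  }
}
```

Each resulting group would correspond to one batch of the simulated stream.
Windows that contain no events are simply absent from the result, so a real
implementation would have to decide whether to emit empty batches for them.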

Do you have any good solution for this?

Jianshi


On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Hey Aniket,
>
> Great thoughts! I understand the use case. But as you have realized
> yourself, it is not trivial to cleanly stream an RDD as a DStream. Since RDD
> operations are defined to be scan-based, it is not efficient to define an RDD
> based on slices of data within a partition of another RDD using pure RDD
> transformations. What you have done is a decent solution, and probably the
> only feasible one, with its limitations.
>
> Also, the requirements for converting a batch of data into a stream of data
> can be pretty diverse. What rate? How many events per batch? How many
> batches? Is it efficient? Hence, it is not trivial to define a good, clean
> public API for that. If anyone has any thoughts, ideas, etc. on this, you
> are more than welcome to share them.
>
> TD
>
>
> On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> The use case for converting an RDD into a DStream is that I want to simulate
>> a stream from already persisted data for testing analytics. It is trivial
>> to create an RDD from any persisted data, but not so for a DStream; hence my
>> idea of creating a DStream from an RDD. For example, let's say you are
>> trying to implement analytics on time series data using the Lambda
>> architecture. This means you would have to implement the same analytics on
>> streaming data (in streaming mode) as well as on persisted data (in batch
>> mode). The workflow for implementing the analytics would be to first
>> implement them in batch mode using RDD operations and then simulate a stream
>> to test them in streaming mode. The simulated stream should produce
>> elements at a specified rate. So the solution may be to read the data into
>> an RDD, split (chunk) it into multiple RDDs, each holding the number of
>> elements that need to be streamed per time unit, and then finally stream
>> each RDD using the compute function.
>>
>> The problem with using QueueInputDStream is that it streams data at the
>> batch duration specified in the streaming context, and one cannot specify a
>> custom slide duration. Moreover, the class QueueInputDStream is private to
>> the streaming package, so I can't really use or extend it from an external
>> package. Also, I could not find a good way to split an RDD into equal-sized
>> smaller RDDs that can be fed into an extended version of QueueInputDStream.
>>
>> Finally, here is what I came up with:
>>
>> import scala.reflect.ClassTag
>>
>> import org.apache.spark.rdd.RDD
>> import org.apache.spark.streaming.{Duration, StreamingContext, Time}
>> import org.apache.spark.streaming.dstream.{DStream, InputDStream}
>>
>> class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>>       slideDurationMilli: Option[Long] = None): DStream[T] = {
>>     new InputDStream[T](streamingContext) {
>>
>>       // WARNING: toLocalIterator pulls one partition at a time to the
>>       // driver, so each partition must fit in the RAM of the local machine.
>>       private val iterator = rdd.toLocalIterator
>>       private val grouped = iterator.grouped(chunkSize)
>>
>>       override def start(): Unit = {}
>>
>>       override def stop(): Unit = {}
>>
>>       // Emit the next chunk as an RDD, or None once the data is exhausted.
>>       override def compute(validTime: Time): Option[RDD[T]] = {
>>         if (grouped.hasNext) {
>>           Some(rdd.sparkContext.parallelize(grouped.next()))
>>         } else {
>>           None
>>         }
>>       }
>>
>>       override def slideDuration: Duration = {
>>         slideDurationMilli.map(duration => new Duration(duration)).
>>           getOrElse(super.slideDuration)
>>       }
>>     }
>>   }
>> }
>>
>> This aims to stream chunkSize elements every slideDurationMilli
>> milliseconds (defaulting to the batch duration of the streaming context).
>> It's still not perfect (for example, the timing is not precise), but given
>> that it will only be used for testing purposes, I am not looking for ways to
>> optimize it further.
>>
>> Thanks,
>> Aniket
>>
>>
>>
>> On 2 August 2014 04:07, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>
>>> Nice question :)
>>> Ideally you should use the queueStream interface to push RDDs into a queue,
>>> and then Spark Streaming can handle the rest.
>>> But why are you looking to convert an RDD to a DStream? Another workaround
>>> folks use is to source the DStream from folders and move the files they need
>>> reprocessed back into the folder. It's a hack, but much less of a headache.
>>>
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>
>>>
>>>
>>> On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <
>>> aniket.bhatna...@gmail.com> wrote:
>>>
>>>> Hi everyone
>>>>
>>>> I haven't been receiving replies to my queries on the distribution
>>>> list. Not pissed, but I am curious to know whether my messages are
>>>> actually going through. Can someone please confirm that my messages are
>>>> getting delivered via this distribution list?
>>>>
>>>> Thanks,
>>>> Aniket
>>>>
>>>>
>>>> On 1 August 2014 13:55, Aniket Bhatnagar <aniket.bhatna...@gmail.com>
>>>> wrote:
>>>>
>>>>> Sometimes it is useful to convert an RDD into a DStream for testing
>>>>> purposes (generating DStreams from historical data, etc.). Is there an
>>>>> easy way to do this?
>>>>>
>>>>> I could come up with the following inefficient way, but I am not sure if
>>>>> there is a better way to achieve this. Thoughts?
>>>>>
>>>>> import scala.reflect.ClassTag
>>>>>
>>>>> import org.apache.spark.rdd.RDD
>>>>> import org.apache.spark.streaming.{Duration, StreamingContext, Time}
>>>>> import org.apache.spark.streaming.dstream.{DStream, InputDStream}
>>>>>
>>>>> class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>>>>>
>>>>>   // Split each partition into chunks of at most chunkSize elements.
>>>>>   def chunked(chunkSize: Int): RDD[Seq[T]] = {
>>>>>     rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>>>>>   }
>>>>>
>>>>>   // Drop the first element (by index).
>>>>>   def skipFirst(): RDD[T] = {
>>>>>     rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>>>>>   }
>>>>>
>>>>>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>>>>>       slideDurationMilli: Option[Long] = None): DStream[T] = {
>>>>>     new InputDStream[T](streamingContext) {
>>>>>
>>>>>       @volatile private var currentRDD: RDD[Seq[T]] = chunked(chunkSize)
>>>>>
>>>>>       override def start(): Unit = {}
>>>>>
>>>>>       override def stop(): Unit = {}
>>>>>
>>>>>       override def compute(validTime: Time): Option[RDD[T]] = {
>>>>>         // take(1) returns an Array holding at most one chunk.
>>>>>         val chunk = currentRDD.take(1).headOption
>>>>>         currentRDD = new RDDExtension(currentRDD).skipFirst()
>>>>>         chunk.map(elements => rdd.sparkContext.parallelize(elements))
>>>>>       }
>>>>>
>>>>>       override def slideDuration: Duration = {
>>>>>         slideDurationMilli.map(duration => new Duration(duration)).
>>>>>           getOrElse(super.slideDuration)
>>>>>       }
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>
>>>>
>>>
>>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
