Ok, back to Scala code, I'm wondering why I cannot do this:

data.groupBy(timestamp / window)
  .sortByKey()  // no sort method available here
  .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
  .collect() // returns Seq[RDD[(Timestamp, T)]]
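
A possible reading of the two comments above, offered as a rough sketch rather than a definitive answer: RDDs cannot be nested, because sc.parallelize needs the SparkContext, which lives only on the driver and is not available inside a closure that runs on the executors. One workaround, along the lines of the filter suggestion quoted below, is to collect only the distinct window indices and build one filtered RDD per window on the driver. The names WindowedReplay and windowedRDDs are made up for illustration, and the plain mutable.Queue assumes the queue is filled before the streaming context is started.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object, not part of Spark.
object WindowedReplay {

  // One RDD per time window, in ascending window order.
  def windowedRDDs[T: ClassTag](data: RDD[(Long, T)], window: Long): Seq[RDD[T]] = {
    // Key every record by its window index; cache because it is scanned once per window.
    val keyed = data.map { case (ts, v) => (ts / window, (ts, v)) }.cache()

    // Only the distinct window indices come back to the driver, not the records.
    val windows = keyed.map(_._1).distinct().collect().sorted

    // Each per-window RDD is defined on the driver as a filter over `keyed`,
    // then sorted by timestamp before the timestamp is dropped.
    windows.toSeq.map { w =>
      keyed.filter(_._1 == w).map(_._2).sortBy(_._1).map(_._2)
    }
  }

  def rddToDStream[T: ClassTag](data: RDD[(Long, T)], window: Long,
      ssc: StreamingContext): DStream[T] = {
    val queue = mutable.Queue[RDD[T]]()
    queue ++= windowedRDDs(data, window)
    ssc.queueStream(queue) // oneAtATime defaults to true: one queued RDD per batch
  }
}
```

This scans the cached data once per window, so it trades driver memory for repeated scans and is only reasonable when the number of windows is modest.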


Jianshi

On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang <jianshi.hu...@gmail.com>
wrote:

> You're absolutely right, it's not 'scalable' as I'm using collect().
>
> However, it's important to have the RDDs ordered by the timestamp of the
> time window (groupBy puts the data into the corresponding time window).
>
> It's fairly easy to do in Pig, but somehow I have no idea how to express
> it in RDD...
>
> Something like (in Pig, pseudo code :):
>
> g = GROUP data BY (timestamp / windowSize)  // group data into buckets in
> the same time window
> gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
> timestamp for each bucket
> stream = FOREACH gs GENERATE toRDD(g)
>
> No idea how to do the order by part in RDD.
>
> Jianshi
>
>
> On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai <saisai.s...@intel.com>
> wrote:
>
>> I think your solution may not scale as the data size increases, since you
>> have to collect all your data back to the driver node, so the driver's
>> memory usage will become a problem.
>>
>>
>>
>> Why not filter out each specific time range as an RDD? After filtering the
>> whole time range, you will get a series of RDDs divided by timestamp, which
>> you can then feed into the queue. It is still not an efficient way, but it is
>> not limited by driver memory.
>>
>>
>>
>> There may also be other solutions, such as using a shuffle to arrange the
>> data, but you cannot avoid scanning the whole data set. Basically, we need to
>> avoid fetching a large amount of data back to the driver.
>>
>>
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> *Sent:* Monday, October 27, 2014 2:39 PM
>> *To:* Shao, Saisai
>> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
>>
>> *Subject:* Re: RDD to DStream
>>
>>
>>
>> Hi Saisai,
>>
>>
>>
>> I understand it's non-trivial, but the requirement of simulating offline
>> data as stream is also fair. :)
>>
>>
>>
>> I just wrote a prototype, however, I need to do a collect and a bunch of
>> parallelize...
>>
>>
>>
>>   // RDD of (timestamp, value)
>>   def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
>>       ssc: StreamingContext): DStream[T] = {
>>     val sc = ssc.sparkContext
>>     val d = data.groupBy(_._1 / timeWindow)
>>                 .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
>>                 .collect()
>>                 .map(e => (e._1, sc.parallelize(e._2)))
>>                 .sortBy(_._1)
>>     val queue = new mutable.SynchronizedQueue[RDD[T]]
>>
>>     queue ++= d.map(_._2)
>>
>>     ssc.queueStream(queue)
>>   }
>>
>>
>>
>> Any way to get a list of RDDs sorted by group key just after groupBy?
>>
>>
>>
>> Jianshi
>>
>>
>>
>> On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai <saisai.s...@intel.com>
>> wrote:
>>
>>  Hi Jianshi,
>>
>>
>>
>> For simulation purposes, I think you can try ConstantInputDStream and
>> QueueInputDStream to convert one RDD or a series of RDDs into a DStream. The
>> first one outputs the same RDD in each batch duration, and the second one
>> outputs one RDD from a queue in each batch duration. You can take a look at
>> them.
>>
>>
>>
>> For your case, I think TD’s comments are quite meaningful: it is not
>> trivial to do, it often requires a job to scan all the records, and it is
>> not what Spark Streaming was designed for, so I guess it’s hard to achieve
>> what you want.
>>
>>
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> *Sent:* Monday, October 27, 2014 1:42 PM
>> *To:* Tathagata Das
>> *Cc:* Aniket Bhatnagar; user@spark.apache.org
>> *Subject:* Re: RDD to DStream
>>
>>
>>
>> I have a similar requirement. But instead of grouping it by chunkSize, I
>> would have the timeStamp be part of the data. So the function I want has
>> the following signature:
>>
>>
>>
>>   // RDD of (timestamp, value)
>>   def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)
>>       (implicit ssc: StreamingContext): DStream[T]
>>
>>
>>
>> And DStream should respect the timestamp part. This is important for
>> simulation, right?
>>
>>
>>
>> Do you have any good solution for this?
>>
>>
>>
>> Jianshi
>>
>>
>>
>>
>>
>> On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>  Hey Aniket,
>>
>>
>>
>> Great thoughts! I understand the use case. But as you have realized
>> yourself, it is not trivial to cleanly stream an RDD as a DStream. Since RDD
>> operations are defined to be scan based, it is not efficient to define an RDD
>> based on slices of data within a partition of another RDD, using pure RDD
>> transformations. What you have done is a decent, and probably the only
>> feasible, solution, with its limitations.
>>
>>
>>
>> Also the requirements of converting a batch of data to a stream of data
>> can be pretty diverse. What rate, what # of events per batch, how many
>> batches, is it efficient? Hence, it is not trivial to define a good, clean
>> public API for that. If any one has any thoughts, ideas, etc on this, you
>> are more than welcome to share them.
>>
>>
>>
>> TD
>>
>>
>>
>> On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>  The use case for converting an RDD into a DStream is that I want to simulate
>> a stream from already persisted data for testing analytics. It is trivial
>> to create an RDD from any persisted data, but not so much for a DStream.
>> Hence my idea of creating a DStream from an RDD. For example, let's say you
>> are trying to implement analytics on time series data using the Lambda
>> architecture. This means you would have to implement the same analytics on
>> streaming data (in streaming mode) as well as persisted data (in batch
>> mode). The workflow for implementing the analytics would be to first
>> implement it in batch mode using RDD operations and then simulate a stream
>> to test the analytics in stream mode. The simulated stream should produce
>> elements at a specified rate. So the solution may be to read the data into
>> an RDD, split (chunk) it into multiple RDDs, each holding the number of
>> elements that need to be streamed per time unit, and then finally stream
>> each RDD using the compute function.
>>
>>
>>
>> The problem with using QueueInputDStream is that it streams data according
>> to the batch duration specified in the streaming context, and one cannot
>> specify a custom slide duration. Moreover, the class QueueInputDStream is
>> private to the streaming package, so I can't really use or extend it from an
>> external package. Also, I could not find a good way to split an RDD into
>> equal-sized smaller RDDs that can be fed into an extended version of
>> QueueInputDStream.
>>
>>
>>
>> Finally, here is what I came up with:
>>
>>
>>
>> class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>>       slideDurationMilli: Option[Long] = None): DStream[T] = {
>>     new InputDStream[T](streamingContext) {
>>
>>       // WARNING: each partition must fit in the RAM of the local machine.
>>       private val iterator = rdd.toLocalIterator
>>       private val grouped = iterator.grouped(chunkSize)
>>
>>       override def start(): Unit = {}
>>
>>       override def stop(): Unit = {}
>>
>>       // Emit the next chunk as an RDD, or nothing once the data is exhausted.
>>       override def compute(validTime: Time): Option[RDD[T]] = {
>>         if (grouped.hasNext) {
>>           Some(rdd.sparkContext.parallelize(grouped.next()))
>>         } else {
>>           None
>>         }
>>       }
>>
>>       override def slideDuration = {
>>         slideDurationMilli.map(duration => new Duration(duration)).
>>           getOrElse(super.slideDuration)
>>       }
>>     }
>>   }
>> }
>>
>>
>>
>> This aims to stream chunkSize elements every slideDurationMilli
>> milliseconds (defaulting to the batch duration of the streaming context).
>> It's still not perfect (for example, the streaming rate is not precise), but
>> given that this will only be used for testing purposes, I am not looking for
>> ways to optimize it further.
>>
>>
>>
>> Thanks,
>>
>> Aniket
>>
>>
>>
>>
>>
>> On 2 August 2014 04:07, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>
>>  Nice question :)
>>
>> Ideally you should use the queueStream interface to push RDDs into a queue,
>> and then Spark Streaming can handle the rest.
>>
>> Though why are you looking to convert an RDD to a DStream? Another workaround
>> folks use is to source the DStream from folders and move the files they need
>> reprocessed back into the folder. It's a hack, but much less of a headache.
>>
>>
>>   Mayur Rustagi
>> Ph: +1 (760) 203 3257
>>
>> http://www.sigmoidanalytics.com
>>
>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>
>>
>>
>>
>>
>> On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>  Hi everyone
>>
>>
>>
>> I haven't been receiving replies to my queries in the distribution list.
>> Not pissed but I am actually curious to know if my messages are actually
>> going through or not. Can someone please confirm that my msgs are getting
>> delivered via this distribution list?
>>
>>
>>
>> Thanks,
>>
>> Aniket
>>
>>
>>
>> On 1 August 2014 13:55, Aniket Bhatnagar <aniket.bhatna...@gmail.com>
>> wrote:
>>
>>  Sometimes it is useful to convert an RDD into a DStream for testing
>> purposes (generating DStreams from historical data, etc.). Is there an easy
>> way to do this?
>>
>>
>>
>> I could come up with the following inefficient way, but I am not sure if
>> there is a better way to achieve this. Thoughts?
>>
>>
>>
>> class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>>
>>   // Split every partition into chunks of at most chunkSize elements.
>>   def chunked(chunkSize: Int): RDD[Seq[T]] = {
>>     rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>>   }
>>
>>   // Drop the first element; this scans the whole RDD on every call.
>>   def skipFirst(): RDD[T] = {
>>     rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>>   }
>>
>>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>>       slideDurationMilli: Option[Long] = None): DStream[T] = {
>>     new InputDStream[T](streamingContext) {
>>
>>       @volatile private var currentRDD: RDD[Seq[T]] = chunked(chunkSize)
>>
>>       override def start(): Unit = {}
>>
>>       override def stop(): Unit = {}
>>
>>       override def compute(validTime: Time): Option[RDD[T]] = {
>>         // take(1) returns at most one chunk; flatten it back into elements.
>>         val chunk = currentRDD.take(1).toSeq.flatten
>>         currentRDD = new RDDExtension(currentRDD).skipFirst()
>>         Some(rdd.sparkContext.parallelize(chunk))
>>       }
>>
>>       override def slideDuration = {
>>         slideDurationMilli.map(duration => new Duration(duration)).
>>           getOrElse(super.slideDuration)
>>       }
>>     }
>>   }
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
