Ok, back to Scala code, I'm wondering why I cannot do this:

  data.groupBy(timestamp / window)
    .sortByKey()                              // no sort method available here
    .map(sc.parallelize(_._2.sortBy(_._1)))   // nested RDD, hmm...
    .collect()                                // returns Seq[RDD[(Timestamp, T)]]

Jianshi
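The chain above can't work as written: RDDs cannot be nested, so sc.parallelize cannot be called inside a transformation, and on Spark 1.x the sortByKey call also needs the pair-RDD implicits from import org.apache.spark.SparkContext._ to be in scope. A minimal sketch of the same intent, collecting the sorted buckets to the driver and re-parallelizing them there; data, window and sc follow the thread, everything else is illustrative:

  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._   // pair-RDD implicits (sortByKey) on Spark 1.x
  import org.apache.spark.rdd.RDD

  // One way to get one RDD per time window, ordered by window key. The grouped
  // buckets are collected to the driver, so this only works while they fit in driver memory.
  def windowedRdds[T: ClassTag](data: RDD[(Long, T)], window: Long, sc: SparkContext): Seq[RDD[(Long, T)]] =
    data.groupBy { case (ts, _) => ts / window }   // bucket rows by time window
      .sortByKey()                                 // order the buckets by window key
      .collect()                                   // driver side: Array[(Long, Iterable[(Long, T)])]
      .map { case (_, rows) => sc.parallelize(rows.toSeq.sortBy(_._1)) }   // one RDD per window, sorted by timestamp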
On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang <jianshi.hu...@gmail.com> wrote:

> You're absolutely right, it's not 'scalable' as I'm using collect().
>
> However, it's important to have the RDDs ordered by the timestamp of the
> time window (groupBy puts data into the corresponding time window).
>
> It's fairly easy to do in Pig, but somehow I have no idea how to express
> it in RDDs... Something like (in Pig, pseudo code :):
>
>   g = GROUP data BY (timestamp / windowSize)   -- group data into buckets in the same time window
>   gs = ORDER g BY group ASC, g.timestamp ASC   -- 'group' is the rounded timestamp for each bucket
>   stream = FOREACH gs GENERATE toRDD(g)
>
> No idea how to do the ORDER BY part in RDDs.
>
> Jianshi
>
> On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai <saisai.s...@intel.com> wrote:
>
>> I think your solution may not be extensible if the data size increases,
>> since you have to collect all your data back to the driver node, so the
>> memory usage of the driver will be a problem.
>>
>> Why not filter out each specific time range as an RDD? After filtering
>> over the whole time range you will get a series of RDDs divided by
>> timestamp, which can then be fed into the queue. It is still not an
>> efficient way, but it is not limited by driver memory.
>>
>> There may also be other solutions, like a shuffle to arrange the data,
>> but you cannot avoid scanning the whole data set. Basically we need to
>> avoid fetching a large amount of data back to the driver.
>>
>> Thanks
>> Jerry
>>
>> From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> Sent: Monday, October 27, 2014 2:39 PM
>> To: Shao, Saisai
>> Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com)
>> Subject: Re: RDD to DStream
>>
>> Hi Saisai,
>>
>> I understand it's non-trivial, but the requirement of simulating offline
>> data as a stream is also fair. :)
>>
>> I just wrote a prototype; however, I need to do a collect and a bunch of
>> parallelize calls...
>>
>>   // RDD of (timestamp, value)
>>   def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
>>                                 ssc: StreamingContext): DStream[T] = {
>>     val sc = ssc.sparkContext
>>     val d = data.groupBy(_._1 / timeWindow)
>>       .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
>>       .collect()
>>       .map(e => (e._1, sc.parallelize(e._2)))
>>       .sortBy(_._1)
>>
>>     val queue = new mutable.SynchronizedQueue[RDD[T]]
>>     queue ++= d.map(_._2)
>>     ssc.queueStream(queue)
>>   }
>>
>> Any way to get a list of RDDs sorted by group key right after groupBy?
>>
>> Jianshi
>>
>> On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai <saisai.s...@intel.com> wrote:
>>
>> Hi Jianshi,
>>
>> For simulation purposes, I think you can try ConstantInputDStream and
>> QueueInputDStream to convert one RDD or a series of RDDs into a DStream:
>> the first one outputs the same RDD in each batch duration, and the second
>> one outputs one RDD from a queue in each batch duration. You can take a
>> look at them.
>>
>> For your case, I think TD's comments are quite meaningful: it's not
>> trivial to do, and it often requires a job to scan all the records; it's
>> also not the design purpose of Spark Streaming, so I guess it's hard to
>> achieve what you want.
>>
>> Thanks
>> Jerry
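A minimal sketch of the filter-based approach Jerry suggests above (one filtering pass per time window, so only the distinct window keys ever reach the driver); data and window follow the thread, the rest is illustrative:

  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext._   // pair-RDD implicits on Spark 1.x
  import org.apache.spark.rdd.RDD

  // One RDD per time window, produced by repeatedly filtering the full data set.
  // Inefficient (the data is scanned once per window) but not bounded by driver memory.
  def windowRddsByFilter[T: ClassTag](data: RDD[(Long, T)], window: Long): Seq[RDD[(Long, T)]] = {
    val windowKeys = data.keys.map(_ / window).distinct().collect().sorted   // small: one key per window
    windowKeys.map(k => data.filter { case (ts, _) => ts / window == k })
  }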
>>
>> From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> Sent: Monday, October 27, 2014 1:42 PM
>> To: Tathagata Das
>> Cc: Aniket Bhatnagar; user@spark.apache.org
>> Subject: Re: RDD to DStream
>>
>> I have a similar requirement. But instead of grouping by chunkSize, I
>> would have the timestamp be part of the data. So the function I want has
>> the following signature:
>>
>>   // RDD of (timestamp, value)
>>   def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)
>>                      (implicit ssc: StreamingContext): DStream[T]
>>
>> And the DStream should respect the timestamp part. This is important for
>> simulation, right?
>>
>> Do you have any good solution for this?
>>
>> Jianshi
>>
>> On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>> Hey Aniket,
>>
>> Great thoughts! I understand the use case. But as you have realized
>> yourself, it is not trivial to cleanly stream an RDD as a DStream. Since
>> RDD operations are defined to be scan-based, it is not efficient to define
>> an RDD based on slices of data within a partition of another RDD using
>> pure RDD transformations. What you have done is a decent, and probably the
>> only feasible, solution, with its limitations.
>>
>> Also, the requirements for converting a batch of data to a stream of data
>> can be pretty diverse: what rate, how many events per batch, how many
>> batches, is it efficient? Hence, it is not trivial to define a good, clean
>> public API for that. If anyone has any thoughts or ideas on this, you are
>> more than welcome to share them.
>>
>> TD
>>
>> On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
>>
>> The use case for converting an RDD into a DStream is that I want to
>> simulate a stream from already persisted data for testing analytics. It is
>> trivial to create an RDD from any persisted data, but not so much for a
>> DStream; hence my idea to create a DStream from an RDD. For example, let's
>> say you are trying to implement analytics on time-series data using the
>> Lambda architecture. This means you would have to implement the same
>> analytics on streaming data (in streaming mode) as well as on persisted
>> data (in batch mode). The workflow would be to first implement the
>> analytics in batch mode using RDD operations and then simulate a stream to
>> test them in streaming mode. The simulated stream should produce elements
>> at a specified rate. So the solution may be to read the data into an RDD,
>> split (chunk) it into multiple RDDs, each holding the number of elements
>> that need to be streamed per time unit, and then finally stream each RDD
>> using the compute function.
>>
>> The problem with using QueueInputDStream is that it streams data as per
>> the batch duration specified in the streaming context, and one cannot
>> specify a custom slide duration. Moreover, the class QueueInputDStream is
>> private to the streaming package, so I can't really use or extend it from
>> an external package. Also, I could not find a good way to split an RDD
>> into equal-sized smaller RDDs that could be fed into an extended version
>> of QueueInputDStream.
>> Finally, here is what I came up with:
>>
>>   class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>>
>>     def toStream(streamingContext: StreamingContext, chunkSize: Int,
>>                  slideDurationMilli: Option[Long] = None): DStream[T] = {
>>       new InputDStream[T](streamingContext) {
>>
>>         // WARNING: each partition must fit in the RAM of the local machine.
>>         private val iterator = rdd.toLocalIterator
>>         private val grouped = iterator.grouped(chunkSize)
>>
>>         override def start(): Unit = {}
>>
>>         override def stop(): Unit = {}
>>
>>         override def compute(validTime: Time): Option[RDD[T]] = {
>>           if (grouped.hasNext) {
>>             Some(rdd.sparkContext.parallelize(grouped.next()))
>>           } else {
>>             None
>>           }
>>         }
>>
>>         override def slideDuration = {
>>           slideDurationMilli.map(duration => new Duration(duration))
>>             .getOrElse(super.slideDuration)
>>         }
>>       }
>>     }
>>   }
>>
>> This aims to stream chunkSize elements every slideDurationMilli
>> milliseconds (defaulting to the batch duration of the streaming context).
>> It's still not perfect (for example, the timing of the stream is not
>> precise), but given that this will only be used for testing purposes, I'm
>> not looking for ways to optimize it further.
>>
>> Thanks,
>> Aniket
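A possible way to exercise the extension above from a test driver; the code below is only an illustrative sketch (the local master, input path, chunk size and batch interval are all made up):

  import org.apache.spark.SparkContext
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.dstream.DStream

  // Replay a persisted data set as a simulated stream, 100 records per 1-second batch.
  val sc  = new SparkContext("local[2]", "rdd-to-dstream-test")
  val ssc = new StreamingContext(sc, Seconds(1))

  val historical = sc.textFile("events.log")   // hypothetical input path
  val simulated: DStream[String] =
    new RDDExtension(historical).toStream(ssc, chunkSize = 100)   // slide defaults to the batch duration

  simulated.foreachRDD(rdd => println(s"simulated batch with ${rdd.count()} records"))
  ssc.start()
  ssc.awaitTermination()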
>> On 2 August 2014 04:07, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>
>> Nice question :)
>>
>> Ideally you should use the queueStream interface to push RDDs into a
>> queue and then let Spark Streaming handle the rest.
>>
>> Though, why are you looking to convert an RDD to a DStream? Another
>> workaround folks use is to source the DStream from folders and move files
>> that they need reprocessed back into the folder; it's a hack, but much
>> less of a headache.
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>
>> On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
>>
>> Hi everyone
>>
>> I haven't been receiving replies to my queries on the distribution list.
>> Not pissed, but I am actually curious to know whether my messages are
>> going through or not. Can someone please confirm that my messages are
>> getting delivered via this distribution list?
>>
>> Thanks,
>> Aniket
>>
>> On 1 August 2014 13:55, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
>>
>> Sometimes it is useful to convert an RDD into a DStream for testing
>> purposes (generating DStreams from historical data, etc.). Is there an
>> easy way to do this?
>>
>> I could come up with the following inefficient way, but I'm not sure if
>> there is a better way to achieve this. Thoughts?
>>
>>   // Assumes an implicit conversion RDD[A] => RDDExtension[A] is in scope,
>>   // so that .chunked and .skipFirst below resolve on plain RDDs.
>>   class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>>
>>     def chunked(chunkSize: Int): RDD[Seq[T]] = {
>>       rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>>     }
>>
>>     def skipFirst(): RDD[T] = {
>>       rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>>     }
>>
>>     def toStream(streamingContext: StreamingContext, chunkSize: Int,
>>                  slideDurationMilli: Option[Long] = None): DStream[T] = {
>>       new InputDStream[T](streamingContext) {
>>
>>         @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize)
>>
>>         override def start(): Unit = {}
>>
>>         override def stop(): Unit = {}
>>
>>         override def compute(validTime: Time): Option[RDD[T]] = {
>>           val chunk = currentRDD.take(1)   // at most one Seq[T]
>>           currentRDD = currentRDD.skipFirst()
>>           chunk.headOption.map(seq => rdd.sparkContext.parallelize(seq))
>>         }
>>
>>         override def slideDuration = {
>>           slideDurationMilli.map(duration => new Duration(duration))
>>             .getOrElse(super.slideDuration)
>>         }
>>       }
>>     }
>>   }

--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/