Sure, let's still focus on the streaming simulation use case. It's a very useful problem to solve.
If we're going to use the same Spark Streaming core for the simulation, the simplest way is to have globally sorted RDDs and use ssc.queueStream. Thus collecting the key part to the driver is probably necessary.

I've done offline simulation in Pig and it's absolutely non-trivial and error prone; I also had to keep multiple copies of the data because the windows overlap. Therefore I would prefer using the existing streaming implementation for the simulation rather than a special DStream.

Jianshi

On Mon, Oct 27, 2014 at 4:44 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

> Yes, I understand what you want, but it may be hard to achieve without
> collecting back to the driver node.
>
> Besides, can we just think of another way to do it?
>
> Thanks
>
> Jerry
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 4:07 PM
> *To:* Shao, Saisai
> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
> *Subject:* Re: RDD to DStream
>
> Yeah, you're absolutely right Saisai.
>
> My point is we should allow this kind of logic in RDD, let's say
> transforming type RDD[(Key, Iterable[T])] to Seq[(Key, RDD[T])].
>
> Make sense?
>
> Jianshi
>
> On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai <saisai.s...@intel.com> wrote:
>
> I think what you want is to make each bucket a new RDD, as you described
> in Pig syntax:
>
> gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded timestamp for each bucket
>
> From my understanding, there is currently no such API in Spark to achieve
> this; you may have to create a customized RDD yourself.
>
> As for why the code cannot be executed:
>
> .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
>
> "sc.parallelize(_._2.sortBy(_._1))" will be serialized as a closure to
> execute on the remote side, which obviously does not have a SparkContext.
> I think Spark cannot support nested RDDs in closures.
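A driver-side alternative is sketched below. It follows the approach Jianshi describes at the top: collect only the key part (the window index) to the driver, then build one RDD per window with filter, in the spirit of Saisai's filter suggestion quoted further down. It is a hypothetical helper, not a Spark API. WindowReplay, windowedRDDs and rddToDStream are illustrative names. The sketch makes these assumptions:

- Timestamps are non-negative Longs.
- data stays cached until the stream has consumed every batch.
- Each window costs one full scan of data.
- Windows with no records are skipped rather than emitted as empty batches.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical helper: replays (timestamp, value) records window by window.
object WindowReplay {

  /** One RDD per time window, in ascending window order. Only the distinct
    * window indices are collected to the driver, never the records.
    * Assumes non-negative timestamps (integer division truncates toward 0). */
  def windowedRDDs[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long): Seq[RDD[T]] = {
    require(timeWindow > 0, "timeWindow must be positive")
    // Every window below is a separate filter over `data`, so cache it once.
    val cached = data.cache()
    // Small driver-side collect: one Long per window.
    val windows: Array[Long] =
      cached.map { case (ts, _) => ts / timeWindow }.distinct().collect().sorted
    windows.toSeq.map { w =>
      cached
        .filter { case (ts, _) => ts / timeWindow == w } // keep this window only
        .sortBy(_._1)                                    // timestamp order within the batch
        .map(_._2)
    }
  }

  /** Feeds the windows to queueStream, one RDD per batch interval. */
  def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
                                ssc: StreamingContext): InputDStream[T] = {
    val queue = mutable.Queue[RDD[T]](windowedRDDs(data, timeWindow): _*)
    ssc.queueStream(queue, oneAtATime = true)
  }
}
```

With oneAtATime = true, queueStream dequeues one RDD per batch interval. Each time window therefore becomes exactly one batch, whatever its width in event time.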
>
> Thanks
>
> Jerry
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 3:30 PM
> *To:* Shao, Saisai
> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
> *Subject:* Re: RDD to DStream
>
> Ok, back to Scala code, I'm wondering why I cannot do this:
>
> data.groupBy(timestamp / window)
>   .sortByKey()  // no sort method available here
>   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
>   .collect()  // returns Seq[RDD[(Timestamp, T)]]
>
> Jianshi
>
> On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang <jianshi.hu...@gmail.com> wrote:
>
> You're absolutely right, it's not 'scalable' as I'm using collect().
>
> However, it's important to have the RDDs ordered by the timestamp of the
> time window (groupBy puts data into the corresponding time window).
>
> It's fairly easy to do in Pig, but somehow I have no idea how to express
> it with RDDs...
>
> Something like (in Pig, pseudo code :)):
>
> g = GROUP data BY (timestamp / windowSize)  // group data into buckets in the same time window
> gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded timestamp for each bucket
> stream = FOREACH gs GENERATE toRDD(g)
>
> No idea how to do the ORDER BY part with RDDs.
>
> Jianshi
>
> On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai <saisai.s...@intel.com> wrote:
>
> I think your solution may not scale as the data size grows, since you
> have to collect all your data back to the driver node, so the driver's
> memory usage will be a problem.
>
> Why not filter out each specific time range as an RDD? After filtering the
> whole time range, you will get a series of RDDs divided by timestamp, which
> you can then feed into the queue. It is still not efficient, but it is not
> limited by driver memory.
>
> There may also be other solutions, such as a shuffle to arrange the data,
> but you cannot avoid scanning the whole data set.
> Basically we need to avoid fetching large amounts of data back to the driver.
>
> Thanks
>
> Jerry
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 2:39 PM
> *To:* Shao, Saisai
> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
> *Subject:* Re: RDD to DStream
>
> Hi Saisai,
>
> I understand it's non-trivial, but the requirement of simulating offline
> data as a stream is also fair. :)
>
> I just wrote a prototype; however, I need to do a collect and a bunch of
> parallelize calls...
>
> // RDD of (timestamp, value)
> def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
>     ssc: StreamingContext): DStream[T] = {
>   val sc = ssc.sparkContext
>   val d = data.groupBy(_._1 / timeWindow)
>     .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
>     .collect()
>     .map(e => (e._1, sc.parallelize(e._2)))
>     .sortBy(_._1)
>   val queue = new mutable.SynchronizedQueue[RDD[T]]
>
>   queue ++= d.map(_._2)
>
>   ssc.queueStream(queue)
> }
>
> Any way to get a list of RDDs sorted by group key just after groupBy?
>
> Jianshi
>
> On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai <saisai.s...@intel.com> wrote:
>
> Hi Jianshi,
>
> For simulation purposes, I think you can try ConstantInputDStream and
> QueueInputDStream to convert one RDD or a series of RDDs into a DStream.
> The first one outputs the same RDD in each batch duration, and the second
> one outputs one RDD from a queue in each batch duration. You can take a
> look at them.
>
> For your case, I think TD's comments are quite meaningful: it's not trivial
> to do so, it often requires a job to scan all the records, and it's also
> not the design purpose of Spark Streaming, so I guess it's hard to achieve
> what you want.
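For reference, the ordering that Jianshi's prototype and the Pig pseudocode above ask for can be pinned down on a plain local collection. It requires:

- buckets keyed by timestamp / windowSize;
- buckets in ascending order;
- records in ascending timestamp order within each bucket.

Such a local version is handy as a test oracle for any distributed version. The sketch below is a hypothetical helper (LocalWindows.bucketize is not part of Spark) and assumes non-negative timestamps. It has the same memory limit as the collect()-based prototype, so it only suits small test fixtures.

```scala
// Hypothetical local reference for the GROUP BY / ORDER BY in the Pig pseudocode.
object LocalWindows {

  /** Groups (timestamp, value) pairs into windows of `windowSize`, returning
    * (windowIndex, values) with windows ascending and values ordered by
    * timestamp inside each window. Assumes non-negative timestamps. */
  def bucketize[T](data: Seq[(Long, T)], windowSize: Long): Seq[(Long, Seq[T])] = {
    require(windowSize > 0, "windowSize must be positive")
    data
      .groupBy { case (ts, _) => ts / windowSize }                 // GROUP data BY timestamp / windowSize
      .toSeq
      .sortBy(_._1)                                                // ORDER BY group ASC
      .map { case (w, rows) => (w, rows.sortBy(_._1).map(_._2)) }  // ..., timestamp ASC
  }
}
```

A unit test can then compare, window by window, the batches a Spark-based replay produces against bucketize applied to the same small input.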
>
> Thanks
>
> Jerry
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 1:42 PM
> *To:* Tathagata Das
> *Cc:* Aniket Bhatnagar; user@spark.apache.org
> *Subject:* Re: RDD to DStream
>
> I have a similar requirement. But instead of grouping it by chunkSize, I
> would have the timestamp be part of the data. So the function I want has
> the following signature:
>
> // RDD of (timestamp, value)
> def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit
>     ssc: StreamingContext): DStream[T]
>
> And the DStream should respect the timestamp part. This is important for
> simulation, right?
>
> Do you have any good solution for this?
>
> Jianshi
>
> On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
> Hey Aniket,
>
> Great thoughts! I understand the use case. But as you have realized
> yourself, it is not trivial to cleanly stream an RDD as a DStream. Since
> RDD operations are defined to be scan based, it is not efficient to define
> an RDD based on slices of data within a partition of another RDD using pure
> RDD transformations. What you have done is a decent, and probably the only
> feasible, solution, with its limitations.
>
> Also, the requirements for converting a batch of data into a stream of data
> can be pretty diverse. What rate, what # of events per batch, how many
> batches, is it efficient? Hence, it is not trivial to define a good, clean
> public API for that. If anyone has any thoughts, ideas, etc. on this, you
> are more than welcome to share them.
>
> TD
>
> On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
>
> The use case for converting an RDD into a DStream is that I want to
> simulate a stream from already persisted data for testing analytics. It is
> trivial to create an RDD from any persisted data, but not so much a
> DStream. Hence my idea to create a DStream from an RDD.
> For example, let's say you are trying to implement analytics on time series
> data using the Lambda architecture. This means you would have to implement
> the same analytics on streaming data (in streaming mode) as well as on
> persisted data (in batch mode). The workflow for implementing the analytics
> would be to first implement it in batch mode using RDD operations and then
> simulate a stream to test the analytics in stream mode. The simulated
> stream should produce elements at a specified rate. So the solution may be
> to read the data into an RDD, split (chunk) it into multiple RDDs, each
> holding the number of elements that need to be streamed per time unit, and
> then finally stream each RDD using the compute function.
>
> The problem with using QueueInputDStream is that it will stream data as per
> the batch duration specified in the streaming context, and one cannot
> specify a custom slide duration. Moreover, the class QueueInputDStream is
> private to the streaming package, so I can't really use or extend it from an
> external package. Also, I could not find a good way to split an RDD into
> equal-sized smaller RDDs that can be fed into an extended version of
> QueueInputDStream.
>
> Finally, here is what I came up with:
>
> class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>       slideDurationMilli: Option[Long] = None): DStream[T] = {
>     new InputDStream[T](streamingContext) {
>
>       private val iterator = rdd.toLocalIterator // WARNING: each partition must fit in the RAM of the local machine.
>       private val grouped = iterator.grouped(chunkSize)
>
>       override def start(): Unit = {}
>
>       override def stop(): Unit = {}
>
>       override def compute(validTime: Time): Option[RDD[T]] = {
>         if (grouped.hasNext) {
>           Some(rdd.sparkContext.parallelize(grouped.next()))
>         } else {
>           None
>         }
>       }
>
>       override def slideDuration = {
>         slideDurationMilli.map(duration => new Duration(duration)).
>           getOrElse(super.slideDuration)
>       }
>     }
>   }
> }
>
> This aims to stream chunkSize elements every slideDurationMilli
> milliseconds (defaulting to the batch duration of the streaming context).
> It's still not perfect (for example, the timing is not precise), but given
> that this will only be used for testing purposes, I'm not looking for ways
> to optimize it further.
>
> Thanks,
>
> Aniket
>
> On 2 August 2014 04:07, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>
> Nice question :)
>
> Ideally you should use the queueStream interface to push RDDs into a queue,
> and then Spark Streaming can handle the rest.
>
> Though, why are you looking to convert an RDD to a DStream? Another
> workaround folks use is to source the DStream from folders and move the
> files they need reprocessed back into the folder. It's a hack, but much
> less of a headache.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
> On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
>
> Hi everyone
>
> I haven't been receiving replies to my queries on the distribution list.
> Not pissed, but I am curious to know whether my messages are actually
> going through. Can someone please confirm that my messages are getting
> delivered via this distribution list?
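Aniket's second toStream above boils down to a small per-batch contract. Each compute() call hands out the next chunkSize elements, and compute() returns None once the data runs out. The hypothetical ChunkFeeder below isolates that contract from Spark so it can be unit-tested. Inside an InputDStream, compute(validTime) would wrap each chunk with sparkContext.parallelize, as in Aniket's code. The class name and its nextBatch method are illustrative only. Like the original iterator field, it is not synchronized, on the assumption that a single thread calls it once per batch.

```scala
// Hypothetical, Spark-free model of "one chunk per compute(validTime) call".
final class ChunkFeeder[T](items: Iterator[T], chunkSize: Int) {
  require(chunkSize > 0, "chunkSize must be positive")

  // grouped() yields full chunks of chunkSize, plus a shorter final chunk if needed.
  private val chunks: Iterator[Seq[T]] = items.grouped(chunkSize)

  /** The next batch's elements, or None once the input is exhausted. */
  def nextBatch(): Option[Seq[T]] =
    if (chunks.hasNext) Some(chunks.next()) else None
}
```

Returning None after the last chunk corresponds to the else branch in Aniket's compute(), so a batch interval after the data is exhausted produces no RDD.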
>
> Thanks,
>
> Aniket
>
> On 1 August 2014 13:55, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
>
> Sometimes it is useful to convert an RDD into a DStream for testing
> purposes (generating DStreams from historical data, etc.). Is there an easy
> way to do this?
>
> I could come up with the following inefficient way, but I'm not sure if
> there is a better way to achieve this. Thoughts?
>
> // ClassTag is needed by mapPartitions, map and parallelize; rdd.chunked and
> // currentRDD.skipFirst() assume an implicit RDD => RDDExtension conversion is in scope.
> class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>
>   def chunked(chunkSize: Int): RDD[Seq[T]] = {
>     rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>   }
>
>   def skipFirst(): RDD[T] = {
>     rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>   }
>
>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>       slideDurationMilli: Option[Long] = None): DStream[T] = {
>     new InputDStream[T](streamingContext) {
>
>       @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize)
>
>       override def start(): Unit = {}
>
>       override def stop(): Unit = {}
>
>       override def compute(validTime: Time): Option[RDD[T]] = {
>         val chunk = currentRDD.take(1) // Array[Seq[T]] holding at most one chunk
>         currentRDD = currentRDD.skipFirst()
>         Some(rdd.sparkContext.parallelize(chunk.flatten)) // flatten to RDD[T]
>       }
>
>       override def slideDuration = {
>         slideDurationMilli.map(duration => new Duration(duration)).
>           getOrElse(super.slideDuration)
>       }
>     }
>   }
>
> }
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
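Aniket's first version (the oldest message in the thread) calls take(1) and then skipFirst() on every batch. Each batch therefore stacks another zipWithIndex/filter/map onto the lineage and rescans the data. One way to avoid both that cost and the driver-memory limit of toLocalIterator is to number the elements once with zipWithIndex, cut them into index ranges, and hand the chunks to queueStream. This is a sketch under the following assumptions:

- ChunkReplay and its methods are hypothetical names.
- The chunks follow the RDD's existing element order.
- As Aniket points out, queueStream emits one chunk per batch duration of the StreamingContext, not at a custom slide duration.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical helper: fixed-size chunks without collecting data to the driver.
object ChunkReplay {

  /** Splits `rdd` into consecutive chunks of `chunkSize` elements, preserving
    * the RDD's element order. Only the total count reaches the driver. */
  def chunkedRDDs[T: ClassTag](rdd: RDD[T], chunkSize: Int): Seq[RDD[T]] = {
    require(chunkSize > 0, "chunkSize must be positive")
    // zipWithIndex numbers elements in partition order; cache it because
    // every chunk below is a separate filter over the same indexed RDD.
    val indexed = rdd.zipWithIndex().cache()
    val total = indexed.count()
    val numChunks = ((total + chunkSize - 1) / chunkSize).toInt
    (0 until numChunks).map { c =>
      val lo = c.toLong * chunkSize
      val hi = lo + chunkSize
      indexed.filter { case (_, i) => i >= lo && i < hi }.map(_._1)
    }
  }

  /** One chunk per batch interval of the StreamingContext. */
  def toStream[T: ClassTag](rdd: RDD[T], chunkSize: Int,
                            ssc: StreamingContext): InputDStream[T] =
    ssc.queueStream(mutable.Queue[RDD[T]](chunkedRDDs(rdd, chunkSize): _*), oneAtATime = true)
}
```

As with the timestamp-window variant discussed earlier in the thread, each chunk is a full scan of the cached indexed RDD. This trades CPU for bounded driver memory.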