Re: RDD to DStream

2014-11-12 Thread Jianshi Huang
I also discussed this with Liancheng two weeks ago. He suggested using
toLocalIterator to collect the partitions of the RDD to the driver (in the same
order if the RDD is sorted), and then turning each partition into an RDD and
putting them in the queue.

So, to turn an RDD[(timestamp, value)] into a DStream:

1) Group by timestamp / windowSize
2) Sort the RDD by (group, timestamp)
3) Use toLocalIterator to collect each group/partition
4) Turn each group/partition into an RDD and put them in a Queue
5) Use StreamingContext.queueStream to consume the Queue[RDD] as a
DStream

Looks good to me; I'll try it today. The downside is that all the data needs to
be collected to the driver. Is there a way to avoid doing this?
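
For reference, here's a rough sketch of steps 1)-5) (untested). The
toLocalIterator part still pulls everything through the driver, just one
partition at a time, so it doesn't answer the question above:

  import scala.collection.mutable
  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext._   // ordered-RDD implicits in Spark 1.x
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.dstream.DStream

  // Assumes timestamps and windowSize are in the same unit (e.g. millis).
  def rddToDStream[T: ClassTag](data: RDD[(Long, T)], windowSize: Long,
      ssc: StreamingContext): DStream[T] = {
    val sc = ssc.sparkContext
    // 1) + 2): key by (group, timestamp) and sort, so each window is contiguous
    val sorted = data.map { case (ts, v) => ((ts / windowSize, ts), v) }.sortByKey()
    // 3) + 4): pull the sorted data to the driver and cut it into one RDD per window
    val queue = new mutable.Queue[RDD[T]]()
    val it = sorted.toLocalIterator.map { case ((w, _), v) => (w, v) }.buffered
    while (it.hasNext) {
      val w = it.head._1
      val batch = mutable.ArrayBuffer.empty[T]
      while (it.hasNext && it.head._1 == w) batch += it.next()._2
      queue += sc.parallelize(batch)
    }
    // 5): consume the Queue[RDD] as a DStream, one RDD per batch interval
    ssc.queueStream(queue, oneAtATime = true)
  }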

Thanks
Jianshi






On Mon, Oct 27, 2014 at 4:57 PM, Jianshi Huang 
wrote:

> Sure, let's still focus on the streaming simulation use case. It's a very
> useful problem to solve.
>
> If we're going to use the same Spark-streaming core for the simulation,
> the most simple way is to have a globally sorted RDDs and use
> ssc.queueStream. Thus collecting the Key part to driver is probably
> necessary.
>
> I've done offline simulation in Pig and it's absolutely non-trivial and
> error prone, also I had to have multiple copies of data due to overlaps of
> window.
>
> Therefore I would prefer using the existing streaming implementation for
> the simulation rather than a special DStream.
>
>
> Jianshi
>
>
>
> On Mon, Oct 27, 2014 at 4:44 PM, Shao, Saisai 
> wrote:
>
>>  Yes, I understand what you want, but maybe hard to achieve without
>> collecting back to driver node.
>>
>>
>>
>> Besides, can we just think of another way to do it.
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> *Sent:* Monday, October 27, 2014 4:07 PM
>>
>> *To:* Shao, Saisai
>> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
>> *Subject:* Re: RDD to DStream
>>
>>
>>
>> Yeah, you're absolutely right Saisai.
>>
>>
>>
>> My point is we should allow this kind of logic in RDD, let's say
>> transforming type RDD[(Key, Iterable[T])] to Seq[(Key, RDD[T])].
>>
>>
>>
>> Make sense?
>>
>>
>>
>> Jianshi
>>
>>
>>
>> On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai 
>> wrote:
>>
>>  I think what you want is to make each bucket as a new RDD as what you
>> mentioned in Pig syntax.
>>
>>
>>
>> gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
>> timestamp for each bucket
>>
>>
>>
>> From my understanding, currently in Spark there’s no such kind of API to
>> achieve this, maybe you have to create a customized RDD by yourself.
>>
>>
>>
>> For the code why cannot executed,
>>
>>
>>
>>   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
>>
>>
>>
>> This “sc.parallelize(_._2.sortBy(_._1))”will be serialized as a closure
>> to execute in remote side, which obviously do not has SparkContext, I think
>> Spark cannot support nested RDD in closure.
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> *Sent:* Monday, October 27, 2014 3:30 PM
>>
>>
>> *To:* Shao, Saisai
>> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
>> *Subject:* Re: RDD to DStream
>>
>>
>>
>> Ok, back to Scala code, I'm wondering why I cannot do this:
>>
>>
>>
>> data.groupBy(timestamp / window)
>>
>>   .sortByKey()  // no sort method available here
>>
>>   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
>>
>>   .collect() // returns Seq[RDD[(Timestamp, T)]]
>>
>>
>>
>>
>>
>> Jianshi
>>
>>
>>
>> On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang 
>> wrote:
>>
>>  You're absolutely right, it's not 'scalable' as I'm using collect().
>>
>>
>>
>> However, it's important to have the RDDs ordered by the timestamp of the
>> time window (groupBy puts data to corresponding timewindow).
>>
>>
>>
>> It's fairly easy to do in Pig, but somehow I have no idea how to express
>> it in RDD...
>>
>>
>>
>> Something like (in Pig, pseudo code :):
>>
>>
>>
>> g = GROUP data BY (timestamp / windowSize)  // group data into buckets in
>> the same time window
>>

Re: RDD to DStream

2014-10-27 Thread Jianshi Huang
Sure, let's still focus on the streaming simulation use case. It's a very
useful problem to solve.

If we're going to use the same Spark Streaming core for the simulation, the
simplest way is to have globally sorted RDDs and use ssc.queueStream. So
collecting the key part to the driver is probably necessary.

I've done offline simulation in Pig and it's absolutely non-trivial and
error prone; I also had to keep multiple copies of the data because of
overlapping windows.

Therefore I would prefer using the existing streaming implementation for
the simulation rather than a special DStream.


Jianshi



On Mon, Oct 27, 2014 at 4:44 PM, Shao, Saisai  wrote:

>  Yes, I understand what you want, but maybe hard to achieve without
> collecting back to driver node.
>
>
>
> Besides, can we just think of another way to do it.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 4:07 PM
>
> *To:* Shao, Saisai
> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
> *Subject:* Re: RDD to DStream
>
>
>
> Yeah, you're absolutely right Saisai.
>
>
>
> My point is we should allow this kind of logic in RDD, let's say
> transforming type RDD[(Key, Iterable[T])] to Seq[(Key, RDD[T])].
>
>
>
> Make sense?
>
>
>
> Jianshi
>
>
>
> On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai 
> wrote:
>
>  I think what you want is to make each bucket as a new RDD as what you
> mentioned in Pig syntax.
>
>
>
> gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
> timestamp for each bucket
>
>
>
> From my understanding, currently in Spark there’s no such kind of API to
> achieve this, maybe you have to create a customized RDD by yourself.
>
>
>
> For the code why cannot executed,
>
>
>
>   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
>
>
>
> This “sc.parallelize(_._2.sortBy(_._1))”will be serialized as a closure
> to execute in remote side, which obviously do not has SparkContext, I think
> Spark cannot support nested RDD in closure.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 3:30 PM
>
>
> *To:* Shao, Saisai
> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
> *Subject:* Re: RDD to DStream
>
>
>
> Ok, back to Scala code, I'm wondering why I cannot do this:
>
>
>
> data.groupBy(timestamp / window)
>
>   .sortByKey()  // no sort method available here
>
>   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
>
>   .collect() // returns Seq[RDD[(Timestamp, T)]]
>
>
>
>
>
> Jianshi
>
>
>
> On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang 
> wrote:
>
>  You're absolutely right, it's not 'scalable' as I'm using collect().
>
>
>
> However, it's important to have the RDDs ordered by the timestamp of the
> time window (groupBy puts data to corresponding timewindow).
>
>
>
> It's fairly easy to do in Pig, but somehow I have no idea how to express
> it in RDD...
>
>
>
> Something like (in Pig, pseudo code :):
>
>
>
> g = GROUP data BY (timestamp / windowSize)  // group data into buckets in
> the same time window
>
> gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
> timestamp for each bucket
>
> stream = FOREACH gs GENERATE toRDD(g)
>
>
>
> No idea how to do the order by part in RDD.
>
>
>
> Jianshi
>
>
>
>
>
> On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai 
> wrote:
>
>  I think you solution may not  be extendable if the data size is
> increasing, since you have to collect all your data back to driver node, so
> the memory usage of driver will be a problem.
>
>
>
> why not filter out specific time-range data as a rdd, after filtering the
> whole time-range, you will get a series of RDD with timestamp divided, and
> then feed into queue. Still it is not an efficient way, but  it is not
> limited by driver memory.
>
>
>
> Also there may have some other solutions like shuffle to arrange data, but
> you cannot avoid scanning the whole data. Basically we need to avoid
> fetching large amount of data back to driver.
>
>
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 2:39 PM
> *To:* Shao, Saisai
> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
>
>
> *Subject:* Re: RDD to DStream
>
>

RE: RDD to DStream

2014-10-27 Thread Shao, Saisai
Yes, I understand what you want, but it may be hard to achieve without collecting
the data back to the driver node.

Besides, can we think of another way to do it?

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 4:07 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com)
Subject: Re: RDD to DStream

Yeah, you're absolutely right Saisai.

My point is we should allow this kind of logic in RDD, let's say transforming 
type RDD[(Key, Iterable[T])] to Seq[(Key, RDD[T])].

Make sense?

Jianshi

On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai <saisai.s...@intel.com> wrote:
I think what you want is to make each bucket as a new RDD as what you mentioned 
in Pig syntax.

gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded timestamp 
for each bucket

From my understanding, currently in Spark there’s no such kind of API to 
achieve this, maybe you have to create a customized RDD by yourself.

For the code why cannot executed,

  .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...

This “sc.parallelize(_._2.sortBy(_._1))”will be serialized as a closure to 
execute in remote side, which obviously do not has SparkContext, I think Spark 
cannot support nested RDD in closure.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 3:30 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com)
Subject: Re: RDD to DStream

Ok, back to Scala code, I'm wondering why I cannot do this:

data.groupBy(timestamp / window)
  .sortByKey()  // no sort method available here
  .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
  .collect() // returns Seq[RDD[(Timestamp, T)]]


Jianshi

On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang <jianshi.hu...@gmail.com> wrote:
You're absolutely right, it's not 'scalable' as I'm using collect().

However, it's important to have the RDDs ordered by the timestamp of the time 
window (groupBy puts data to corresponding timewindow).

It's fairly easy to do in Pig, but somehow I have no idea how to express it in 
RDD...

Something like (in Pig, pseudo code :):

g = GROUP data BY (timestamp / windowSize)  // group data into buckets in the 
same time window
gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded timestamp 
for each bucket
stream = FOREACH gs GENERATE toRDD(g)

No idea how to do the order by part in RDD.

Jianshi


On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai <saisai.s...@intel.com> wrote:
I think you solution may not  be extendable if the data size is increasing, 
since you have to collect all your data back to driver node, so the memory 
usage of driver will be a problem.

why not filter out specific time-range data as a rdd, after filtering the whole 
time-range, you will get a series of RDD with timestamp divided, and then feed 
into queue. Still it is not an efficient way, but  it is not limited by driver 
memory.

Also there may have some other solutions like shuffle to arrange data, but you 
cannot avoid scanning the whole data. Basically we need to avoid fetching large 
amount of data back to driver.


Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 2:39 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com)

Subject: Re: RDD to DStream

Hi Saisai,

I understand it's non-trivial, but the requirement of simulating offline data 
as stream is also fair. :)

I just wrote a prototype, however, I need to do a collect and a bunch of 
parallelize...

  // RDD of (timestamp, value)
  def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: 
StreamingContext): DStream[T] = {
val sc = ssc.sparkContext
val d = data.groupBy(_._1 / timeWindow)
.map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
.collect()
.map(e => (e._1, sc.parallelize(e._2)))
.sortBy(_._1)
val queue = new mutable.SynchronizedQueue[RDD[T]]

queue ++= d.map(_._2)

ssc.queueStream(queue)
  }

Any way to get a list of RDDs sorted by group key just after groupBy?

Jianshi

On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai <saisai.s...@intel.com> wrote:
Hi Jianshi,

For simulation purpose, I think you can try ConstantInputDStream and 
QueueInputDStream to convert one RDD or series of RDD into DStream, the first 
one output the same RDD in each batch duration, and the second one just output 
a RDD in a queue in each batch duration. You can take a look at it.

For your case, I think TD’s comment are quite meaningful, it’s no

Re: RDD to DStream

2014-10-27 Thread Jianshi Huang
Yeah, you're absolutely right Saisai.

My point is that we should allow this kind of logic on RDDs, say transforming an
RDD[(Key, Iterable[T])] into a Seq[(Key, RDD[T])].

Make sense?
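
Something like this is what I have in mind, just as a rough, untested sketch
(splitByKey is only an illustrative name); note it still materializes each group
on the driver, which is exactly the part I'd like Spark to handle natively:

  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  // Turn an RDD[(K, Iterable[T])] into a Seq[(K, RDD[T])], ordered by key.
  // Every group is collected to the driver, so each group must fit in driver memory.
  def splitByKey[K: Ordering, T: ClassTag](grouped: RDD[(K, Iterable[T])],
      sc: SparkContext): Seq[(K, RDD[T])] = {
    grouped.collect()
      .sortBy(_._1)
      .map { case (k, vs) => (k, sc.parallelize(vs.toSeq)) }
      .toSeq
  }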

Jianshi

On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai  wrote:

>  I think what you want is to make each bucket as a new RDD as what you
> mentioned in Pig syntax.
>
>
>
> gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
> timestamp for each bucket
>
>
>
> From my understanding, currently in Spark there’s no such kind of API to
> achieve this, maybe you have to create a customized RDD by yourself.
>
>
>
> For the code why cannot executed,
>
>
>
>   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
>
>
>
> This “sc.parallelize(_._2.sortBy(_._1))”will be serialized as a closure
> to execute in remote side, which obviously do not has SparkContext, I think
> Spark cannot support nested RDD in closure.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 3:30 PM
>
> *To:* Shao, Saisai
> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
> *Subject:* Re: RDD to DStream
>
>
>
> Ok, back to Scala code, I'm wondering why I cannot do this:
>
>
>
> data.groupBy(timestamp / window)
>
>   .sortByKey()  // no sort method available here
>
>   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
>
>   .collect() // returns Seq[RDD[(Timestamp, T)]]
>
>
>
>
>
> Jianshi
>
>
>
> On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang 
> wrote:
>
>  You're absolutely right, it's not 'scalable' as I'm using collect().
>
>
>
> However, it's important to have the RDDs ordered by the timestamp of the
> time window (groupBy puts data to corresponding timewindow).
>
>
>
> It's fairly easy to do in Pig, but somehow I have no idea how to express
> it in RDD...
>
>
>
> Something like (in Pig, pseudo code :):
>
>
>
> g = GROUP data BY (timestamp / windowSize)  // group data into buckets in
> the same time window
>
> gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
> timestamp for each bucket
>
> stream = FOREACH gs GENERATE toRDD(g)
>
>
>
> No idea how to do the order by part in RDD.
>
>
>
> Jianshi
>
>
>
>
>
> On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai 
> wrote:
>
>  I think you solution may not  be extendable if the data size is
> increasing, since you have to collect all your data back to driver node, so
> the memory usage of driver will be a problem.
>
>
>
> why not filter out specific time-range data as a rdd, after filtering the
> whole time-range, you will get a series of RDD with timestamp divided, and
> then feed into queue. Still it is not an efficient way, but  it is not
> limited by driver memory.
>
>
>
> Also there may have some other solutions like shuffle to arrange data, but
> you cannot avoid scanning the whole data. Basically we need to avoid
> fetching large amount of data back to driver.
>
>
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 2:39 PM
> *To:* Shao, Saisai
> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
>
>
> *Subject:* Re: RDD to DStream
>
>
>
> Hi Saisai,
>
>
>
> I understand it's non-trivial, but the requirement of simulating offline
> data as stream is also fair. :)
>
>
>
> I just wrote a prototype, however, I need to do a collect and a bunch of
> parallelize...
>
>
>
>   // RDD of (timestamp, value)
>
>   def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
> ssc: StreamingContext): DStream[T] = {
>
> val sc = ssc.sparkContext
>
> val d = data.groupBy(_._1 / timeWindow)
>
> .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
>
> .collect()
>
> .map(e => (e._1, sc.parallelize(e._2)))
>
> .sortBy(_._1)
>
> val queue = new mutable.SynchronizedQueue[RDD[T]]
>
>
>
> queue ++= d.map(_._2)
>
>
>
> ssc.queueStream(queue)
>
>   }
>
>
>
> Any way to get a list of RDDs sorted by group key just after groupBy?
>
>
>
> Jianshi
>
>
>
> On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai 
> wrote:
>
>  Hi Jianshi,
>
>
>
> For simulation purpose, I think you can try ConstantInputDStream and
> QueueInputDStream to con

RE: RDD to DStream

2014-10-27 Thread Shao, Saisai
I think what you want is to make each bucket a new RDD, as in the Pig syntax you
mentioned.

gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded timestamp 
for each bucket

From my understanding, there's currently no such API in Spark to achieve this;
you may have to create a customized RDD yourself.

As for why the code cannot be executed:

  .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...

This “sc.parallelize(_._2.sortBy(_._1))” will be serialized as a closure and
executed on the remote side, which obviously does not have a SparkContext; I
don't think Spark can support a nested RDD in a closure.
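
A rough illustration of the difference, assuming grouped is the
RDD[(Long, Iterable[T])] produced by your groupBy and sc is the driver-side
SparkContext (the names are just for the example):

  // Does NOT work: the function passed to map() is shipped to executors,
  // and SparkContext is neither serializable nor available inside a task.
  // grouped.map { case (k, vs) => sc.parallelize(vs.toSeq) }

  // Works: collect the (small) groups to the driver first, then create the
  // per-group RDDs on the driver, where sc is available.
  val perGroupRdds = grouped.collect().sortBy(_._1)
    .map { case (k, vs) => (k, sc.parallelize(vs.toSeq)) }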

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 3:30 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com)
Subject: Re: RDD to DStream

Ok, back to Scala code, I'm wondering why I cannot do this:

data.groupBy(timestamp / window)
  .sortByKey()  // no sort method available here
  .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
  .collect() // returns Seq[RDD[(Timestamp, T)]]


Jianshi

On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang <jianshi.hu...@gmail.com> wrote:
You're absolutely right, it's not 'scalable' as I'm using collect().

However, it's important to have the RDDs ordered by the timestamp of the time 
window (groupBy puts data to corresponding timewindow).

It's fairly easy to do in Pig, but somehow I have no idea how to express it in 
RDD...

Something like (in Pig, pseudo code :):

g = GROUP data BY (timestamp / windowSize)  // group data into buckets in the 
same time window
gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded timestamp 
for each bucket
stream = FOREACH gs GENERATE toRDD(g)

No idea how to do the order by part in RDD.

Jianshi


On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai <saisai.s...@intel.com> wrote:
I think you solution may not  be extendable if the data size is increasing, 
since you have to collect all your data back to driver node, so the memory 
usage of driver will be a problem.

why not filter out specific time-range data as a rdd, after filtering the whole 
time-range, you will get a series of RDD with timestamp divided, and then feed 
into queue. Still it is not an efficient way, but  it is not limited by driver 
memory.

Also there may have some other solutions like shuffle to arrange data, but you 
cannot avoid scanning the whole data. Basically we need to avoid fetching large 
amount of data back to driver.


Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 2:39 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com)

Subject: Re: RDD to DStream

Hi Saisai,

I understand it's non-trivial, but the requirement of simulating offline data 
as stream is also fair. :)

I just wrote a prototype, however, I need to do a collect and a bunch of 
parallelize...

  // RDD of (timestamp, value)
  def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: 
StreamingContext): DStream[T] = {
val sc = ssc.sparkContext
val d = data.groupBy(_._1 / timeWindow)
.map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
.collect()
.map(e => (e._1, sc.parallelize(e._2)))
.sortBy(_._1)
val queue = new mutable.SynchronizedQueue[RDD[T]]

queue ++= d.map(_._2)

ssc.queueStream(queue)
  }

Any way to get a list of RDDs sorted by group key just after groupBy?

Jianshi

On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai <saisai.s...@intel.com> wrote:
Hi Jianshi,

For simulation purpose, I think you can try ConstantInputDStream and 
QueueInputDStream to convert one RDD or series of RDD into DStream, the first 
one output the same RDD in each batch duration, and the second one just output 
a RDD in a queue in each batch duration. You can take a look at it.

For your case, I think TD’s comment are quite meaningful, it’s not trivial to 
do so, often requires a job to scan all the records, it’s also not the design 
purpose of Spark Streaming, I guess it’s hard to achieve what you want.


Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 1:42 PM
To: Tathagata Das
Cc: Aniket Bhatnagar; user@spark.apache.org
Subject: Re: RDD to DStream

I have a similar requirement. But instead of grouping it by chunkSize, I would 
have the timeStamp be part of the data. So the function I want has the 
following signature:

  // RDD of (timestamp, value)
  def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: 
StreamingContext): DStream[T]

And DStream should 

Re: RDD to DStream

2014-10-27 Thread Jianshi Huang
Ok, back to Scala code, I'm wondering why I cannot do this:

data.groupBy(timestamp / window)
  .sortByKey()  // no sort method available here
  .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
  .collect() // returns Seq[RDD[(Timestamp, T)]]


Jianshi

On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang 
wrote:

> You're absolutely right, it's not 'scalable' as I'm using collect().
>
> However, it's important to have the RDDs ordered by the timestamp of the
> time window (groupBy puts data to corresponding timewindow).
>
> It's fairly easy to do in Pig, but somehow I have no idea how to express
> it in RDD...
>
> Something like (in Pig, pseudo code :):
>
> g = GROUP data BY (timestamp / windowSize)  // group data into buckets in
> the same time window
> gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
> timestamp for each bucket
> stream = FOREACH gs GENERATE toRDD(g)
>
> No idea how to do the order by part in RDD.
>
> Jianshi
>
>
> On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai 
> wrote:
>
>>  I think you solution may not  be extendable if the data size is
>> increasing, since you have to collect all your data back to driver node, so
>> the memory usage of driver will be a problem.
>>
>>
>>
>> why not filter out specific time-range data as a rdd, after filtering the
>> whole time-range, you will get a series of RDD with timestamp divided, and
>> then feed into queue. Still it is not an efficient way, but  it is not
>> limited by driver memory.
>>
>>
>>
>> Also there may have some other solutions like shuffle to arrange data,
>> but you cannot avoid scanning the whole data. Basically we need to avoid
>> fetching large amount of data back to driver.
>>
>>
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> *Sent:* Monday, October 27, 2014 2:39 PM
>> *To:* Shao, Saisai
>> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
>>
>> *Subject:* Re: RDD to DStream
>>
>>
>>
>> Hi Saisai,
>>
>>
>>
>> I understand it's non-trivial, but the requirement of simulating offline
>> data as stream is also fair. :)
>>
>>
>>
>> I just wrote a prototype, however, I need to do a collect and a bunch of
>> parallelize...
>>
>>
>>
>>   // RDD of (timestamp, value)
>>
>>   def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
>> ssc: StreamingContext): DStream[T] = {
>>
>> val sc = ssc.sparkContext
>>
>> val d = data.groupBy(_._1 / timeWindow)
>>
>> .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
>>
>> .collect()
>>
>> .map(e => (e._1, sc.parallelize(e._2)))
>>
>> .sortBy(_._1)
>>
>> val queue = new mutable.SynchronizedQueue[RDD[T]]
>>
>>
>>
>> queue ++= d.map(_._2)
>>
>>
>>
>> ssc.queueStream(queue)
>>
>>   }
>>
>>
>>
>> Any way to get a list of RDDs sorted by group key just after groupBy?
>>
>>
>>
>> Jianshi
>>
>>
>>
>> On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai 
>> wrote:
>>
>>  Hi Jianshi,
>>
>>
>>
>> For simulation purpose, I think you can try ConstantInputDStream and
>> QueueInputDStream to convert one RDD or series of RDD into DStream, the
>> first one output the same RDD in each batch duration, and the second one
>> just output a RDD in a queue in each batch duration. You can take a look at
>> it.
>>
>>
>>
>> For your case, I think TD’s comment are quite meaningful, it’s not
>> trivial to do so, often requires a job to scan all the records, it’s also
>> not the design purpose of Spark Streaming, I guess it’s hard to achieve
>> what you want.
>>
>>
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> *Sent:* Monday, October 27, 2014 1:42 PM
>> *To:* Tathagata Das
>> *Cc:* Aniket Bhatnagar; user@spark.apache.org
>> *Subject:* Re: RDD to DStream
>>
>>
>>
>> I have a similar requirement. But instead of grouping it by chunkSize, I
>> would have the timeStamp be part of the data. So the function I want has
>> the following signature:
>>
>>
>>
>>   // RDD of (timestam

Re: RDD to DStream

2014-10-27 Thread Jianshi Huang
You're absolutely right, it's not 'scalable' as I'm using collect().

However, it's important to have the RDDs ordered by the timestamp of the
time window (groupBy puts the data into the corresponding time window).

It's fairly easy to do in Pig, but somehow I have no idea how to express it
with RDDs...

Something like (in Pig, pseudo code :):

g = GROUP data BY (timestamp / windowSize)  // group data into buckets in
the same time window
gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
timestamp for each bucket
stream = FOREACH gs GENERATE toRDD(g)

No idea how to do the order by part in RDD.
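
I guess a rough RDD equivalent of the ORDER BY (untested) would be to sort on a
composite key:

  // data: RDD[(Long, T)] of (timestamp, value) pairs, as above.
  // In Spark 1.x: import org.apache.spark.SparkContext._ for sortByKey.
  val ordered = data
    .map { case (ts, v) => ((ts / windowSize, ts), v) }   // (group, timestamp) as the key
    .sortByKey()                                          // ORDER BY group ASC, timestamp ASC

but that still leaves the "one RDD per group" part open.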

Jianshi


On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai  wrote:

>  I think you solution may not  be extendable if the data size is
> increasing, since you have to collect all your data back to driver node, so
> the memory usage of driver will be a problem.
>
>
>
> why not filter out specific time-range data as a rdd, after filtering the
> whole time-range, you will get a series of RDD with timestamp divided, and
> then feed into queue. Still it is not an efficient way, but  it is not
> limited by driver memory.
>
>
>
> Also there may have some other solutions like shuffle to arrange data, but
> you cannot avoid scanning the whole data. Basically we need to avoid
> fetching large amount of data back to driver.
>
>
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 2:39 PM
> *To:* Shao, Saisai
> *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
>
> *Subject:* Re: RDD to DStream
>
>
>
> Hi Saisai,
>
>
>
> I understand it's non-trivial, but the requirement of simulating offline
> data as stream is also fair. :)
>
>
>
> I just wrote a prototype, however, I need to do a collect and a bunch of
> parallelize...
>
>
>
>   // RDD of (timestamp, value)
>
>   def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
> ssc: StreamingContext): DStream[T] = {
>
> val sc = ssc.sparkContext
>
> val d = data.groupBy(_._1 / timeWindow)
>
> .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
>
> .collect()
>
> .map(e => (e._1, sc.parallelize(e._2)))
>
> .sortBy(_._1)
>
> val queue = new mutable.SynchronizedQueue[RDD[T]]
>
>
>
> queue ++= d.map(_._2)
>
>
>
> ssc.queueStream(queue)
>
>   }
>
>
>
> Any way to get a list of RDDs sorted by group key just after groupBy?
>
>
>
> Jianshi
>
>
>
> On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai 
> wrote:
>
>  Hi Jianshi,
>
>
>
> For simulation purpose, I think you can try ConstantInputDStream and
> QueueInputDStream to convert one RDD or series of RDD into DStream, the
> first one output the same RDD in each batch duration, and the second one
> just output a RDD in a queue in each batch duration. You can take a look at
> it.
>
>
>
> For your case, I think TD’s comment are quite meaningful, it’s not trivial
> to do so, often requires a job to scan all the records, it’s also not the
> design purpose of Spark Streaming, I guess it’s hard to achieve what you
> want.
>
>
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 1:42 PM
> *To:* Tathagata Das
> *Cc:* Aniket Bhatnagar; user@spark.apache.org
> *Subject:* Re: RDD to DStream
>
>
>
> I have a similar requirement. But instead of grouping it by chunkSize, I
> would have the timeStamp be part of the data. So the function I want has
> the following signature:
>
>
>
>   // RDD of (timestamp, value)
>
>   def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit
> ssc: StreamingContext): DStream[T]
>
>
>
> And DStream should respect the timestamp part. This is important for
> simulation, right?
>
>
>
> Do you have any good solution for this?
>
>
>
> Jianshi
>
>
>
>
>
> On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das 
> wrote:
>
>  Hey Aniket,
>
>
>
> Great thoughts! I understand the usecase. But as you have realized
> yourself it is not trivial to cleanly stream a RDD as a DStream. Since RDD
> operations are defined to be scan based, it is not efficient to define RDD
> based on slices of data within a partition of another RDD, using pure RDD
> transformations. What you have done is a decent, and probably the only
> feasible solution, with its limitations.
>
>
>
> Also the requirements of converting a batch of data to 

RE: RDD to DStream

2014-10-27 Thread Shao, Saisai
I think your solution may not scale as the data size increases, since you have to
collect all your data back to the driver node, so driver memory usage will be a
problem.

Why not filter out each specific time range as an RDD? After filtering over the
whole time range, you will get a series of RDDs divided by timestamp, which you
can then feed into the queue. It is still not an efficient way, but it is not
limited by driver memory.
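
Just a sketch of what I mean, reusing the names from your prototype below (not
tested):

  // Only the distinct window keys come back to the driver; the values stay
  // distributed and are filtered per window when each queued RDD is consumed.
  // (mutable is scala.collection.mutable; pair-RDD ops need
  //  import org.apache.spark.SparkContext._ in Spark 1.x.)
  val keyed = data.map { case (ts, v) => (ts / timeWindow, v) }.cache()
  val windows = keyed.keys.distinct().collect().sorted
  val queue = new mutable.Queue[RDD[T]]()
  windows.foreach { w => queue += keyed.filter(_._1 == w).values }
  ssc.queueStream(queue, oneAtATime = true)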

There may also be other solutions, such as a shuffle to arrange the data, but you
cannot avoid scanning the whole dataset. Basically we need to avoid fetching a
large amount of data back to the driver.


Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 2:39 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com)
Subject: Re: RDD to DStream

Hi Saisai,

I understand it's non-trivial, but the requirement of simulating offline data 
as stream is also fair. :)

I just wrote a prototype, however, I need to do a collect and a bunch of 
parallelize...

  // RDD of (timestamp, value)
  def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: 
StreamingContext): DStream[T] = {
val sc = ssc.sparkContext
val d = data.groupBy(_._1 / timeWindow)
.map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
.collect()
.map(e => (e._1, sc.parallelize(e._2)))
.sortBy(_._1)
val queue = new mutable.SynchronizedQueue[RDD[T]]

queue ++= d.map(_._2)

ssc.queueStream(queue)
  }

Any way to get a list of RDDs sorted by group key just after groupBy?

Jianshi

On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai <saisai.s...@intel.com> wrote:
Hi Jianshi,

For simulation purpose, I think you can try ConstantInputDStream and 
QueueInputDStream to convert one RDD or series of RDD into DStream, the first 
one output the same RDD in each batch duration, and the second one just output 
a RDD in a queue in each batch duration. You can take a look at it.

For your case, I think TD’s comment are quite meaningful, it’s not trivial to 
do so, often requires a job to scan all the records, it’s also not the design 
purpose of Spark Streaming, I guess it’s hard to achieve what you want.


Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 1:42 PM
To: Tathagata Das
Cc: Aniket Bhatnagar; user@spark.apache.org
Subject: Re: RDD to DStream

I have a similar requirement. But instead of grouping it by chunkSize, I would 
have the timeStamp be part of the data. So the function I want has the 
following signature:

  // RDD of (timestamp, value)
  def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: 
StreamingContext): DStream[T]

And DStream should respect the timestamp part. This is important for 
simulation, right?

Do you have any good solution for this?

Jianshi


On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
Hey Aniket,

Great thoughts! I understand the usecase. But as you have realized yourself it 
is not trivial to cleanly stream a RDD as a DStream. Since RDD operations are 
defined to be scan based, it is not efficient to define RDD based on slices of 
data within a partition of another RDD, using pure RDD transformations. What 
you have done is a decent, and probably the only feasible solution, with its 
limitations.

Also the requirements of converting a batch of data to a stream of data can be 
pretty diverse. What rate, what # of events per batch, how many batches, is it 
efficient? Hence, it is not trivial to define a good, clean public API for 
that. If any one has any thoughts, ideas, etc on this, you are more than 
welcome to share them.

TD

On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
The use case for converting RDD into DStream is that I want to simulate a 
stream from an already persisted data for testing analytics. It is trivial to 
create a RDD from any persisted data but not so much for DStream. Therefore, my 
idea to create DStream from RDD. For example, lets say you are trying to 
implement analytics on time series data using Lambda architecture. This means 
you would have to implement the same analytics on streaming data (in streaming 
mode) as well as persisted data (in batch mode). The workflow for implementing 
the anlytics would be to first implement it in batch mode using RDD operations 
and then simulate stream to test the analytics in stream mode. The simulated 
stream should produce the elements at a specified rate. So the solution maybe 
to read data in a RDD, split (chunk) it into multiple RDDs with each RDD having 
the size of elements that need to be streamed per time unit and then finally 
stream each RDD using the compute function.

The problem with using QueueIn

Re: RDD to DStream

2014-10-26 Thread Jianshi Huang
Hi Saisai,

I understand it's non-trivial, but the requirement of simulating offline
data as a stream is also fair. :)

I just wrote a prototype; however, I need to do a collect and a bunch of
parallelize calls...

  // RDD of (timestamp, value)
  def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
      ssc: StreamingContext): DStream[T] = {
    val sc = ssc.sparkContext
    val d = data.groupBy(_._1 / timeWindow)
      .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
      .collect()
      .map(e => (e._1, sc.parallelize(e._2)))
      .sortBy(_._1)
    val queue = new mutable.SynchronizedQueue[RDD[T]]

    queue ++= d.map(_._2)

    ssc.queueStream(queue)
  }

Any way to get a list of RDDs sorted by group key just after groupBy?
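
Maybe sorting by the group key on the cluster before collecting would do it
(untested), so the driver-side sortBy goes away:

  val d = data.groupBy(_._1 / timeWindow)
    .sortByKey()   // import org.apache.spark.SparkContext._ in Spark 1.x
    .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
    .collect()     // collect() preserves the sort order
    .map(e => (e._1, sc.parallelize(e._2)))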

Jianshi

On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai  wrote:

>  Hi Jianshi,
>
>
>
> For simulation purpose, I think you can try ConstantInputDStream and
> QueueInputDStream to convert one RDD or series of RDD into DStream, the
> first one output the same RDD in each batch duration, and the second one
> just output a RDD in a queue in each batch duration. You can take a look at
> it.
>
>
>
> For your case, I think TD’s comment are quite meaningful, it’s not trivial
> to do so, often requires a job to scan all the records, it’s also not the
> design purpose of Spark Streaming, I guess it’s hard to achieve what you
> want.
>
>
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Monday, October 27, 2014 1:42 PM
> *To:* Tathagata Das
> *Cc:* Aniket Bhatnagar; user@spark.apache.org
> *Subject:* Re: RDD to DStream
>
>
>
> I have a similar requirement. But instead of grouping it by chunkSize, I
> would have the timeStamp be part of the data. So the function I want has
> the following signature:
>
>
>
>   // RDD of (timestamp, value)
>
>   def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit
> ssc: StreamingContext): DStream[T]
>
>
>
> And DStream should respect the timestamp part. This is important for
> simulation, right?
>
>
>
> Do you have any good solution for this?
>
>
>
> Jianshi
>
>
>
>
>
> On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das 
> wrote:
>
>  Hey Aniket,
>
>
>
> Great thoughts! I understand the usecase. But as you have realized
> yourself it is not trivial to cleanly stream a RDD as a DStream. Since RDD
> operations are defined to be scan based, it is not efficient to define RDD
> based on slices of data within a partition of another RDD, using pure RDD
> transformations. What you have done is a decent, and probably the only
> feasible solution, with its limitations.
>
>
>
> Also the requirements of converting a batch of data to a stream of data
> can be pretty diverse. What rate, what # of events per batch, how many
> batches, is it efficient? Hence, it is not trivial to define a good, clean
> public API for that. If any one has any thoughts, ideas, etc on this, you
> are more than welcome to share them.
>
>
>
> TD
>
>
>
> On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>  The use case for converting RDD into DStream is that I want to simulate
> a stream from an already persisted data for testing analytics. It is
> trivial to create a RDD from any persisted data but not so much for
> DStream. Therefore, my idea to create DStream from RDD. For example, lets
> say you are trying to implement analytics on time series data using Lambda
> architecture. This means you would have to implement the same analytics on
> streaming data (in streaming mode) as well as persisted data (in batch
> mode). The workflow for implementing the anlytics would be to first
> implement it in batch mode using RDD operations and then simulate stream to
> test the analytics in stream mode. The simulated stream should produce the
> elements at a specified rate. So the solution maybe to read data in a RDD,
> split (chunk) it into multiple RDDs with each RDD having the size of
> elements that need to be streamed per time unit and then finally stream
> each RDD using the compute function.
>
>
>
> The problem with using QueueInputDStream is that it will stream data as
> per the batch duration specified in the streaming context and one cannot
> specify a custom slide duration. Moreover, the class QueueInputDStream is
> private to streaming package, so I can't really use it/extend it from an
> external package. Also, I could not find a good solution split a RDD into
> equal sized smaller RDDs that can be fed into an extended version of
> QueueInputDStream.
>
>
>
> F

RE: RDD to DStream

2014-10-26 Thread Shao, Saisai
Hi Jianshi,

For simulation purposes, I think you can try ConstantInputDStream and
QueueInputDStream to convert one RDD or a series of RDDs into a DStream: the
first outputs the same RDD in each batch duration, and the second outputs one
RDD from a queue in each batch duration. You can take a look at them.
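
For example, something like this (just a rough sketch; sc is an existing
SparkContext and rdd1/rdd2 are placeholders for your own RDDs):

  import scala.collection.mutable
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.dstream.ConstantInputDStream

  val ssc = new StreamingContext(sc, Seconds(1))

  // QueueInputDStream (via ssc.queueStream): emits one RDD from the queue per batch.
  val queued = ssc.queueStream(mutable.Queue(rdd1, rdd2), oneAtATime = true)

  // ConstantInputDStream: emits the same RDD in every batch.
  val constant = new ConstantInputDStream(ssc, rdd1)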

For your case, I think TD’s comments are quite relevant: it’s not trivial to do,
it often requires a job that scans all the records, and it’s not what Spark
Streaming was designed for, so I guess it’s hard to achieve exactly what you want.


Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 1:42 PM
To: Tathagata Das
Cc: Aniket Bhatnagar; user@spark.apache.org
Subject: Re: RDD to DStream

I have a similar requirement. But instead of grouping it by chunkSize, I would 
have the timeStamp be part of the data. So the function I want has the 
following signature:

  // RDD of (timestamp, value)
  def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: 
StreamingContext): DStream[T]

And DStream should respect the timestamp part. This is important for 
simulation, right?

Do you have any good solution for this?

Jianshi


On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
Hey Aniket,

Great thoughts! I understand the usecase. But as you have realized yourself it 
is not trivial to cleanly stream a RDD as a DStream. Since RDD operations are 
defined to be scan based, it is not efficient to define RDD based on slices of 
data within a partition of another RDD, using pure RDD transformations. What 
you have done is a decent, and probably the only feasible solution, with its 
limitations.

Also the requirements of converting a batch of data to a stream of data can be 
pretty diverse. What rate, what # of events per batch, how many batches, is it 
efficient? Hence, it is not trivial to define a good, clean public API for 
that. If any one has any thoughts, ideas, etc on this, you are more than 
welcome to share them.

TD

On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
The use case for converting RDD into DStream is that I want to simulate a 
stream from an already persisted data for testing analytics. It is trivial to 
create a RDD from any persisted data but not so much for DStream. Therefore, my 
idea to create DStream from RDD. For example, lets say you are trying to 
implement analytics on time series data using Lambda architecture. This means 
you would have to implement the same analytics on streaming data (in streaming 
mode) as well as persisted data (in batch mode). The workflow for implementing 
the anlytics would be to first implement it in batch mode using RDD operations 
and then simulate stream to test the analytics in stream mode. The simulated 
stream should produce the elements at a specified rate. So the solution maybe 
to read data in a RDD, split (chunk) it into multiple RDDs with each RDD having 
the size of elements that need to be streamed per time unit and then finally 
stream each RDD using the compute function.

The problem with using QueueInputDStream is that it will stream data as per the 
batch duration specified in the streaming context and one cannot specify a 
custom slide duration. Moreover, the class QueueInputDStream is private to 
streaming package, so I can't really use it/extend it from an external package. 
Also, I could not find a good solution split a RDD into equal sized smaller 
RDDs that can be fed into an extended version of QueueInputDStream.

Finally, here is what I came up with:

class RDDExtension[T: ClassTag](rdd: RDD[T]) {
  def toStream(streamingContext: StreamingContext, chunkSize: Int, 
slideDurationMilli: Option[Long] = None): DStream[T] = {
new InputDStream[T](streamingContext) {

  private val iterator = rdd.toLocalIterator // WARNING: each partition 
much fit in RAM of local machine.
  private val grouped = iterator.grouped(chunkSize)

  override def start(): Unit = {}

  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[T]] = {
if (grouped.hasNext) {
  Some(rdd.sparkContext.parallelize(grouped.next()))
} else {
  None
}
  }

  override def slideDuration = {
slideDurationMilli.map(duration => new Duration(duration)).
  getOrElse(super.slideDuration)
  }
}
}

This aims to stream chunkSize elements every slideDurationMilli milliseconds 
(defaults to batch size in streaming context). It's still not perfect (for 
example, the streaming is not precise) but given that this will only be used 
for testing purposes, I don't look for ways to further optimize it.

Thanks,
Aniket


On 2 August 2014 04:07, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
Nice question :)
Ideally you should use a queuestream interface to push RDD into a queue & then 
spa

Re: RDD to DStream

2014-10-26 Thread Jianshi Huang
I have a similar requirement. But instead of grouping it by chunkSize, I
would have the timeStamp be part of the data. So the function I want has
the following signature:

  // RDD of (timestamp, value)
  def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc:
StreamingContext): DStream[T]

And DStream should respect the timestamp part. This is important for
simulation, right?

Do you have any good solution for this?

Jianshi


On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das 
wrote:

> Hey Aniket,
>
> Great thoughts! I understand the usecase. But as you have realized
> yourself it is not trivial to cleanly stream a RDD as a DStream. Since RDD
> operations are defined to be scan based, it is not efficient to define RDD
> based on slices of data within a partition of another RDD, using pure RDD
> transformations. What you have done is a decent, and probably the only
> feasible solution, with its limitations.
>
> Also the requirements of converting a batch of data to a stream of data
> can be pretty diverse. What rate, what # of events per batch, how many
> batches, is it efficient? Hence, it is not trivial to define a good, clean
> public API for that. If any one has any thoughts, ideas, etc on this, you
> are more than welcome to share them.
>
> TD
>
>
> On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> The use case for converting RDD into DStream is that I want to simulate a
>> stream from an already persisted data for testing analytics. It is trivial
>> to create a RDD from any persisted data but not so much for DStream.
>> Therefore, my idea to create DStream from RDD. For example, lets say you
>> are trying to implement analytics on time series data using Lambda
>> architecture. This means you would have to implement the same analytics on
>> streaming data (in streaming mode) as well as persisted data (in batch
>> mode). The workflow for implementing the anlytics would be to first
>> implement it in batch mode using RDD operations and then simulate stream to
>> test the analytics in stream mode. The simulated stream should produce the
>> elements at a specified rate. So the solution maybe to read data in a RDD,
>> split (chunk) it into multiple RDDs with each RDD having the size of
>> elements that need to be streamed per time unit and then finally stream
>> each RDD using the compute function.
>>
>> The problem with using QueueInputDStream is that it will stream data as
>> per the batch duration specified in the streaming context and one cannot
>> specify a custom slide duration. Moreover, the class QueueInputDStream is
>> private to streaming package, so I can't really use it/extend it from an
>> external package. Also, I could not find a good solution split a RDD into
>> equal sized smaller RDDs that can be fed into an extended version of
>> QueueInputDStream.
>>
>> Finally, here is what I came up with:
>>
>> class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>> slideDurationMilli: Option[Long] = None): DStream[T] = {
>> new InputDStream[T](streamingContext) {
>>
>>   private val iterator = rdd.toLocalIterator // WARNING: each
>> partition much fit in RAM of local machine.
>>   private val grouped = iterator.grouped(chunkSize)
>>
>>   override def start(): Unit = {}
>>
>>   override def stop(): Unit = {}
>>
>>   override def compute(validTime: Time): Option[RDD[T]] = {
>> if (grouped.hasNext) {
>>   Some(rdd.sparkContext.parallelize(grouped.next()))
>> } else {
>>   None
>> }
>>   }
>>
>>   override def slideDuration = {
>> slideDurationMilli.map(duration => new Duration(duration)).
>>   getOrElse(super.slideDuration)
>>   }
>> }
>> }
>>
>> This aims to stream chunkSize elements every slideDurationMilli
>> milliseconds (defaults to batch size in streaming context). It's still not
>> perfect (for example, the streaming is not precise) but given that this
>> will only be used for testing purposes, I don't look for ways to further
>> optimize it.
>>
>> Thanks,
>> Aniket
>>
>>
>>
>> On 2 August 2014 04:07, Mayur Rustagi  wrote:
>>
>>> Nice question :)
>>> Ideally you should use a queuestream interface to push RDD into a queue
>>> & then spark streaming can handle the rest.
>>> Though why are you looking to convert RDD to DStream, another workaround
>>> folks use is to source DStream from folders & move files that they need
>>> reprocessed back into the folder, its a hack but much less headache .
>>>
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi 
>>>
>>>
>>>
>>> On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <
>>> aniket.bhatna...@gmail.com> wrote:
>>>
 Hi everyone

 I haven't been receiving replies to my queries in the distribution
 list. Not pissed but I am actually curious to kno

Re: RDD to DStream

2014-08-06 Thread Tathagata Das
Hey Aniket,

Great thoughts! I understand the use case. But as you have realized yourself,
it is not trivial to cleanly stream an RDD as a DStream. Since RDD
operations are defined to be scan based, it is not efficient to define an RDD
based on slices of data within a partition of another RDD using pure RDD
transformations. What you have done is a decent, and probably the only
feasible, solution, with its limitations.

Also, the requirements for converting a batch of data to a stream of data can
be pretty diverse: what rate, how many events per batch, how many batches, and
can it be made efficient? Hence, it is not trivial to define a good, clean public
API for that. If anyone has any thoughts, ideas, etc. on this, you are more
than welcome to share them.
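
Just to make that concrete, one possible shape for such an API could be
something like the following; this is purely illustrative, not an actual or
planned Spark API:

  // Purely illustrative signature, not an actual Spark API.
  def rddToDStream[T: ClassTag](
      rdd: RDD[T],
      ssc: StreamingContext,
      eventsPerBatch: Int,              // how many events per batch
      slideDuration: Duration,          // the rate at which batches are emitted
      maxBatches: Option[Int] = None    // how many batches to emit in total
    ): DStream[T] = ???                 // open question: can this be done efficiently?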

TD


On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> The use case for converting RDD into DStream is that I want to simulate a
> stream from an already persisted data for testing analytics. It is trivial
> to create a RDD from any persisted data but not so much for DStream.
> Therefore, my idea to create DStream from RDD. For example, lets say you
> are trying to implement analytics on time series data using Lambda
> architecture. This means you would have to implement the same analytics on
> streaming data (in streaming mode) as well as persisted data (in batch
> mode). The workflow for implementing the anlytics would be to first
> implement it in batch mode using RDD operations and then simulate stream to
> test the analytics in stream mode. The simulated stream should produce the
> elements at a specified rate. So the solution maybe to read data in a RDD,
> split (chunk) it into multiple RDDs with each RDD having the size of
> elements that need to be streamed per time unit and then finally stream
> each RDD using the compute function.
>
> The problem with using QueueInputDStream is that it will stream data as
> per the batch duration specified in the streaming context and one cannot
> specify a custom slide duration. Moreover, the class QueueInputDStream is
> private to streaming package, so I can't really use it/extend it from an
> external package. Also, I could not find a good solution split a RDD into
> equal sized smaller RDDs that can be fed into an extended version of
> QueueInputDStream.
>
> Finally, here is what I came up with:
>
> class RDDExtension[T: ClassTag](rdd: RDD[T]) {
>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
> slideDurationMilli: Option[Long] = None): DStream[T] = {
> new InputDStream[T](streamingContext) {
>
>   private val iterator = rdd.toLocalIterator // WARNING: each
> partition much fit in RAM of local machine.
>   private val grouped = iterator.grouped(chunkSize)
>
>   override def start(): Unit = {}
>
>   override def stop(): Unit = {}
>
>   override def compute(validTime: Time): Option[RDD[T]] = {
> if (grouped.hasNext) {
>   Some(rdd.sparkContext.parallelize(grouped.next()))
> } else {
>   None
> }
>   }
>
>   override def slideDuration = {
> slideDurationMilli.map(duration => new Duration(duration)).
>   getOrElse(super.slideDuration)
>   }
> }
> }
>
> This aims to stream chunkSize elements every slideDurationMilli
> milliseconds (defaults to batch size in streaming context). It's still not
> perfect (for example, the streaming is not precise) but given that this
> will only be used for testing purposes, I don't look for ways to further
> optimize it.
>
> Thanks,
> Aniket
>
>
>
> On 2 August 2014 04:07, Mayur Rustagi  wrote:
>
>> Nice question :)
>> Ideally you should use a queuestream interface to push RDD into a queue &
>> then spark streaming can handle the rest.
>> Though why are you looking to convert RDD to DStream, another workaround
>> folks use is to source DStream from folders & move files that they need
>> reprocessed back into the folder, its a hack but much less headache .
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi 
>>
>>
>>
>> On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> Hi everyone
>>>
>>> I haven't been receiving replies to my queries in the distribution list.
>>> Not pissed but I am actually curious to know if my messages are actually
>>> going through or not. Can someone please confirm that my msgs are getting
>>> delivered via this distribution list?
>>>
>>> Thanks,
>>> Aniket
>>>
>>>
>>> On 1 August 2014 13:55, Aniket Bhatnagar 
>>> wrote:
>>>
 Sometimes it is useful to convert a RDD into a DStream for testing
 purposes (generating DStreams from historical data, etc). Is there an easy
 way to do this?

 I could come up with the following inefficient way but no sure if there
 is a better way to achieve this. Thoughts?

 class RDDExtension[T](rdd: RDD[T]) {

   def chunked(chunkSiz

Re: RDD to DStream

2014-08-04 Thread Aniket Bhatnagar
The use case for converting an RDD into a DStream is that I want to simulate a
stream from already persisted data for testing analytics. It is trivial to create
an RDD from any persisted data, but not so much a DStream. Therefore, my idea is
to create a DStream from an RDD. For example, let's say you are trying to
implement analytics on time series data using the Lambda architecture. This means
you would have to implement the same analytics on streaming data (in streaming
mode) as well as persisted data (in batch mode). The workflow for implementing
the analytics would be to first implement it in batch mode using RDD operations
and then simulate a stream to test the analytics in stream mode. The simulated
stream should produce the elements at a specified rate. So the solution may be to
read the data into an RDD, split (chunk) it into multiple RDDs, with each RDD
having the number of elements that need to be streamed per time unit, and then
finally stream each RDD using the compute function.

The problem with using QueueInputDStream is that it will stream data as per
the batch duration specified in the streaming context, and one cannot specify a
custom slide duration. Moreover, the class QueueInputDStream is private to the
streaming package, so I can't really use or extend it from an external package.
Also, I could not find a good way to split an RDD into equal-sized smaller RDDs
that can be fed into an extended version of QueueInputDStream.

Finally, here is what I came up with:

class RDDExtension[T: ClassTag](rdd: RDD[T]) {

  def toStream(streamingContext: StreamingContext, chunkSize: Int,
      slideDurationMilli: Option[Long] = None): DStream[T] = {
    new InputDStream[T](streamingContext) {

      // WARNING: each partition must fit in the RAM of the local machine.
      private val iterator = rdd.toLocalIterator
      private val grouped = iterator.grouped(chunkSize)

      override def start(): Unit = {}

      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        if (grouped.hasNext) {
          Some(rdd.sparkContext.parallelize(grouped.next()))
        } else {
          None
        }
      }

      override def slideDuration = {
        slideDurationMilli.map(duration => new Duration(duration))
          .getOrElse(super.slideDuration)
      }
    }
  }
}

This aims to stream chunkSize elements every slideDurationMilli milliseconds
(which defaults to the batch duration of the streaming context). It's still not
perfect (for example, the timing is not precise), but given that this will only
be used for testing purposes, I'm not looking for ways to optimize it further.
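
Usage would look roughly like this, assuming an implicit conversion to
RDDExtension is in scope and sc is an existing SparkContext (the path and the
numbers are made up):

  implicit def toRDDExtension[T: ClassTag](rdd: RDD[T]): RDDExtension[T] =
    new RDDExtension(rdd)

  val ssc = new StreamingContext(sc, Seconds(1))
  val historical = sc.textFile("hdfs:///data/events")          // made-up path
  val simulated = historical.toStream(ssc, chunkSize = 100)    // 100 events per batch
  simulated.print()
  ssc.start()
  ssc.awaitTermination()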

Thanks,
Aniket



On 2 August 2014 04:07, Mayur Rustagi  wrote:

> Nice question :)
> Ideally you should use a queuestream interface to push RDD into a queue &
> then spark streaming can handle the rest.
> Though why are you looking to convert RDD to DStream, another workaround
> folks use is to source DStream from folders & move files that they need
> reprocessed back into the folder, its a hack but much less headache .
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> Hi everyone
>>
>> I haven't been receiving replies to my queries in the distribution list.
>> Not pissed but I am actually curious to know if my messages are actually
>> going through or not. Can someone please confirm that my msgs are getting
>> delivered via this distribution list?
>>
>> Thanks,
>> Aniket
>>
>>
>> On 1 August 2014 13:55, Aniket Bhatnagar 
>> wrote:
>>
>>> Sometimes it is useful to convert a RDD into a DStream for testing
>>> purposes (generating DStreams from historical data, etc). Is there an easy
>>> way to do this?
>>>
>>> I could come up with the following inefficient way but no sure if there
>>> is a better way to achieve this. Thoughts?
>>>
>>> class RDDExtension[T](rdd: RDD[T]) {
>>>
>>>   def chunked(chunkSize: Int): RDD[Seq[T]] = {
>>> rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>>>   }
>>>
>>>   def skipFirst(): RDD[T] = {
>>> rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>>>   }
>>>
>>>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>>> slideDurationMilli: Option[Long] = None): DStream[T] = {
>>> new InputDStream[T](streamingContext) {
>>>
>>>   @volatile private var currentRDD: RDD[Seq[T]] =
>>> rdd.chunked(chunkSize)
>>>
>>>   override def start(): Unit = {}
>>>
>>>   override def stop(): Unit = {}
>>>
>>>   override def compute(validTime: Time): Option[RDD[T]] = {
>>> val chunk = currentRDD.take(1)
>>> currentRDD = currentRDD.skipFirst()
>>> Some(rdd.sparkContext.parallelize(chunk))
>>>   }
>>>
>>>   override def slideDuration = {
>>> slideDurationMilli.map(duration => new Duration(duration)).
>>>   getOrElse(super.slideDuration)
>>>   }
>>> }
>

Re: RDD to DStream

2014-08-01 Thread Mayur Rustagi
Nice question :)
Ideally you should use the queueStream interface to push RDDs into a queue, and
then Spark Streaming can handle the rest.
That said, why are you looking to convert an RDD to a DStream? Another workaround
folks use is to source the DStream from a folder and move the files they need
reprocessed back into that folder; it's a hack, but much less of a headache.
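
Roughly like this, assuming sc is an existing SparkContext (paths are made up):

  // Watch a folder and treat each new file as a batch of replayed data.
  val ssc = new StreamingContext(sc, Seconds(5))
  val replayed = ssc.textFileStream("hdfs:///tmp/replay-input")
  replayed.foreachRDD(rdd => println(s"got ${rdd.count()} records"))
  ssc.start()

  // From a shell, move the files you want reprocessed into the watched folder,
  // e.g. one file per desired batch:
  //   hdfs dfs -mv /data/archive/part-00000 /tmp/replay-input/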

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Hi everyone
>
> I haven't been receiving replies to my queries in the distribution list.
> Not pissed but I am actually curious to know if my messages are actually
> going through or not. Can someone please confirm that my msgs are getting
> delivered via this distribution list?
>
> Thanks,
> Aniket
>
>
> On 1 August 2014 13:55, Aniket Bhatnagar 
> wrote:
>
>> Sometimes it is useful to convert a RDD into a DStream for testing
>> purposes (generating DStreams from historical data, etc). Is there an easy
>> way to do this?
>>
>> I could come up with the following inefficient way but no sure if there
>> is a better way to achieve this. Thoughts?
>>
>> class RDDExtension[T](rdd: RDD[T]) {
>>
>>   def chunked(chunkSize: Int): RDD[Seq[T]] = {
>> rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>>   }
>>
>>   def skipFirst(): RDD[T] = {
>> rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>>   }
>>
>>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
>> slideDurationMilli: Option[Long] = None): DStream[T] = {
>> new InputDStream[T](streamingContext) {
>>
>>   @volatile private var currentRDD: RDD[Seq[T]] =
>> rdd.chunked(chunkSize)
>>
>>   override def start(): Unit = {}
>>
>>   override def stop(): Unit = {}
>>
>>   override def compute(validTime: Time): Option[RDD[T]] = {
>> val chunk = currentRDD.take(1)
>> currentRDD = currentRDD.skipFirst()
>> Some(rdd.sparkContext.parallelize(chunk))
>>   }
>>
>>   override def slideDuration = {
>> slideDurationMilli.map(duration => new Duration(duration)).
>>   getOrElse(super.slideDuration)
>>   }
>> }
>>
>> }
>>
>
>


Re: RDD to DStream

2014-08-01 Thread Aniket Bhatnagar
Hi everyone

I haven't been receiving replies to my queries on the distribution list.
Not pissed, but I am actually curious to know whether my messages are going
through or not. Can someone please confirm that my messages are getting
delivered via this distribution list?

Thanks,
Aniket


On 1 August 2014 13:55, Aniket Bhatnagar  wrote:

> Sometimes it is useful to convert a RDD into a DStream for testing
> purposes (generating DStreams from historical data, etc). Is there an easy
> way to do this?
>
> I could come up with the following inefficient way but I'm not sure if there
> is a better way to achieve this. Thoughts?
>
> class RDDExtension[T](rdd: RDD[T]) {
>
>   def chunked(chunkSize: Int): RDD[Seq[T]] = {
> rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
>   }
>
>   def skipFirst(): RDD[T] = {
> rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
>   }
>
>   def toStream(streamingContext: StreamingContext, chunkSize: Int,
> slideDurationMilli: Option[Long] = None): DStream[T] = {
> new InputDStream[T](streamingContext) {
>
>   @volatile private var currentRDD: RDD[Seq[T]] =
> rdd.chunked(chunkSize)
>
>   override def start(): Unit = {}
>
>   override def stop(): Unit = {}
>
>   override def compute(validTime: Time): Option[RDD[T]] = {
> val chunk = currentRDD.take(1)
> currentRDD = currentRDD.skipFirst()
> Some(rdd.sparkContext.parallelize(chunk))
>   }
>
>   override def slideDuration = {
> slideDurationMilli.map(duration => new Duration(duration)).
>   getOrElse(super.slideDuration)
>   }
> }
>
> }
>