Re: RDD.broadcast

2016-04-28 Thread Reynold Xin
This is a nice feature in broadcast join. It is just a little bit
complicated to do and as a result hasn't been prioritized as highly yet.


On Thu, Apr 28, 2016 at 5:51 AM, <ioannis.deligian...@nomura.com> wrote:

> I was aiming to show the operations with pseudo-code, but I apparently
> failed, so Java it is J
>
> Assume the following 3 datasets on HDFS.
>
> 1.   RDD1: User (1 Million rows – 2GB ) Columns: uid, locationId,
> (extra stuff)
>
> 2.   RDD2: Actions (1 Billion rows – 500GB) Columns: uid_1, uid_2
> (extra stuff)
>
> 3.   RDD3: Locations (10 Millions rows – 10GB) Columns: locationId,
> (extra stuff)
>
>
>
> So, what I am doing is shown below.
>
> I assume that it is clear of why using a “join” would be really
> inefficient, but if not have a look of some of the reasons here
> <http://stackoverflow.com/questions/30412325/hoes-does-spark-schedule-a-join/30490619#30490619>
> .
>
>
>
> //1 Filter actions
>
>  > JavaRDD actionRdd = RDD2.filter(f->….); //300GB
>
>
>
> //2 Filter users & Broadcast
>
> > JavaRDD userRdd =  RDD1.filter(f->…); //1GB
>
> > Map m = userRdd.keyBy(f->g.getUid()).collectAsMap();
>
> > Broadcast userMap = sparkCtx.broadcast(m);//1GB
>
>
>
> //3 Filter locations & Broadcast
>
> > JavaPairRDD locations = RDD3.filter(f->….).keyBy(f->g.getLocationId());
>
> > Broadcast locationMap =
> sparkCtx.broadcast(locations.collectAsMap());//1GB
>
>
>
> //4 Process using data from all 3 data sets
>
> > actionRdd.map(f-> {
>
> User u1 = userMap.value.get( f.getUid1());
>
> User u2 = userMap.value.get( f.getUid2());
>
> Location l= locationMap.value.get(u.getLocationId();
>
> Object result = method(f,u1,u2,l);//method implementation not important,
> but requires all 3 objects
>
> return result;
>
>   });
>
>
>
>
>
> *From:* Marcin Tustin [mailto:mtus...@handybook.com]
> *Sent:* 28 April 2016 12:27
>
> *To:* Deligiannis, Ioannis (UK)
> *Cc:* dev@spark.apache.org
> *Subject:* Re: RDD.broadcast
>
>
>
> I don't know what your notation really means. I'm very much unclear on why
> you can't use the filter method for 1. If you're talking about
> splitting/bucketing rather filtering as such I think that is a specific
> lacuna in spark's Api.
>
>
>
> I've generally found the join api to be entirely adequate for my needs, so
> I don't really have a comment on 2.
>
>
> On Thursday, April 28, 2016, <ioannis.deligian...@nomura.com> wrote:
>
> One example pattern we have it doing joins or filters based on two
> datasets. E.g.
>
> 1 Filter –multiple- RddB for a given set extracted from RddA
> (keyword here is multiple times)
>
> a.   RddA -> keyBy -> distinct -> collect() to Set A;
>
> b.  RddB -> Filter using Set A;
>
> 2 “Join” using composition on executor (again multiple times)
>
> a.   RddA -> filter by XYZ -> keyBy join attribute -> collectAsMap
> ->Broadcast MapA;
>
> b.  RddB -> map (Broadcast<Map<K,V>> MapA;
>
>
>
> The first use case might not be that common, but joining a large RDD with
> a small (reference) RDD is quite common and much faster than using “join”
> method.
>
>
>
>
>
> *From:* Marcin Tustin [mailto:mtus...@handybook.com]
> *Sent:* 28 April 2016 12:08
> *To:* Deligiannis, Ioannis (UK)
> *Cc:* dev@spark.apache.org
> *Subject:* Re: RDD.broadcast
>
>
>
> Why would you ever need to do this? I'm genuinely curious. I view collects
> as being solely for interactive work.
>
> On Thursday, April 28, 2016, <ioannis.deligian...@nomura.com> wrote:
>
> Hi,
>
>
>
> It is a common pattern to process an RDD, collect (typically a subset) to
> the driver and then broadcast back.
>
>
>
> Adding an RDD method that can do that using the torrent broadcast
> mechanics would be much more efficient. In addition, it would not require
> the Driver to also utilize its Heap holding this broadcast.
>
>
>
> I guess this can become complicated if the resulting broadcast is required
> to keep lineage information, but assuming a torrent distribution, once the
> broadcast is synced then lineage would not be required. I’d also expect the
> call to rdd.brodcast to be an action that eagerly distributes the broadcast
> and returns when the operation has succeeded.
>
>
>
> Is this something that could be implemented or are there any reasons that
> prohibits this?
>
>
>
> Thanks
>
> Ioannis
>
>
>
> This e-mail (including any attachments) is private and confidentia

RE: RDD.broadcast

2016-04-28 Thread Ioannis.Deligiannis
I was aiming to show the operations with pseudo-code, but I apparently failed, 
so Java it is ☺
Assume the following 3 datasets on HDFS.

1.   RDD1: User (1 Million rows – 2GB ) Columns: uid, locationId, (extra 
stuff)

2.   RDD2: Actions (1 Billion rows – 500GB) Columns: uid_1, uid_2 (extra 
stuff)

3.   RDD3: Locations (10 Millions rows – 10GB) Columns: locationId, (extra 
stuff)

So, what I am doing is shown below.
I assume that it is clear of why using a “join” would be really inefficient, 
but if not have a look of some of the reasons 
here<http://stackoverflow.com/questions/30412325/hoes-does-spark-schedule-a-join/30490619#30490619>
 .

//1 Filter actions
 > JavaRDD actionRdd = RDD2.filter(f->….); //300GB

//2 Filter users & Broadcast
> JavaRDD userRdd =  RDD1.filter(f->…); //1GB
> Map m = userRdd.keyBy(f->g.getUid()).collectAsMap();
> Broadcast userMap = sparkCtx.broadcast(m);//1GB

//3 Filter locations & Broadcast
> JavaPairRDD locations = RDD3.filter(f->….).keyBy(f->g.getLocationId());
> Broadcast locationMap = 
> sparkCtx.broadcast(locations.collectAsMap());//1GB

//4 Process using data from all 3 data sets
> actionRdd.map(f-> {
User u1 = userMap.value.get( f.getUid1());
User u2 = userMap.value.get( f.getUid2());
Location l= locationMap.value.get(u.getLocationId();
Object result = method(f,u1,u2,l);//method implementation not important, but 
requires all 3 objects
return result;
  });


From: Marcin Tustin [mailto:mtus...@handybook.com]
Sent: 28 April 2016 12:27
To: Deligiannis, Ioannis (UK)
Cc: dev@spark.apache.org
Subject: Re: RDD.broadcast

I don't know what your notation really means. I'm very much unclear on why you 
can't use the filter method for 1. If you're talking about splitting/bucketing 
rather filtering as such I think that is a specific lacuna in spark's Api.

I've generally found the join api to be entirely adequate for my needs, so I 
don't really have a comment on 2.

On Thursday, April 28, 2016, 
<ioannis.deligian...@nomura.com<mailto:ioannis.deligian...@nomura.com>> wrote:
One example pattern we have it doing joins or filters based on two datasets. 
E.g.

1 Filter –multiple- RddB for a given set extracted from RddA (keyword 
here is multiple times)

a.   RddA -> keyBy -> distinct -> collect() to Set A;

b.  RddB -> Filter using Set A;

2 “Join” using composition on executor (again multiple times)

a.   RddA -> filter by XYZ -> keyBy join attribute -> collectAsMap 
->Broadcast MapA;

b.  RddB -> map (Broadcast<Map<K,V>> MapA;


The first use case might not be that common, but joining a large RDD with a 
small (reference) RDD is quite common and much faster than using “join” method.


From: Marcin Tustin 
[mailto:mtus...@handybook.com<javascript:_e(%7B%7D,'cvml','mtus...@handybook.com');>]
Sent: 28 April 2016 12:08
To: Deligiannis, Ioannis (UK)
Cc: dev@spark.apache.org<javascript:_e(%7B%7D,'cvml','dev@spark.apache.org');>
Subject: Re: RDD.broadcast

Why would you ever need to do this? I'm genuinely curious. I view collects as 
being solely for interactive work.

On Thursday, April 28, 2016, 
<ioannis.deligian...@nomura.com<javascript:_e(%7B%7D,'cvml','ioannis.deligian...@nomura.com');>>
 wrote:
Hi,

It is a common pattern to process an RDD, collect (typically a subset) to the 
driver and then broadcast back.

Adding an RDD method that can do that using the torrent broadcast mechanics 
would be much more efficient. In addition, it would not require the Driver to 
also utilize its Heap holding this broadcast.

I guess this can become complicated if the resulting broadcast is required to 
keep lineage information, but assuming a torrent distribution, once the 
broadcast is synced then lineage would not be required. I’d also expect the 
call to rdd.brodcast to be an action that eagerly distributes the broadcast and 
returns when the operation has succeeded.

Is this something that could be implemented or are there any reasons that 
prohibits this?

Thanks
Ioannis

This e-mail (including any attachments) is private and confidential, may 
contain proprietary or privileged information and is intended for the named 
recipient(s) only. Unintended recipients are strictly prohibited from taking 
action on the basis of information in this e-mail and must contact the sender 
immediately, delete this e-mail (and all attachments) and destroy any hard 
copies. Nomura will not accept responsibility or liability for the accuracy or 
completeness of, or the presence of any virus or disabling code in, this 
e-mail. If verification is sought please request a hard copy. Any reference to 
the terms of executed transactions should be treated as preliminary only and 
subject to formal written confirmation by Nomura. Nomura reserves the right to 
retain, monitor and intercept e-mail communications through its

Re: RDD.broadcast

2016-04-28 Thread Marcin Tustin
I don't know what your notation really means. I'm very much unclear on why
you can't use the filter method for 1. If you're talking about
splitting/bucketing rather filtering as such I think that is a specific
lacuna in spark's Api.

I've generally found the join api to be entirely adequate for my needs, so
I don't really have a comment on 2.

On Thursday, April 28, 2016, <ioannis.deligian...@nomura.com> wrote:

> One example pattern we have it doing joins or filters based on two
> datasets. E.g.
>
> 1 Filter –multiple- RddB for a given set extracted from RddA
> (keyword here is multiple times)
>
> a.   RddA -> keyBy -> distinct -> collect() to Set A;
>
> b.  RddB -> Filter using Set A;
>
> 2 “Join” using composition on executor (again multiple times)
>
> a.   RddA -> filter by XYZ -> keyBy join attribute -> collectAsMap
> ->Broadcast MapA;
>
> b.  RddB -> map (Broadcast<Map<K,V>> MapA;
>
>
>
> The first use case might not be that common, but joining a large RDD with
> a small (reference) RDD is quite common and much faster than using “join”
> method.
>
>
>
>
>
> *From:* Marcin Tustin [mailto:mtus...@handybook.com
> <javascript:_e(%7B%7D,'cvml','mtus...@handybook.com');>]
> *Sent:* 28 April 2016 12:08
> *To:* Deligiannis, Ioannis (UK)
> *Cc:* dev@spark.apache.org
> <javascript:_e(%7B%7D,'cvml','dev@spark.apache.org');>
> *Subject:* Re: RDD.broadcast
>
>
>
> Why would you ever need to do this? I'm genuinely curious. I view collects
> as being solely for interactive work.
>
> On Thursday, April 28, 2016, <ioannis.deligian...@nomura.com
> <javascript:_e(%7B%7D,'cvml','ioannis.deligian...@nomura.com');>> wrote:
>
> Hi,
>
>
>
> It is a common pattern to process an RDD, collect (typically a subset) to
> the driver and then broadcast back.
>
>
>
> Adding an RDD method that can do that using the torrent broadcast
> mechanics would be much more efficient. In addition, it would not require
> the Driver to also utilize its Heap holding this broadcast.
>
>
>
> I guess this can become complicated if the resulting broadcast is required
> to keep lineage information, but assuming a torrent distribution, once the
> broadcast is synced then lineage would not be required. I’d also expect the
> call to rdd.brodcast to be an action that eagerly distributes the broadcast
> and returns when the operation has succeeded.
>
>
>
> Is this something that could be implemented or are there any reasons that
> prohibits this?
>
>
>
> Thanks
>
> Ioannis
>
>
>
> This e-mail (including any attachments) is private and confidential, may
> contain proprietary or privileged information and is intended for the named
> recipient(s) only. Unintended recipients are strictly prohibited from
> taking action on the basis of information in this e-mail and must contact
> the sender immediately, delete this e-mail (and all attachments) and
> destroy any hard copies. Nomura will not accept responsibility or liability
> for the accuracy or completeness of, or the presence of any virus or
> disabling code in, this e-mail. If verification is sought please request a
> hard copy. Any reference to the terms of executed transactions should be
> treated as preliminary only and subject to formal written confirmation by
> Nomura. Nomura reserves the right to retain, monitor and intercept e-mail
> communications through its networks (subject to and in accordance with
> applicable laws). No confidentiality or privilege is waived or lost by
> Nomura by any mistransmission of this e-mail. Any reference to "Nomura" is
> a reference to any entity in the Nomura Holdings, Inc. group. Please read
> our Electronic Communications Legal Notice which forms part of this e-mail:
> http://www.Nomura.com/email_disclaimer.htm
> <https://urldefense.proofpoint.com/v2/url?u=http-3A__www.Nomura.com_email-5Fdisclaimer.htm=CwMFaQ=dCBwIlVXJsYZrY6gpNt0LA=B8E4n9FrSS85mPCi6Mfs7cyEPQnVrpcQ1zeB-JKws6A=GAA5LZhuKEWXxozKzXPhWAYY4BSTpcXaf2lFg5JSPB0=SLnOgTBJ2zAlhtvjcFRXfqUArds-4HSAZCgFXLgMCVY=>
>
>
>
> Want to work at Handy? Check out our culture deck and open roles
> <https://urldefense.proofpoint.com/v2/url?u=http-3A__www.handy.com_careers=CwMFaQ=dCBwIlVXJsYZrY6gpNt0LA=B8E4n9FrSS85mPCi6Mfs7cyEPQnVrpcQ1zeB-JKws6A=GAA5LZhuKEWXxozKzXPhWAYY4BSTpcXaf2lFg5JSPB0=WgDnCrSGv_qt66f2cabjugmMGU46gc5rSkt_gm7lEkQ=>
>
> Latest news
> <https://urldefense.proofpoint.com/v2/url?u=http-3A__www.handy.com_press=CwMFaQ=dCBwIlVXJsYZrY6gpNt0LA=B8E4n9FrSS85mPCi6Mfs7cyEPQnVrpcQ1zeB-JKws6A=GAA5LZhuKEWXxozKzXPhWAYY4BSTpcXaf2lFg5JSPB0=rfQxr8cDwVFK7Mql1_HdnvqAmXeiOHZgnjNtKXGn_Kg=>
>  at
&g

Re: RDD.broadcast

2016-04-28 Thread Mike Hynes
I second knowing the use case for interest. I can imagine a case where
knowledge of the RDD key distribution would help local computations, for
relaticely few keys, but would be interested to hear your motive.

Essentially, are you trying to achieve what would be an all-reduce type
operation in MPI?
On Apr 28, 2016 9:08 PM, "Marcin Tustin"  wrote:

> Why would you ever need to do this? I'm genuinely curious. I view
> collects as being solely for interactive work.
>
> On Thursday, April 28, 2016,  wrote:
>
>> Hi,
>>
>>
>>
>> It is a common pattern to process an RDD, collect (typically a subset) to
>> the driver and then broadcast back.
>>
>>
>>
>> Adding an RDD method that can do that using the torrent broadcast
>> mechanics would be much more efficient. In addition, it would not require
>> the Driver to also utilize its Heap holding this broadcast.
>>
>>
>>
>> I guess this can become complicated if the resulting broadcast is
>> required to keep lineage information, but assuming a torrent distribution,
>> once the broadcast is synced then lineage would not be required. I’d also
>> expect the call to rdd.brodcast to be an action that eagerly distributes
>> the broadcast and returns when the operation has succeeded.
>>
>>
>>
>> Is this something that could be implemented or are there any reasons that
>> prohibits this?
>>
>>
>>
>> Thanks
>>
>> Ioannis
>>
>> This e-mail (including any attachments) is private and confidential, may
>> contain proprietary or privileged information and is intended for the named
>> recipient(s) only. Unintended recipients are strictly prohibited from
>> taking action on the basis of information in this e-mail and must contact
>> the sender immediately, delete this e-mail (and all attachments) and
>> destroy any hard copies. Nomura will not accept responsibility or liability
>> for the accuracy or completeness of, or the presence of any virus or
>> disabling code in, this e-mail. If verification is sought please request a
>> hard copy. Any reference to the terms of executed transactions should be
>> treated as preliminary only and subject to formal written confirmation by
>> Nomura. Nomura reserves the right to retain, monitor and intercept e-mail
>> communications through its networks (subject to and in accordance with
>> applicable laws). No confidentiality or privilege is waived or lost by
>> Nomura by any mistransmission of this e-mail. Any reference to "Nomura" is
>> a reference to any entity in the Nomura Holdings, Inc. group. Please read
>> our Electronic Communications Legal Notice which forms part of this e-mail:
>> http://www.Nomura.com/email_disclaimer.htm
>>
>
> Want to work at Handy? Check out our culture deck and open roles
> 
> Latest news  at Handy
> Handy just raised $50m
> 
>  led
> by Fidelity
>
>


Re: RDD.broadcast

2016-04-28 Thread Marcin Tustin
Why would you ever need to do this? I'm genuinely curious. I view collects
as being solely for interactive work.

On Thursday, April 28, 2016,  wrote:

> Hi,
>
>
>
> It is a common pattern to process an RDD, collect (typically a subset) to
> the driver and then broadcast back.
>
>
>
> Adding an RDD method that can do that using the torrent broadcast
> mechanics would be much more efficient. In addition, it would not require
> the Driver to also utilize its Heap holding this broadcast.
>
>
>
> I guess this can become complicated if the resulting broadcast is required
> to keep lineage information, but assuming a torrent distribution, once the
> broadcast is synced then lineage would not be required. I’d also expect the
> call to rdd.brodcast to be an action that eagerly distributes the broadcast
> and returns when the operation has succeeded.
>
>
>
> Is this something that could be implemented or are there any reasons that
> prohibits this?
>
>
>
> Thanks
>
> Ioannis
>
> This e-mail (including any attachments) is private and confidential, may
> contain proprietary or privileged information and is intended for the named
> recipient(s) only. Unintended recipients are strictly prohibited from
> taking action on the basis of information in this e-mail and must contact
> the sender immediately, delete this e-mail (and all attachments) and
> destroy any hard copies. Nomura will not accept responsibility or liability
> for the accuracy or completeness of, or the presence of any virus or
> disabling code in, this e-mail. If verification is sought please request a
> hard copy. Any reference to the terms of executed transactions should be
> treated as preliminary only and subject to formal written confirmation by
> Nomura. Nomura reserves the right to retain, monitor and intercept e-mail
> communications through its networks (subject to and in accordance with
> applicable laws). No confidentiality or privilege is waived or lost by
> Nomura by any mistransmission of this e-mail. Any reference to "Nomura" is
> a reference to any entity in the Nomura Holdings, Inc. group. Please read
> our Electronic Communications Legal Notice which forms part of this e-mail:
> http://www.Nomura.com/email_disclaimer.htm
>

-- 
Want to work at Handy? Check out our culture deck and open roles 

Latest news  at Handy
Handy just raised $50m 

 led 
by Fidelity



RDD.broadcast

2016-04-28 Thread Ioannis.Deligiannis
Hi,

It is a common pattern to process an RDD, collect (typically a subset) to the 
driver and then broadcast back.

Adding an RDD method that can do that using the torrent broadcast mechanics 
would be much more efficient. In addition, it would not require the Driver to 
also utilize its Heap holding this broadcast.

I guess this can become complicated if the resulting broadcast is required to 
keep lineage information, but assuming a torrent distribution, once the 
broadcast is synced then lineage would not be required. I’d also expect the 
call to rdd.brodcast to be an action that eagerly distributes the broadcast and 
returns when the operation has succeeded.

Is this something that could be implemented or are there any reasons that 
prohibits this?

Thanks
Ioannis


This e-mail (including any attachments) is private and confidential, may 
contain proprietary or privileged information and is intended for the named 
recipient(s) only. Unintended recipients are strictly prohibited from taking 
action on the basis of information in this e-mail and must contact the sender 
immediately, delete this e-mail (and all attachments) and destroy any hard 
copies. Nomura will not accept responsibility or liability for the accuracy or 
completeness of, or the presence of any virus or disabling code in, this 
e-mail. If verification is sought please request a hard copy. Any reference to 
the terms of executed transactions should be treated as preliminary only and 
subject to formal written confirmation by Nomura. Nomura reserves the right to 
retain, monitor and intercept e-mail communications through its networks 
(subject to and in accordance with applicable laws). No confidentiality or 
privilege is waived or lost by Nomura by any mistransmission of this e-mail. 
Any reference to "Nomura" is a reference to any entity in the Nomura Holdings, 
Inc. group. Please read our Electronic Communications Legal Notice which forms 
part of this e-mail: http://www.Nomura.com/email_disclaimer.htm