[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ]

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 9:26 PM:


Well... I just finished my PoC: a custom Predicate plus a custom Strategy that 
transforms the relations/scans before the built-in one does (on Spark 2.4). It 
seems hard to make this work for the pushdown cases; without pushdown, task 
size does improve quite a lot. I "lost" half a day, but I learned quite a bit 
and confirmed some assumptions about how Spark works on the inside :).

When the filter gets pushed down, the change I made is useless, basically 
because what gets shipped in the task is the pushed filter itself, i.e. 
sources.In.
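For context, a minimal sketch of what ends up inside each task once the filter 
is pushed down (the values below are made up; sources.In is the real Spark 
filter class):

{code:scala}
import org.apache.spark.sql.sources.In

// This object, including every one of the 200k values, is serialized
// into each task once the predicate is pushed down to the source.
val pushed: In = In("id", (1 to 200000).map(i => s"id_$i").toArray[Any])
{code}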

In order to implement this without breaking backwards compatibility with 
current BaseRelations, Spark would need another filter next to sources.In 
(sources.BroadcastedIn?), and the sources that use it would have to know it 
carries a broadcast variable, so that when they implement the pushdown they 
only call ".value" on the broadcast inside the executor (i.e. in the RDD 
implementation)...
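A minimal sketch of what such a filter could look like (to be clear, 
BroadcastedIn is hypothetical and does not exist in Spark's sources API):

{code:scala}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.sources.Filter

// Hypothetical filter: only the broadcast handle travels with the task;
// the value list itself is fetched from the block manager on the executor.
case class BroadcastedIn(attribute: String, values: Broadcast[Array[Any]])
  extends Filter {
  override def references: Array[String] = Array(attribute)
}

// Executor side (e.g. inside the source RDD's compute()), the implementer
// would dereference it only there:
//   val inValues: Array[Any] = filter.values.value
{code}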


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently can make them a 
little complex, since it basically means delaying the creation of those 
filters in the underlying technology until the compute method is called...). 
Especially because I don't know if this can be useful for anything apart from 
big user-provided "isin"s (maybe some join-pushdown optimization I'm not 
seeing?)*

If anyone is interested in me implementing it, or just in the ideas/the hacky 
way (it can be injected into Spark, but you have to copy quite a bit of code 
from one of Spark's Strategies and also reimplement the underlying relation 
which will push down the filter), just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost a full scan, 
which takes like 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"].
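As an illustration of that coalesce trick (the names and the 50 are made-up 
stand-ins):

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("coalesce-demo").getOrCreate()
val df = spark.range(1000000L).selectExpr("cast(id as string) as id")
val suitableList = (1 to 5000).map(i => s"id_$i")

// Fewer partitions means the big serialized In filter is shipped with
// fewer tasks; 50 stands in for a tuned X.
val result = df.filter(col("id").isin(suitableList: _*)).coalesce(50)
{code}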

*Anyway, I think the best option without changing the whole architecture is to 
deal with it inside the RDD/RelationProvider: when it finds a big list to be 
pushed down (as long as unhandledFilters makes the filter disappear from the 
Spark side), it can just broadcast the list and then use it. The only problem 
is that this might leave non-cleaned broadcast variables behind.*
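A minimal sketch of that relation-side idea, with a made-up relation name and 
threshold (BaseRelation, PrunedFilteredScan, unhandledFilters and sources.In 
are the real Spark 2.4 APIs):

{code:scala}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, In, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

class MyRelation(override val sqlContext: SQLContext,
                 override val schema: StructType)
  extends BaseRelation with PrunedFilteredScan {

  private val bigInThreshold = 10000 // made-up cutoff

  // Return only the filters this relation can NOT fully handle. Leaving the
  // big In filters out tells Spark to trust the relation with them, so it
  // drops them from its own plan instead of re-evaluating them row by row.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot {
      case In(_, values) => values.length > bigInThreshold
      case _             => false
    }

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    // Broadcast each big value list once, on the driver. Note that nothing
    // ever destroys these broadcasts: the cleanup problem mentioned above.
    val bigIns: Array[(String, Broadcast[Set[Any]])] = filters.collect {
      case In(attr, values) if values.length > bigInThreshold =>
        attr -> sqlContext.sparkContext.broadcast(values.toSet)
    }
    // The real scan against the underlying source is elided here; it would
    // hand only the broadcast handles to its RDD and call .value inside
    // compute(), i.e. on the executors.
    sqlContext.sparkContext.emptyRDD[Row]
  }
}
{code}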




> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help exploring whether the following feature makes sense 
> and what the best way to implement it is: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcast).*
> Read the last comment :)
> As of now (AFAIK), users can only provide a list/sequence of elements, which 
> is sent to the executors as part of the "In" predicate, i.e. as part of the 
> task. So when this list is huge, the tasks become "huge" too (especially 
> when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer).
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard to filter).
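For reference, a minimal sketch of the pattern the issue describes, plus the 
usual join-based workaround (all names are made up):

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, col}

val spark = SparkSession.builder().appName("isin-demo").getOrCreate()
import spark.implicits._

// Stand-in for the real external table (e.g. Kudu).
val df = spark.range(1000000L).selectExpr("cast(id as string) as id")
val hugeList = (1 to 200000).map(i => s"id_$i")

// The issue: every one of the 200k values is serialized into each task.
val viaIsin = df.filter(col("id").isin(hugeList: _*))

// Usual workaround: broadcast the keys and left-semi join. Only the
// broadcast handle travels with the tasks, but the predicate is no longer
// pushed down to the source, hence the full scans the reporter describes.
val viaJoin = df.join(broadcast(hugeList.toDF("id")), Seq("id"), "left_semi")
{code}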

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 9:26 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

*Anyways, I think maybe the best option without changing all the architecture 
is to deal with it inside the RDD/RelationProvider, when it finds a big list to 
be pushed-down (as long as unhandledFilters makes it disappear), it can just 
broadcast it and then use it. Only problem is that this might leave non-cleaned 
broadcast variables* 




was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

*Anyways, I think maybe the best option is to deal with it inside the 
RDD/RelationProvider, when it finds a big list to be pushed-down (as long as 
unhandledFilters makes it disappear), it can just broadcast it and then use it.*



> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> Read last comment :)
> As of now (AFAI

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 9:00 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.



was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> Read last comment :)
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task numbe

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:57 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates wit

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:56 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P ). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predica

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:55 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P ). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> Read last comment :)
> As of now (AFAIK), users can only 

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:54 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> Read last comment :)
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:45 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and reimplemented the relation) just ping 
me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code and reimplemented the relation) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackove

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:45 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and reimplemented the relation) just ping 
me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (speci

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:45 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code and reimplemented the relation) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the ideas (it 
can be injected into Spark, but you have to copy quite a bit of code and 
reimplemented the relation) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:44 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the ideas (it 
can be injected into Spark, but you have to copy quite a bit of code and 
reimplemented the relation) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than 

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:42 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently can be a little 
hard). Especially since I don't know if this can be useful for anything apart 
from big user-provided "isin"s (maybe some join-pushdown optimization I don't 
realize?)*

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense 
> and what's the best way to implement it: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to 
> the executors. So when this list is huge, the tasks become "huge" 
> (especially when the task count is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also of the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200,000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard to filter,

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:42 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently can be a little 
hard). Especially since I don't know if this can be useful for anything apart 
from big user-provided "isin"s (maybe some join-pushdown optimization I don't 
realize?)*

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]


was (Author: fsainz):
Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently while only accessing 
the data in the compute method looks hard). Especially since I don't know if 
this can be useful for anything apart from big user-provided "isin"s (maybe 
some join-pushdown optimization I don't realize?)*

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense 
> and what's the best way to implement it: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to 
> the executors. So when this list is huge, the tasks become "huge" 
> (especially when the task count is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also of the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200,000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard to filter unless I use 
> this 200,000-element list) on my table.
> More or less I have a clear view (ex

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:41 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently while only accessing 
the data in the compute method looks hard). Especially since I don't know if 
this can be useful for anything apart from big user-provided "isin"s (maybe 
some join-pushdown optimization I don't realize?)*

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]


was (Author: fsainz):
Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently while only accessing 
the data in the compute method looks hard). Especially since I don't know if 
this can be useful for anything apart from big user-provided "isin"s (maybe 
some join-pushdown optimization I don't realize?)*

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense 
> and what's the best way to implement it: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to 
> the executors. So when this list is huge, the tasks become "huge" 
> (especially when the task count is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also of the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200,000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard to filter unless I use 
> this 200,000-element list) on my table.
> More

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:41 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently while only accessing 
the data in the compute method looks hard). Especially since I don't know if 
this can be useful for anything apart from big user-provided "isin"s (maybe 
some join-pushdown optimization I don't realize?)*

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]


was (Author: fsainz):
Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently while only accessing 
the data in the compute method looks hard). Especially since I don't know if 
this can be useful for anything apart from big user-provided "isin"s (maybe 
some join-pushdown optimization I don't realize?)*

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense 
> and what's the best way to implement it: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to 
> the executors. So when this list is huge, the tasks become "huge" 
> (especially when the task count is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also of the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200,000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard to filter unless I use 
> this 200,000-element list) on my table.
> More

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:40 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently while only accessing 
the data in the compute method looks hard). Especially since I don't know if 
this can be useful for anything apart from big user-provided "isin"s (maybe 
some join-pushdown optimization I don't realize?) What do you guys think?*

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]


was (Author: fsainz):
Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations, which doesn't seem so easy, so that the only 
place where the value is actually accessed is in the compute method). 
Especially since I don't know if this can be useful for anything apart from 
big user-provided "isin"s (maybe some join-pushdown optimization I don't 
realize?) What do you guys think?*

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense 
> and what's the best way to implement it: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to 
> the executors. So when this list is huge, the tasks become "huge" 
> (especially when the task count is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also of the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200,000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard 

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:40 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently while only accessing 
the data in the compute method looks hard). Especially since I don't know if 
this can be useful for anything apart from big user-provided "isin"s (maybe 
some join-pushdown optimization I don't realize?)*

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]


was (Author: fsainz):
Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently while only accessing 
the data in the compute method looks hard). Especially since I don't know if 
this can be useful for anything apart from big user-provided "isin"s (maybe 
some join-pushdown optimization I don't realize?) What do you guys think?*

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense 
> and what's the best way to implement it: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to 
> the executors. So when this list is huge, the tasks become "huge" 
> (especially when the task count is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also of the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200,000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard to filter unless I use 
> this 200,000-element lis

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:39 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations, which doesn't seem so easy, so that the only 
place where the value is actually accessed is in the compute method). 
Especially since I don't know if this can be useful for anything apart from 
big user-provided "isin"s (maybe some join-pushdown optimization I don't 
realize?) What do you guys think?*

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]


was (Author: fsainz):
Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and all). 
Especially since I don't know if this can be useful for anything apart from 
big user-provided "isin"s (maybe some join-pushdown optimization I don't 
realize?) What do you guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense 
> and what's the best way to implement it: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to 
> the executors. So when this list is huge, the tasks become "huge" 
> (especially when the task count is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also of the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200,000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard to filter unless I use 
> this 200,000-element list) on my table.
> More or less I have a clear view (explained in st

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:15 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and all). 
Especially since I don't know if this can be useful for anything apart from 
big user-provided "isin"s (maybe some join-pushdown optimization I don't 
realize?) What do you guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/full scan, which 
takes about 10x the time; the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and all). 
Especially since I don't know if this can be useful for anything apart from 
big user-provided "isin"s (maybe some join-pushdown optimization I don't 
realize?) What do you guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; the amount of data I get from that table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense 
> and what's the best way to implement it: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to 
> the executors. So when this list is huge, the tasks become "huge" 
> (especially when the task count is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also of the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200,000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard to filter unless I use 
> this 200,000-element list) on my table.
> More or less I have a clear view (explained in stackoverflow) of what to 
> modify if I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:15 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and all). 
Especially since I don't know if this can be useful for anything apart from 
big user-provided "isin"s (maybe some join-pushdown optimization I don't 
realize?) What do you guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"]


was (Author: fsainz):
Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and all). 
Especially since I don't know if this can be useful for anything apart from 
big user-provided "isin"s (maybe some join-pushdown optimization I don't 
realize?) What do you guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/full scan, which 
takes about 10x the time; the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense 
> and what's the best way to implement it: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to 
> the executors. So when this list is huge, the tasks become "huge" 
> (especially when the task count is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also of the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200,000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard to filter unless I use 
> this 200,000-element list) on my table.
> More or less I have a clear view (explained in stackoverflow) of what to 
> modify if I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/sc

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:12 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and all). 
Especially since I don't know if this can be useful for anything apart from 
big user-provided "isin"s (maybe some join-pushdown optimization I don't 
realize?) What do you guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; the amount of data I get from that table fits in memory "perfectly"]


was (Author: fsainz):
Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and all). 
What do you guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; the amount of data I get from that table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense 
> and what's the best way to implement it: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to 
> the executors. So when this list is huge, the tasks become "huge" 
> (especially when the task count is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also of the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200,000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard to filter unless I use 
> this 200,000-element list) on my table.
> More or less I have a clear view (explained in stackoverflow) of what to 
> modify if I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn".
> I've looked at the Spark 3.0 sources and nothing has changed there, but I'm 
> not 100% sure there's an alternative (I'm stuck on 2.4 any

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:11 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter, i.e. sources.In.

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast only when inside the executor (i.e. in the 
RDD implementation)... 


For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and all). 
What do you guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; the amount of data I get from that table fits in memory "perfectly"]


was (Author: fsainz):
Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast... 


For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and all). 
What do you guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; the amount of data I get from that table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense 
> and what's the best way to implement it: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to 
> the executors. So when this list is huge, the tasks become "huge" 
> (especially when the task count is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also of the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200,000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard to filter unless I use 
> this 200,000-element list) on my table.
> More or less I have a clear view (explained in stackoverflow) of what to 
> modify if I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn".
> I've looked at the Spark 3.0 sources and nothing has changed there, but I'm 
> not 100% sure there's an alternative (I'm stuck on 2.4 anyway, working for 
> a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not 
> an expert in Scala syntax/syntactic sugar, *anyo

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:10 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom 
Strategy which will transform the relations/scans before the provided one 
does (on Spark 2.4). It seems that making this work for pushdown cases is the 
hard part; without pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast... 


For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and all). 
What do you guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; the amount of data I get from that table fits in memory "perfectly"]


was (Author: fsainz):
Well... I just finished my PoC by adding a custom Strategy which will 
transform the relations/scans before the provided one does (on Spark 2.4). It 
seems that making this work for pushdown cases is the hard part; without 
pushdown, task size does improve quite a lot :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets shipped in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless we break backwards compatibility with the 
current BaseRelations, Spark would need another sources.In 
(sources.BroadcastedIn?), and those who use it would have to know it's a 
broadcast variable, so that when they implement the pushdown they actually 
call the ".value" of the broadcast... 


For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and all). 
What do you guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; the amount of data I get from that table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense 
> and what's the best way to implement it: allow users to use "isin" (or 
> similar) predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to 
> the executors. So when this list is huge, the tasks become "huge" 
> (especially when the task count is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also of the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200,000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most 
> of the data has the same nature and thus is hard to filter unless I use 
> this 200,000-element list) on my table.
> More or less I have a clear view (explained in stackoverflow) of what to 
> modify if I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn".
> I've looked at the Spark 3.0 sources and nothing has changed there, but I'm 
> not 100% sure there's an alternative (I'm stuck on 2.4 anyway, working for 
> a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not 
> an expert in Scala syntax/syntactic sugar, *anyone has any idea on how to 
> extend the current case clas

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:09 PM:


Well... just finished my PoC by adding a custom Strategy which transforms the 
relations/scans before the provided one does (@ Spark 2.4). It seems that 
making this work for pushdown cases is the hard part (without pushdown, task 
size does improve quite a lot :P).
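
(For reference, this is how such a PoC strategy can be injected at 2.4; 
MyBroadcastIsinStrategy is a stand-in and its matching/rewriting logic is 
elided:)

{code:scala}
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Stand-in for the PoC: extraStrategies run before the built-in strategies,
// so this is where scans could be rewritten. Returning Nil means "no match,
// let the other strategies plan it"; the real rewriting logic is elided.
object MyBroadcastIsinStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

val spark = SparkSession.builder().getOrCreate()
spark.experimental.extraStrategies =
  MyBroadcastIsinStrategy +: spark.experimental.extraStrategies
{code}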

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

To implement this without breaking backwards compatibility with current 
BaseRelations, Spark would need another source.In (source.BroadcastedIn?), and 
those who use it would have to know it's a broadcast variable, so that when 
they implement the pushdown they actually call the ".value" of the 
broadcast... 


For now I'm abandoning it. I know how to implement it, but the change is quite 
big and I'm not sure it's worth it (adding a new Source and all). What do you 
guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :)


was (Author: fsainz):
Well... just finished my PoC by adding a custom Strategy which transforms the 
relations/scans before the provided one does (@ Spark 2.4). It seems that 
making this work for pushdown cases is the hard part (without pushdown, task 
size does improve quite a bit :P).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

To implement this without breaking backwards compatibility with current 
BaseRelations, Spark would need another source.In (source.BroadcastedIn?), and 
those who use it would have to know it's a broadcast variable, so that when 
they implement the pushdown they actually call the ".value" of the 
broadcast... 



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:09 PM:


Well... just finished my PoC by adding a custom Strategy which transforms the 
relations/scans before the provided one does (@ Spark 2.4). It seems that 
making this work for pushdown cases is the hard part (without pushdown, task 
size does improve quite a lot :P).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

To implement this without breaking backwards compatibility with current 
BaseRelations, Spark would need another source.In (source.BroadcastedIn?), and 
those who use it would have to know it's a broadcast variable, so that when 
they implement the pushdown they actually call the ".value" of the 
broadcast... 


For now I'm abandoning it. I know how to implement it, but the change is quite 
big and I'm not sure it's worth it (adding a new Source and all). What do you 
guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter, the amount of 
data I get from that table fits in memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Strategy which transforms the 
relations/scans before the provided one does (@ Spark 2.4). It seems that 
making this work for pushdown cases is the hard part (without pushdown, task 
size does improve quite a lot :P).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

To implement this without breaking backwards compatibility with current 
BaseRelations, Spark would need another source.In (source.BroadcastedIn?), and 
those who use it would have to know it's a broadcast variable, so that when 
they implement the pushdown they actually call the ".value" of the 
broadcast... 


For now I'm abandoning it. I know how to implement it, but the change is quite 
big and I'm not sure it's worth it (adding a new Source and all). What do you 
guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :)


[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:09 PM:


Well... just finished my PoC by adding a custom Strategy which transforms the 
relations/scans before the provided one does (@ Spark 2.4). It seems that 
making this work for pushdown cases is the hard part (without pushdown, task 
size does improve quite a lot :P).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

To implement this without breaking backwards compatibility with current 
BaseRelations, Spark would need another source.In (source.BroadcastedIn?), and 
those who use it would have to know it's a broadcast variable, so that when 
they implement the pushdown they actually call the ".value" of the 
broadcast... 


For now I'm abandoning it. I know how to implement it, but the change is quite 
big and I'm not sure it's worth it (adding a new Source and all). What do you 
guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable, the amount of data I get from that table fits in memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Strategy which transforms the 
relations/scans before the provided one does (@ Spark 2.4). It seems that 
making this work for pushdown cases is the hard part (without pushdown, task 
size does improve quite a lot :P).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

To implement this without breaking backwards compatibility with current 
BaseRelations, Spark would need another source.In (source.BroadcastedIn?), and 
those who use it would have to know it's a broadcast variable, so that when 
they implement the pushdown they actually call the ".value" of the 
broadcast... 


For now I'm abandoning it. I know how to implement it, but the change is quite 
big and I'm not sure it's worth it (adding a new Source and all). What do you 
guys think?


PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter, the amount of 
data I get from that table fits in memory "perfectly"]


[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:59 PM:


I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates isn't the one exposed in predicates, 
but a post-translation, so adding a new class is not a drama :) (unless this 
post-translation is also transferred from the driver to the executors, which 
is what I'm trying to check now). I'm basing it on InSet (actually delegating 
everything to the InSet implementation so as not to duplicate code) and adding 
a new isInBroadcastCollection method to Column; a rough sketch of the shape 
follows.
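
(A user-land approximation of that shape; InSet and Column(expr) are 
Spark-internal APIs, so this is purely a sketch, and note the caveat in the 
comment:)

{code:scala}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.InSet

// Approximate shape of an isInBroadcastCollection helper that delegates to
// InSet, as described above (InSet and Column(expr) are internal APIs).
implicit class BroadcastIsin(col: Column) {
  def isInBroadcastCollection(bc: Broadcast[Set[Any]]): Column =
    // Caveat: calling .value here still happens on the driver while the plan
    // is being built, so the set ends up in the expression anyway; the real
    // change needs the expression itself to hold the Broadcast and only call
    // .value on the executors.
    new Column(InSet(col.expr, bc.value))
}
{code}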



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)


Just confirmed that all the plans execute in the driver, as expected (this is 
the first time I've looked "deeply" inside the Spark sources). Maybe the list 
can be passed as a function returning a list instead of a list, so Scala 
serializes just the function with the broadcast variable instead of the whole 
list, as sketched below; the API would stay compatible I think, but that 
forces not using case classes + 99% breaks binary compatibility :(, so I'll 
try a few things before continuing.
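
(The serialization idea in isolation; InValues is illustrative, not Spark's 
In predicate:)

{code:scala}
import org.apache.spark.broadcast.Broadcast

// Holding a Seq captures the whole list in whatever gets serialized into the
// task; holding a () => Seq whose closure only captures the Broadcast keeps
// the serialized payload small, since the values are fetched via .value on use.
case class InValues(attribute: String, values: () => Seq[Any])

def broadcastedIn(attribute: String, bc: Broadcast[Seq[Any]]): InValues =
  InValues(attribute, () => bc.value)   // closure serializes only the handle
{code}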



was (Author: fsainz):
I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates isn't the one exposed in predicates, 
but a post-translation, so adding a new class is not a drama :) (unless this 
post-translation is also transferred from the driver to the executors, which 
is what I'm trying to check now). I'm basing it on InSet (actually delegating 
everything to the InSet implementation so as not to duplicate code) and adding 
a new isInBroadcastCollection method to Column.



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)


If that's the case (this is the first time I've looked "deeply" inside the 
Spark sources, but I guess it is), maybe the list can be passed as a function 
returning a list instead of a list (so Scala serializes just the function with 
the broadcast variable instead of the whole list); the API would stay 
compatible I think, but that forces not using case classes + 99% breaks binary 
compatibility :(



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:15 PM:


I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates isn't the one exposed in predicates, 
but a post-translation, so adding a new class is not a drama :) (unless this 
post-translation is also transferred from the driver to the executors, which 
is what I'm trying to check now). I'm basing it on InSet (actually delegating 
everything to the InSet implementation so as not to duplicate code) and adding 
a new isInBroadcastCollection method to Column.



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)


If that's the case (this is the first time I've looked "deeply" inside the 
Spark sources, but I guess it is), maybe the list can be passed as a function 
returning a list instead of a list (so Scala serializes just the function with 
the broadcast variable instead of the whole list); the API would stay 
compatible I think, but that forces not using case classes + 99% breaks binary 
compatibility :(



was (Author: fsainz):
I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates isn't the one exposed in predicates, 
but a post-translation, so adding a new class is not a drama :) (unless this 
post-translation is also transferred from the driver to the executors, which 
is what I'm trying to check now). I'm basing it on InSet (actually delegating 
everything to the InSet implementation so as not to duplicate code) and adding 
a new isInBroadcastCollection method to Column.



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)


If that's the case (this is the first time I've looked "deeply" inside the 
Spark sources, but I guess it is), maybe the list can be passed as a function 
returning a list instead of a list; the API would stay compatible I think, but 
that forces not using case classes + 99% breaks binary compatibility :(



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:14 PM:


I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates isn't the one exposed in predicates, 
but a post-translation, so adding a new class is not a drama :) (unless this 
post-translation is also transferred from the driver to the executors, which 
is what I'm trying to check now). I'm basing it on InSet (actually delegating 
everything to the InSet implementation so as not to duplicate code) and adding 
a new isInBroadcastCollection method to Column.



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)


If that's the case (this is the first time I've looked "deeply" inside the 
Spark sources, but I guess it is), maybe the list can be passed as a function 
returning a list instead of a list; the API would stay compatible I think, but 
that forces not using case classes + 99% breaks binary compatibility :(



was (Author: fsainz):
I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates isn't the one exposed in predicates, 
but a post-translation, so adding a new class is not a drama :) (unless this 
post-translation is also transferred from the driver to the executors, which 
is what I'm trying to check now). I'm basing it on InSet (actually delegating 
everything to the InSet implementation so as not to duplicate code) and adding 
a new isInBroadcastCollection method to Column.



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)


If that's the case (this is the first time I've looked "deeply" inside the 
Spark sources, but I guess it is), maybe the list can be passed as a function 
returning a list instead of a list; the API would stay compatible I think, but 
that forces not using case classes + 99% breaks binary compatibility :(



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:14 PM:


I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates isn't the one exposed in predicates, 
but a post-translation, so adding a new class is not a drama :) (unless this 
post-translation is also transferred from the driver to the executors, which 
is what I'm trying to check now). I'm basing it on InSet (actually delegating 
everything to the InSet implementation so as not to duplicate code) and adding 
a new isInBroadcastCollection method to Column.



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)


If that's the case (this is the first time I've looked "deeply" inside the 
Spark sources, but I guess it is), maybe the list can be passed as a function 
returning a list instead of a list; the API would stay compatible I think, but 
that forces not using case classes + 99% breaks binary compatibility :(



was (Author: fsainz):
I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates isn't the one exposed in predicates, 
but a post-translation, so adding a new class is not a drama :) (unless this 
post-translation is also transferred from the driver to the executors, which 
is what I'm trying to check now). I'm basing it on InSet (actually delegating 
everything to the InSet implementation so as not to duplicate code) and adding 
a new isInBroadcastCollection method to Column.



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)


If that's the case (this is the first time I've looked "deeply" inside the 
Spark sources, but I guess it is), passing the list as a function returning a 
list can work, and the API would be compatible I think, but that breaks case 
classes and (99% sure) binary compatibility :(



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:13 PM:


I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates isn't the one exposed in predicates, 
but a post-translation, so adding a new class is not a drama :) (unless this 
post-translation is also transferred from the driver to the executors, which 
is what I'm trying to check now). I'm basing it on InSet (actually delegating 
everything to the InSet implementation so as not to duplicate code) and adding 
a new isInBroadcastCollection method to Column.



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)


If that's the case (this is the first time I've looked "deeply" inside the 
Spark sources, but I guess it is), passing the list as a function returning a 
list can work, and the API would be compatible I think, but that breaks case 
classes and (99% sure) binary compatibility :(



was (Author: fsainz):
I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates isn't the one exposed in predicates, 
but a post-translation, so adding a new class is not a drama :) (unless this 
post-translation is also transferred from the driver to the executors, which 
is what I'm trying to check now). I'm basing it on InSet (actually delegating 
everything to the InSet implementation so as not to duplicate code) and adding 
a new isInBroadcastCollection method to Column.



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)







[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:08 PM:


I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates isn't the one exposed in predicates, 
but a post-translation, so adding a new class is not a drama :) (unless this 
post-translation is also transferred from the driver to the executors, which 
is what I'm trying to check now). I'm basing it on InSet (actually delegating 
everything to the InSet implementation so as not to duplicate code) and adding 
a new isInBroadcastCollection method to Column.



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)







was (Author: fsainz):
I'm going ahead with something. Just realized the In in the pushdown 
predicates isn't the one exposed in predicates, but a post-translation, so 
adding a new class is not a drama :) (unless this post-translation is also 
transferred from the driver to the executors, which is what I'm trying to 
check now). I'm basing it on InSet (actually delegating everything to the 
InSet implementation so as not to duplicate code) and adding a new 
isInBroadcastCollection method to Column.



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)









[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:43 PM:


I'm going ahead with something. Just realized the In in the pushdown 
predicates isn't the one exposed in predicates, but a post-translation, so 
adding a new class is not a drama :) (unless this post-translation is also 
transferred from the driver to the executors, which is what I'm trying to 
check now). I'm basing it on InSet (actually delegating everything to the 
InSet implementation so as not to duplicate code) and adding a new 
isInBroadcastCollection method to Column.



Is File/DataSourceStrategy executed in the driver? (This is where I get the 
values in order to push down, at least if I want to reuse the current In 
pushdown.)







was (Author: fsainz):
I'm going ahead with something. Just realized the In in the pushdown 
predicates isn't the one exposed in predicates, but a post-translation, so 
adding a new class is not a drama :) (unless this post-translation is also 
transferred from the driver to the executors, which is what I'm trying to 
check now). I'm basing it on InSet (actually delegating everything to the 
InSet implementation so as not to duplicate code) and adding a new 
isInBroadcastCollection method to Column.



Is FileSourceStrategy executed in the driver? (This is where I get the values 
in order to push down, at least if I want to reuse the current In pushdown.)









[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:41 PM:


I'm going ahead with something. Just realized the In in the pushdown 
predicates isn't the one exposed in predicates, but a post-translation, so 
adding a new class is not a drama :) (unless this post-translation is also 
transferred from the driver to the executors). I'm basing it on InSet 
(actually delegating everything to the InSet implementation so as not to 
duplicate code) and adding a new isInBroadcastCollection method to Column.



Is FileSourceStrategy executed in the driver? (This is where I get the values 
in order to push down, at least if I want to reuse the current In pushdown.)







was (Author: fsainz):
I'm going ahead with something. Just realized the In in the pushdown 
predicates isn't the one exposed in predicates, but a post-translation, so 
adding a new class is not a drama :) (unless this post-translation is also 
transferred from the driver to the executors). I'm basing it on InSet 
(actually delegating everything to the InSet implementation so as not to 
duplicate code) and adding a new isInBroadcastCollection method to Column.



Is FileSourceStrategy executed in the driver? (This is where I get the values 
in order to push down, at least if I use the current In pushdown.)









[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:40 PM:


I'm going ahead with something. Just realized the In in the pushdown 
predicates isn't the one exposed in predicates, but a post-translation, so 
adding a new class is not a drama :) (unless this post-translation is also 
transferred from the driver to the executors). I'm basing it on InSet 
(actually delegating everything to the InSet implementation so as not to 
duplicate code) and adding a new isInBroadcastCollection method to Column.



Is FileSourceStrategy executed in the driver? (This is where I get the values 
in order to push down, at least if I want to reuse the current In pushdown.)







was (Author: fsainz):
I'm going ahead with something. Just realized the In in the pushdown 
predicates isn't the one exposed in predicates, but a post-translation, so 
adding a new class is not a drama :) (unless this post-translation is also 
transferred from the driver to the executors). I'm basing it on InSet 
(actually delegating everything to the InSet implementation so as not to 
duplicate code) and adding a new isInBroadcastCollection method to Column.



Is FileSourceStrategy executed in the driver? (This is where I get the values 
in order to push down, at least if I use the current In pushdown.)









[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:38 PM:


I'm going ahead with something. I just realized the In in the pushdown 
predicates isn't the one exposed in predicates but a post-translation, so 
adding a new class is not a drama :) (unless this post-translation is also 
transferred from the driver to the executors). I'm basing it on InSet 
(actually delegating everything to the InSet implementation to avoid 
duplicating code) and adding a new method, isInBroadcastCollection, to Column.


Is FileSourceStrategy executed in the driver? (This is where I get the values 
for the pushdown.)
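
The planning strategies (FileSourceStrategy for file sources, 
DataSourceStrategy for other relations such as the Kudu one) do run on the 
driver, and that is where Catalyst filters are translated into 
org.apache.spark.sql.sources filters. A minimal sketch of the receiving side 
(DemoRelation is a made-up name) showing where a pushed-down In arrives with 
its full literal list:

{code:scala}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, In, PrunedFilteredScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical relation showing where pushed-down filters arrive. buildScan
// runs on the driver during planning; whatever the returned RDD captures is
// what gets serialized into the tasks.
class DemoRelation(override val sqlContext: SQLContext) extends BaseRelation
    with PrunedFilteredScan {

  override def schema: StructType = StructType(Seq(StructField("id", StringType)))

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    filters.foreach {
      case In(attr, values) =>
        // Driver-side: the full literal list is available here, so a relation
        // could broadcast it itself before building the executor-side scan.
        println(s"In($attr) pushed down with ${values.length} values")
      case other =>
        println(s"Other pushed filter: $other")
    }
    sqlContext.sparkContext.emptyRDD[Row]
  }
}
{code}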







was (Author: fsainz):
I'm going ahead with something. I just realized the In in the pushdown 
predicates isn't the one exposed in predicates but a post-translation, so 
adding a new class is not a drama :) (unless this post-translation is also 
transferred from the driver to the executors). I'm basing it on InSet 
(actually delegating everything to the InSet implementation to avoid 
duplicating code) and adding a new method, isInBroadcastCollection, to Column.










[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:38 PM:


I'm going ahead with something. I just realized the In in the pushdown 
predicates isn't the one exposed in predicates but a post-translation, so 
adding a new class is not a drama :) (unless this post-translation is also 
transferred from the driver to the executors). I'm basing it on InSet 
(actually delegating everything to the InSet implementation to avoid 
duplicating code) and adding a new method, isInBroadcastCollection, to Column.


Is FileSourceStrategy executed in the driver? (This is where I get the values 
for the pushdown, at least if I use the current In pushdown.)







was (Author: fsainz):
I'm going ahead with something. I just realized the In in the pushdown 
predicates isn't the one exposed in predicates but a post-translation, so 
adding a new class is not a drama :) (unless this post-translation is also 
transferred from the driver to the executors). I'm basing it on InSet 
(actually delegating everything to the InSet implementation to avoid 
duplicating code) and adding a new method, isInBroadcastCollection, to Column.


Is FileSourceStrategy executed in the driver? (This is where I get the values 
for the pushdown.)









[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:26 PM:


I'm going ahead with something. I just realized the In in the pushdown 
predicates isn't the one exposed in predicates but a post-translation, so 
adding a new class is not a drama :) (unless this post-translation is also 
transferred from the driver to the executors). I'm basing it on InSet 
(actually delegating everything to the InSet implementation to avoid 
duplicating code) and adding a new method, isInBroadcastCollection, to Column.








was (Author: fsainz):
I'm going ahead with something. I just realized the In in the pushdown 
predicates isn't the one exposed in predicates but a post-translation, so 
adding a new class is not a drama :). I'm basing it on InSet (actually 
delegating everything to the InSet implementation to avoid duplicating code) 
and adding a new method, isInBroadcastCollection, to Column.










[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:08 PM:


I'm going ahead with something. I just realized the In in the pushdown 
predicates isn't the one exposed in predicates but a post-translation, so 
adding a new class is not a drama :). I'm basing it on InSet (actually 
delegating everything to the InSet implementation to avoid duplicating code) 
and adding a new method, isInBroadcastCollection, to Column.








was (Author: fsainz):
I'm going ahead with something. I just realized the In in the pushdown 
predicates isn't the one exposed in predicates but a post-translation, so 
adding a new class is not a drama :). I'm basing it on InSet (actually 
delegating everything to the InSet implementation) and adding a new method, 
isInBroadcastCollection, to Column.






