[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florentino Sainz updated SPARK-31417:
-------------------------------------
    Description: 
Read last comment :)

*I would like some help to explore whether the following feature makes sense and 
what the best way to implement it would be: allow users to use "isin" (or 
similar) predicates with huge lists (which might be broadcasted).*

As of now (AFAIK), users can only provide a list/sequence of elements, which is 
sent to the executors as part of the "In" predicate embedded in every task. So 
when this list is huge, the tasks themselves become huge (especially when the 
number of tasks is big).
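
For context, this is roughly what the current usage looks like (the table and 
column names are just placeholders); the point is that every element of the list 
ends up as a Literal inside the In predicate of the plan:

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("isin-example").getOrCreate()

// Placeholder: in the real case this is a list of ~200,000 ids loaded from elsewhere.
val hugeList: Seq[String] = (1 to 200000).map(_.toString)

val df = spark.read.table("my_table")  // placeholder table name
// Every element becomes a Literal inside the In predicate, so the whole list is
// serialized into every task that evaluates (or pushes down) the filter.
val filtered = df.filter(col("id").isin(hugeList: _*))
{code}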

I'm coming from 
https://stackoverflow.com/questions/61111172/scala-spark-isin-broadcast-list 
(I'm the creator of the post, and also the 2nd answer). 

I know this is not common, but it can be useful for people reading from 
"servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
concrete case, querying Kudu with 200,000 elements in the isin takes ~2 minutes 
(plus 1 minute "calculating the plan" to send the BIG tasks to the executors) 
versus 30 minutes doing a full scan of my table (even if semi-filtered, most of 
the data has the same nature and thus is hard to filter unless I use this 
200,000-element list).

More or less I have a clear view (explained in the Stack Overflow post) of what 
to modify if I wanted to create my own "BroadcastIn" in 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala;
however, with my limited "idea", anyone who has implemented In pushdown would 
have to re-adapt it to the new "BroadcastIn".
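
Just to make the idea concrete, here is a rough, untested sketch (my assumption, 
not existing Spark code) of what such a broadcast-backed expression could look 
like; it sidesteps codegen via CodegenFallback and assumes the broadcast value 
is already a set of evaluated values rather than a Seq[Expression]:

{code:scala}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{BooleanType, DataType}

// Hypothetical expression, sketch only: the task serializes just the broadcast
// handle, and the actual set is fetched once per executor when .value is read.
case class BroadcastIn(child: Expression, broadcastSet: Broadcast[Set[Any]])
  extends UnaryExpression with CodegenFallback {

  override def dataType: DataType = BooleanType

  override protected def nullSafeEval(input: Any): Any =
    broadcastSet.value.contains(input)

  override def toString: String = s"$child BROADCAST_IN (<broadcast set>)"
}
{code}

The problem described above remains, though: any datasource that pattern-matches 
on the existing In for pushdown would not recognize an expression like this.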

I've looked at the Spark 3.0 sources and nothing has changed there, but I'm not 
100% sure whether there's an alternative there (I'm stuck on 2.4 anyway, working 
at a corporation which uses that version).

Maybe I can go ahead with a PR (slowly, in my free time); however, I'm not an 
expert in Scala syntax/syntactic sugar. *Does anyone have an idea how to extend 
the current case class "In" while keeping compatibility with pushdown 
implementations (i.e. creating a new Predicate is not allowed, I think), so that 
the case class accepts not only a Seq[Expression] but also a 
Broadcast[Seq[Expression]]?* The embedded list is what makes the tasks huge; I 
made a test run with a predicate which receives a Broadcast variable and uses 
its value inside, and it works much better.
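
A workaround in that spirit (a minimal sketch, assuming a String key column 
named "id" and a placeholder table name) is a plain UDF that dereferences the 
broadcast variable on the executors. It keeps the tasks small, but being a 
black-box UDF it cannot be pushed down to the source, which is exactly why a 
first-class "BroadcastIn" would be nicer:

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().appName("broadcast-isin-workaround").getOrCreate()

// Placeholder: ~200,000 keys broadcast once; tasks only carry the broadcast handle.
val keys: Set[String] = (1 to 200000).map(_.toString).toSet
val broadcastKeys = spark.sparkContext.broadcast(keys)

// The membership check happens on the executor against the broadcast value.
val inBroadcast = udf((id: String) => broadcastKeys.value.contains(id))

val df = spark.read.table("my_table")  // placeholder table name
val filtered = df.filter(inBroadcast(col("id")))
{code}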



> Allow broadcast variables when using isin code
> ----------------------------------------------
>
>                 Key: SPARK-31417
>                 URL: https://issues.apache.org/jira/browse/SPARK-31417
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0, 3.0.0
>            Reporter: Florentino Sainz
>            Priority: Minor
>


