[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:15 PM:
--------------------------------------------------------------------

I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates it's not the one exposed in 
predicates, but a post-translation, so adding a new class is not a drama :) 
(unless this post-translation is also transferred from the driver to the 
executors which is what I'm trying to check now). I'm basing it on InSet 
(actually delegating everything to the InSet implementation not to duplicate 
code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)


If that's the case (this is the first time I check "deeply" inside spark 
sources, but I guess that's the case). Maybe the list can be passed as a 
function returning a list instead of a list (so Scala serializes just the 
function with the broadcast variable instead of the whole list), API would be 
compatible I think, but that forces not-to-use case classes + 99% breaks binary 
compatibility :(



was (Author: fsainz):
I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates it's not the one exposed in 
predicates, but a post-translation, so adding a new class is not a drama :) 
(unless this post-translation is also transferred from the driver to the 
executors which is what I'm trying to check now). I'm basing it on InSet 
(actually delegating everything to the InSet implementation not to duplicate 
code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)


If that's the case (this is the first time I check "deeply" inside spark 
sources, but I guess that's the case). Maybe the list can be passed as a 
function returning a list instead of a list, API would be compatible I think, 
but that forces not-to-use case classes + 99% breaks binary compatibility :(


> Allow broadcast variables when using isin code
> ----------------------------------------------
>
>                 Key: SPARK-31417
>                 URL: https://issues.apache.org/jira/browse/SPARK-31417
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0, 3.0.0
>            Reporter: Florentino Sainz
>            Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/61111172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives and Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to