[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz commented on SPARK-31417:
------------------------------------------

Well... just finished my PoC by adding a custom Strategy which will transform 
the relations/scans before the provided one does (@ spark 2.4). It seems that 
making this work (at least for pushdown cases, without pushdown, task size does 
improve quite a bit :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast... 


> Allow broadcast variables when using isin code
> ----------------------------------------------
>
>                 Key: SPARK-31417
>                 URL: https://issues.apache.org/jira/browse/SPARK-31417
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0, 3.0.0
>            Reporter: Florentino Sainz
>            Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/61111172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives and Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to