[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:57 PM:
--------------------------------------------------------------------

Well... I just finished my PoC by adding a custom Predicate + a custom Strategy 
which transforms the relations/scans before the built-in one does (on Spark 
2.4). It seems that making this work is not straightforward, at least for 
pushdown cases (without pushdown, task size does improve quite a lot). I "lost" 
half a day, but I did learn quite a bit and confirmed my assumptions about how 
Spark works on the inside :).

When the filter gets pushed down, the change I made is useless, basically 
because what gets shipped in the task is the pushed filter, aka sources.In.

To implement this without breaking backwards compatibility with current 
BaseRelations, Spark would need another sources.In (sources.BroadcastedIn?), 
and those who use it would have to know it's a broadcast variable, so when they 
implement the pushdown they actually call ".value" on the broadcast inside the 
executor (aka in the RDD implementation)...
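
A minimal sketch of that idea, under loud assumptions: `BroadcastedIn` does not 
exist in Spark, and `scanWithBroadcastedIn` is a hypothetical helper. A real 
relation would turn the values into a predicate of the underlying store inside 
`compute`; here a plain in-memory filter stands in for that step. The only 
point shown is that the task carries the broadcast handle and ".value" is read 
on the executor.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

object BroadcastedInSketch {
  // Hypothetical filter, NOT part of org.apache.spark.sql.sources.
  // It holds a broadcast handle instead of the values themselves.
  final case class BroadcastedIn(attribute: String, values: Broadcast[Set[Any]])

  // Hypothetical stand-in for a relation's scan. Rows must carry a schema
  // (as rows from DataFrame.rdd do) so the column can be looked up by name.
  def scanWithBroadcastedIn(rows: RDD[Row], filter: BroadcastedIn): RDD[Row] =
    rows.mapPartitions { iter =>
      // Runs on the executor: the value set is resolved here, per partition,
      // instead of being serialized into every task.
      val allowed = filter.values.value
      val column = filter.attribute
      iter.filter(row => allowed.contains(row.getAs[Any](column)))
    }
}
```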


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new source filter and 
also adapting all the RDDs/Relations to exploit it efficiently can make them a 
little complex, basically delaying the creation of those filters in the 
underlying technology until the compute method is called...). In particular, I 
don't know if this can be useful for anything apart from big user-provided 
"isins" (maybe some join-pushdown optimization I'm not aware of?)*

If anyone is interested in me implementing it, or just in the ideas/hacky way 
(it can be injected into Spark, but you have to copy quite a bit of code from 
one of Spark's Strategies and also reimplement the underlying relation which 
will push down the filter), just ping me.

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes like 10x the time in Kudu; the amount of data I get from that table 
fits in memory "perfectly"]
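
A rough sketch of that workaround, not the actual code from my software: the 
names `filterByKeys`, `maxPushdownSize` and `numPartitions` are made up for 
illustration, and the fallback uses a broadcast left-semi join as an assumption 
so that both branches return the same rows.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, col}

object IsinWorkaroundSketch {
  // Hypothetical helper: push the list down when it is small enough,
  // otherwise fall back to a join against the (almost) full scan.
  def filterByKeys(table: DataFrame, keyColumn: String, keys: Seq[String],
                   maxPushdownSize: Int, numPartitions: Int): DataFrame =
    if (keys.size <= maxPushdownSize) {
      // isin can be pushed down to the source; coalesce merges partitions so
      // fewer tasks each carry a copy of the big list.
      table.filter(col(keyColumn).isin(keys: _*)).coalesce(numPartitions)
    } else {
      val spark = table.sparkSession
      import spark.implicits._
      val keysDf = keys.toDF(keyColumn)
      // Assumption: a left-semi join keeps only the rows of `table` whose key
      // is in the list, matching what the isin branch returns.
      table.join(broadcast(keysDf), Seq(keyColumn), "left_semi")
    }
}
```

Which threshold is "suitable" depends on the source and on the number of 
tasks, so `maxPushdownSize` has to be found by measuring.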


> Allow broadcast variables when using isin code
> ----------------------------------------------
>
>                 Key: SPARK-31417
>                 URL: https://issues.apache.org/jira/browse/SPARK-31417
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0, 3.0.0
>            Reporter: Florentino Sainz
>            Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> Read last comment :)
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (especially when the number of tasks is big).
> I'm coming from 
> https://stackoverflow.com/questions/61111172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> I have a more or less clear view (explained on Stack Overflow) of what to 
> modify if I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to adapt it again to the new "BroadcastIn".
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time); however, I'm not an 
> expert in Scala syntax/syntactic sugar. *Does anyone have an idea of how to 
> extend the current case class "In" while keeping compatibility with PushDown 
> implementations (aka creating a new Predicate is not allowed, I think), so 
> that the case class accepts not only Seq[Expression] but also 
> Broadcast[Seq[Expression]]?* The Seq is what makes the tasks huge; I made a 
> test run with a predicate which receives a Broadcast variable and uses the 
> value inside, and it works much better.
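
One way to approximate the test run described above without touching Catalyst 
is a UDF that closes over a broadcast variable. This is a swapped-in technique, 
not the custom Predicate from the PoC, and `filterWithBroadcast` is a 
hypothetical helper name. Note that a UDF is opaque to the data source, so 
nothing gets pushed down: it only shows that tasks stay small when they carry a 
broadcast handle instead of the list.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

object BroadcastIsinSketch {
  // Hypothetical helper: an "isin" evaluated against a broadcast set.
  def filterWithBroadcast(df: DataFrame, column: String, keys: Set[String]): DataFrame = {
    // The set is shipped once per executor, not once per task.
    val keysBc = df.sparkSession.sparkContext.broadcast(keys)
    // The closure captures only the broadcast handle; .value is read on the
    // executor. Null inputs are treated as "not in the set".
    val inKeys = udf((value: String) => value != null && keysBc.value.contains(value))
    df.filter(inKeys(col(column)))
  }
}
```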


