[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 9:26 PM:

Well... I just finished my PoC, adding a custom Predicate plus a custom Strategy that transforms the relations/scans before the built-in Strategy does (on Spark 2.4). The pushdown case turns out to be the hard part; without pushdown, task size already improves quite a lot. I "lost" half a day, but learned quite a bit and confirmed my assumptions about how Spark works internally :).

When the filter gets pushed down, my change is useless, because what gets shipped in the task is the pushed filter itself, i.e. Source.In. To implement this without breaking backwards compatibility with current BaseRelations, Spark would need a second filter next to source.In (source.BroadcastedIn?), and relations that use it would have to know it carries a broadcast variable, so that when they implement the pushdown they call the broadcast's ".value" only inside the executor (i.e. in the RDD implementation)...

*For now I'm abandoning it. I know how to implement it, but the change is quite big and I'm not sure it's worth it: adding a new Source filter and adapting all the RDDs/Relations to exploit it efficiently could make them a little complex, since it means delaying the creation of those filters in the underlying technology until the compute method is called... In particular, I don't know whether this is useful for anything beyond big user-provided "isin"s (maybe some join-pushdown optimization I'm not seeing?)*

If anyone is interested in me implementing it, or just in the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code from one of Spark's Strategies and also reimplement the underlying relation that pushes the filter down), just ping me.

*Anyways, I think the best option without changing the whole architecture is to deal with it inside the RDD/RelationProvider: when it finds a big list to be pushed down (as long as unhandledFilters makes it disappear from Spark's side), it can just broadcast the list itself and then use it. The only problem is that this might leave non-cleaned broadcast variables.*
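The source.BroadcastedIn idea can be sketched in plain Scala. Everything here is hypothetical (Spark has no such filter today), and Broadcast is a stub standing in for org.apache.spark.broadcast.Broadcast so the sketch stays self-contained:

```scala
// Hypothetical sketch only: Spark has no source.BroadcastedIn today.
// Broadcast is stubbed here; the real handle would be
// org.apache.spark.broadcast.Broadcast[T].
final case class Broadcast[T](value: T)

// A pushed-down filter that carries a broadcast handle instead of
// inlining every value into the serialized task.
final case class BroadcastedIn(attribute: String, values: Broadcast[Set[Any]]) {
  // A relation handling this filter would call .value only on the
  // executor (inside RDD.compute), so the list travels once per node
  // via the broadcast mechanism, not with every task.
  def eval(row: Map[String, Any]): Boolean =
    row.get(attribute).exists(values.value.contains)
}
```

A relation advertising support for such a filter would have to keep the handle opaque until compute runs on the executor, which is exactly the "delay filter creation until the compute method is called" complexity mentioned above.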
> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0, 3.0.0
> Reporter: Florentino Sainz
> Priority: Minor
>
> *I would like some help exploring whether the following feature makes sense and what the best way to implement it is: allow users to use "isin" (or similar) predicates with huge Lists (which might be broadcast).*
> Read the last comment :)
> As of now (AFAIK), users can only provide a list/sequence of elements, which will be sent to the executors as part of the "In" predicate, which is itself part of the task. So when this list is huge, tasks become "huge" (especially when the task count is big).
> I'm coming from https://stackoverflow.com/questions/6172/scala-spark-isin-br
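A rough, self-contained illustration of the task-size problem described above, using Java serialization as a crude proxy for Spark's task serialization (BroadcastHandle is a hypothetical stand-in for a broadcast reference, not a Spark class):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for a broadcast handle: a task would carry only
// this small reference, while the actual list is shipped separately by
// the broadcast machinery.
case class BroadcastHandle(id: Long)

// Size of an object under Java serialization, used as a rough proxy for
// what Spark ships inside each serialized task.
def serializedSize(obj: Any): Int = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  bytes.size()
}

val hugeList: List[Long] = List.tabulate(100000)(_.toLong)

// An In predicate inlines the whole list into every task...
val inlinedCost = serializedSize(hugeList)
// ...whereas a broadcast-based filter would inline only the handle.
val handleCost = serializedSize(BroadcastHandle(42L))
```

Only the ratio matters here: inlining a 100k-element list costs on the order of the list itself per task, while a broadcast-based filter would pay per task only for a tiny handle.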
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 9:26 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. 
*Anyways, I think maybe the best option without changing all the architecture is to deal with it inside the RDD/RelationProvider, when it finds a big list to be pushed-down (as long as unhandledFilters makes it disappear), it can just broadcast it and then use it. Only problem is that this might leave non-cleaned broadcast variables* was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. *Anyways, I think maybe the best option is to deal with it inside the RDD/RelationProvider, when it finds a big list to be pushed-down (as long as unhandledFilters makes it disappear), it can just broadcast it and then use it.* > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > Read last comment :) > As of now (AFAI
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 9:00 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. was (Author: fsainz): Well... 
just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > Read last comment :) > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task numbe
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:57 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates wit
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:56 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predica
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:55 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > Read last comment :) > As of now (AFAIK), users can only
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:54 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. 
> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0, 3.0.0
> Reporter: Florentino Sainz
> Priority: Minor
>
> *I would like some help to explore whether the following feature makes sense
> and what's the best way to implement it: allow users to use "isin" (or
> similar) predicates with huge Lists (which might be broadcasted).*
> Read last comment :)
> As of now (AFAIK), users can only provide a list/sequence of elements which
> will be sent as part of the "In" predicate, which is part of the task, to the
> executors. So when this list is huge, tasks become "huge" (especially when
> the task number is big).
> I'm coming from
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list
> (I'm the creator of the post, and also the 2nd answer).
> I know this is not common, but it can be useful for people reading from
> "servers" which are not in the same nodes as Spark (maybe S3/cloud?). In my
> concrete case, querying Kudu with 200.000 elements in the isin takes
> ~2 minutes (+1 minute "calculating plan" to send the BIG tasks to the
> executors) versus 30 minutes doing a full scan (even if semi-filtered; most
> of the data has the same nature and thus is hard to filter, unless I use
> this "200.000" list) on my table.
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:45 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and reimplemented the relation) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code and reimplemented the relation) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackove
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:45 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and reimplemented the relation) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (speci
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:45 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code and reimplemented the relation) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas (it can be injected into Spark, but you have to copy quite a bit of code and reimplemented the relation) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:44 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas (it can be injected into Spark, but you have to copy quite a bit of code and reimplemented the relation) just ping me. 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters at the underlying technology until the compute method is called...). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:42 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... 
just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. 
Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter,
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:42 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). 
It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. 
Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (ex
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:41 PM:

Well... I just finished my PoC, adding a custom Predicate plus a custom Strategy that transforms the relations/scans before the provided one does (on Spark 2.4). Making this work seems feasible at least for the non-pushdown case: without pushdown, task size does improve quite a lot :P. When the filter gets pushed down, my change is useless, basically because what gets shipped in the task is the pushed filter, i.e. Source.In.

To implement this without breaking backwards compatibility with the current BaseRelations, Spark would need another source.In (source.BroadcastedIn?), and those who use it would have to know it's a broadcast variable, so that when they implement the pushdown they call the broadcast's ".value" only inside the executor (i.e. in the RDD implementation)...

*For now I'm abandoning it. I know how to implement it, but the change is quite big and I'm not sure it's worth it (adding a new Source and also adapting all the RDDs/Relations to exploit it efficiently, accessing the data only in the compute method, looks hard). Especially since I don't know whether this is useful for anything beyond big user-provided "isin"s (maybe some join-pushdown optimization I'm not seeing?)*

PS: In my software I just improved the behaviour with a coalesce(X) so there are fewer big tasks around :) [after pushing down the filter when its size is suitable; otherwise I fall back to a left-join strategy / almost full scan, which takes about 10x the time in Kudu; the amount of data I get from that table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0, 3.0.0
> Reporter: Florentino Sainz
> Priority: Minor
>
> *I would like some help exploring whether the following feature makes sense and what the best way to implement it would be: allow users to use "isin" (or similar) predicates with huge Lists (which might be broadcast).*
> As of now (AFAIK), users can only provide a list/sequence of elements, which is sent as part of the "In" predicate, which is part of the task, to the executors. So when this list is huge, the tasks become huge (especially when the task count is big).
> I'm coming from https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list (I'm the creator of the post, and also of the 2nd answer).
> I know this is not common, but it can be useful for people reading from "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my concrete case, querying Kudu with 200,000 elements in the isin takes ~2 minutes (+1 minute "calculating plan" to send the BIG tasks to the executors) versus 30 minutes doing a full scan of my table (even if semi-filtered; most of the data has the same nature and is thus hard to filter unless I use this 200,000-element list).
> More or less I have a clear view (explained in Stack Overflow) of what to modify if I wanted to create my own "BroadcastIn" in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala; however, with my limited "idea", anyone who implemented In pushdown would have to re-adapt it to the new "BroadcastIn".
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 100% sure if there's an alternative there (I'm stuck in 2.4 any...
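The proposed contract, a BroadcastedIn source filter that carries a broadcast handle and only dereferences ".value" inside the executor-side compute, can be sketched as follows. This is a minimal, self-contained Python model of the idea, not Spark's real `org.apache.spark.sql.sources` API; the names `BroadcastHandle`, `BroadcastedIn`, `Relation`, and `compute` are all hypothetical stand-ins:

```python
from dataclasses import dataclass
from typing import Any, Dict, List

# Hypothetical stand-in for Spark's broadcast machinery: the driver registers
# the big list once, and tasks carry only the small integer id.
_BROADCAST_REGISTRY: Dict[int, List[Any]] = {}

@dataclass(frozen=True)
class BroadcastHandle:
    bid: int
    def value(self) -> List[Any]:
        # In real Spark this would fetch the broadcast blocks on the executor.
        return _BROADCAST_REGISTRY[self.bid]

def broadcast(values: List[Any]) -> BroadcastHandle:
    bid = len(_BROADCAST_REGISTRY)
    _BROADCAST_REGISTRY[bid] = values
    return BroadcastHandle(bid)

@dataclass(frozen=True)
class BroadcastedIn:
    """Like a source.In filter, but holds a handle instead of the values."""
    attribute: str
    handle: BroadcastHandle

@dataclass
class Relation:
    rows: List[Dict[str, Any]]
    def compute(self, flt: BroadcastedIn) -> List[Dict[str, Any]]:
        # .value() is called only here, i.e. "inside the executor", so the
        # filter object shipped with each task stays tiny.
        wanted = set(flt.handle.value())
        return [r for r in self.rows if r[flt.attribute] in wanted]

# usage: a huge user-provided "isin" list, broadcast once
ids = broadcast(list(range(200_000)))
rel = Relation([{"id": 5}, {"id": 300_000}])
print(rel.compute(BroadcastedIn("id", ids)))  # → [{'id': 5}]
```

The point of delaying `.value()` to `compute` is exactly what the comment describes: relations that push the filter down must not materialize the list while building the plan, only when the scan actually runs.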
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:41 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). 
It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. 
Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:40 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think?* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). 
It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relation which doesn't seem so easy so the only place where value is already accessed is in the compute method). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think?* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. 
Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:40 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). 
It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think?* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. 
Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" lis
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:39 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relation which doesn't seem so easy so the only place where value is already accessed is in the compute method). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think?* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). 
It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. 
So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in st
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:15 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/fullscan, which takes like 10x the time, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). 
It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). 
> I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:15 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). 
It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/fullscan, which takes like 10x the time, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. 
So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/sc
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ]

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:12 PM:

Well... I just finished my PoC by adding a custom Predicate plus a custom Strategy which transforms the relations/scans before the provided one does (on Spark 2.4). It seems that making this work takes more than that, at least for pushdown cases; without pushdown, task size does improve quite a lot.

When the filter gets pushed down, my change is useless, basically because what gets shipped in the task is the pushed filter, i.e. Source.In. To implement this without breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?), and relations that use it would have to know it's a broadcast variable, so that when they implement the pushdown they call the broadcast's ".value" only inside the executor (i.e. in the RDD implementation)...

For now I'm abandoning it. I know how to implement it, but the change is quite big and I'm not sure it's worth it (adding a new Source and all). Especially, I don't know whether this can be useful for anything besides big user-provided "isin"s (maybe some join-pushdown optimization I'm not seeing?). What do you guys think?

PD: In my software I just improved the behaviour with a coalesce(X) so there are fewer big tasks around :) [after pushing down the filter when the size is suitable, the amount of data I get from that table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0, 3.0.0
> Reporter: Florentino Sainz
> Priority: Minor
>
> *I would like some help exploring whether the following feature makes sense and what the best way to implement it is: allow users to use "isin" (or similar) predicates with huge lists (which might be broadcast).*
>
> As of now (AFAIK), users can only provide a list/sequence of elements, which is sent as part of the "In" predicate, which is part of the task, to the executors. So when this list is huge, the tasks become huge (especially when the task count is big).
>
> I'm coming from https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list (I'm the creator of the post, and also the 2nd answer).
>
> I know this is not common, but it can be useful for people reading from "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my concrete case, querying Kudu with 200,000 elements in the isin takes ~2 minutes (+1 minute "calculating the plan" to send the BIG tasks to the executors), versus 30 minutes doing a full scan of my table (even if semi-filtered; most of the data has the same nature and is thus hard to filter unless I use this 200,000-element list).
>
> More or less I have a clear view (explained on Stack Overflow) of what to modify if I wanted to create my own "BroadcastIn" in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala; however, with my limited "idea", anyone who implemented In pushdown would have to re-adapt it to the new "BroadcastIn".
>
> I've looked at the Spark 3.0 sources and nothing has changed there, but I'm not 100% sure there isn't an alternative (I'm stuck on 2.4 anyway, working for a corporation which uses that version).
>
> Maybe I can go ahead with a PR (slowly, in my free time); however, I'm not an expert in Scala syntax/syntactic sugar. *Anyone have any idea how to extend the current case class "In" while keeping compatibility with PushDown implementations (i.e. creating a new Predicate is not allowed, I think), so that the case class holds not only Seq[Expression] but also allows Broadcast[Seq[Expression]]?* This is what makes the tasks huge; I made a test run with a predicate which receives a Broadcast variable and uses the value inside, and it works much better.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
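The source.BroadcastedIn idea from the comment above can be sketched in plain Scala. This is a hypothetical design, not real Spark API: `BroadcastLike`, `BroadcastedIn`, and `evaluate` are illustrative stand-ins; in real Spark the handle would be `org.apache.spark.broadcast.Broadcast`, and `.value` would fetch the broadcast blocks on the executor instead of shipping the list inside every task.

```scala
// Hypothetical sketch of a source.BroadcastedIn filter. None of these names
// are real Spark API; they only illustrate the design discussed in the comment.
object BroadcastedInSketch {
  // Stand-in for org.apache.spark.broadcast.Broadcast[T]: only the handle
  // travels with the task; `value` resolves the data on the executor side.
  final class BroadcastLike[T](private val data: T) extends Serializable {
    def value: T = data
  }

  sealed trait SourceFilter extends Serializable
  // Today's filter: the whole value list travels inside every task.
  final case class In(attribute: String, values: Seq[Any]) extends SourceFilter
  // Proposed variant: the task carries only the broadcast handle.
  final case class BroadcastedIn(attribute: String, handle: BroadcastLike[Set[Any]])
      extends SourceFilter

  // A relation implementing pushdown would call `.value` only here, i.e. the
  // filter is materialized when it is finally evaluated (or translated for the
  // underlying technology) inside the RDD's compute(), not at planning time.
  def evaluate(filter: SourceFilter, row: Map[String, Any]): Boolean = filter match {
    case In(attr, vs)           => vs.contains(row(attr))
    case BroadcastedIn(attr, h) => h.value.contains(row(attr))
  }
}
```

This matches the trade-off described above: any relation that accepts `BroadcastedIn` for pushdown must know to defer the `.value` call, which is why the change touches every pushdown implementation.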
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ]

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:59 PM:

I'm going ahead with something (at https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). I just realized the In in the pushdown predicates is not the one exposed in predicates but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors, which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation so as not to duplicate code) and adding a new method to Column, isInBroadcastCollection.

Is File/DataSourceStrategy executed in the driver? (This is where I get the values in order to push down, at least if I want to reuse the current In pushdown.) Just confirmed that all the plans execute in the driver, as expected (this is the first time I look "deeply" inside the Spark sources).

Maybe the list can be passed as a function returning a list instead of a list (so Scala serializes just the function with the broadcast variable instead of the whole list). The API would be compatible, I think, but that forces not using case classes and 99% breaks binary compatibility :(. I'm going to try a few things before continuing.
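The "function returning a list instead of a list" argument can be demonstrated in plain Scala, without Spark. This is only a sketch: `EagerIn` and `DeferredIn` are illustrative names, not Spark API, and Java serialization size is just a rough proxy for what Spark ships per task.

```scala
// Sketch: a predicate that stores the list serializes the whole list, while
// one that stores a no-capture function stays tiny, because only the function
// object (and whatever it closes over, e.g. a broadcast handle) is written.
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object TaskSizeSketch {
  // Today's shape: the value list is a field, so it ships with the predicate.
  final case class EagerIn(values: Vector[Int]) extends Serializable
  // Proposed shape: only a function is a field; the list is produced on demand.
  final case class DeferredIn(values: () => Vector[Int]) extends Serializable

  // Java-serialized byte count, a rough proxy for per-task payload size.
  def serializedSize(obj: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    buffer.size()
  }
}
```

Serializing `DeferredIn(() => bigList)` is only small if the closure does not capture the list itself, which is exactly why the comment pairs the function with a broadcast variable: the closure captures the small broadcast handle and calls `.value` on the executor.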
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:15 PM: I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case). Maybe the list can be passed as a function returning a list instead of a list (so Scala serializes just the function with the broadcast variable instead of the whole list), API would be compatible I think, but that forces not-to-use case classes + 99% breaks binary compatibility :( was (Author: fsainz): I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? 
(this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case). Maybe the list can be passed as a function returning a list instead of a list, API would be compatible I think, but that forces not-to-use case classes + 99% breaks binary compatibility :( > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. 
> More or less I have a clear view (explained in the Stack Overflow post) of what to > modify if I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala; > however, with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn". > I've looked at the Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure whether there's an alternative (I'm stuck on 2.4 anyway, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time); however, I'm not an > expert in Scala syntax/syntactic sugar. *Does anyone have an idea how to extend the > current case class "In" while keeping compatibility with PushDown > implementations (creating a new Predicate is not allowed, I think), so that the case class > holds not only Seq[Expression] but also allows > Broadcast[Seq[Expression]]?* This is what makes the tasks huge; I made a > test run with a predicate which receives a Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
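The workaround discussed in the comments above can be sketched in user code. This is a hypothetical illustration, not Spark API: `ids`, `inBroadcast`, and the object name are made up, and the broadcast-UDF approach avoids the huge task payload but, unlike the proposed "BroadcastIn", cannot be pushed down to the data source (e.g. Kudu):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object BroadcastIsinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val ids: Seq[String] = (1 to 200000).map(_.toString) // stand-in for the 200,000-element list
    val df = Seq("1", "2", "999999").toDF("id")

    // Plain isin: every literal is embedded in the In predicate, so the
    // whole list is serialized into each task.
    val viaIsin = df.filter($"id".isin(ids: _*))

    // Broadcast workaround: the closure captures only the broadcast handle,
    // so tasks stay small and each executor fetches the set once. The price
    // is that this filter is opaque to source pushdown.
    val bcast = spark.sparkContext.broadcast(ids.toSet)
    val inBroadcast = udf { (v: String) => bcast.value.contains(v) }
    val viaBroadcast = df.filter(inBroadcast($"id"))

    viaBroadcast.show()
    spark.stop()
  }
}
```

The closure-over-a-broadcast trick is also the essence of the "function returning a list" idea in the comment above: serializing a thunk captures just the broadcast reference rather than the materialized list.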
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:14 PM: I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case). Maybe the list can be passed as a function returning a list instead of a list, API would be compatible I think, but that forces not-to-use case classes + 99% breaks binary compatibility :( was (Author: fsainz): I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? 
(this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case). Maybe the list can be passed as a function returning a list instead of a list, API would be compatible I think, but that forces not-to-use case classes + 99% binary compatibility :( > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. 
> More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:14 PM: I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case). Maybe the list can be passed as a function returning a list instead of a list, API would be compatible I think, but that forces not-to-use case classes + 99% binary compatibility :( was (Author: fsainz): I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? 
(this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case), passing the list as a function returning a list, can work and API would be compatible I think, but that breaks case classes + 99% binary compatibility :( > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. 
> More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses th
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:13 PM: I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case), passing the list as a function returning a list, can work and API would be compatible I think, but that breaks case classes + 99% binary compatibility :( was (Author: fsainz): I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? 
(this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. 
> More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:08 PM: I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) was (Author: fsainz): I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. 
Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). 
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:43 PM: I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) was (Author: fsainz): I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. 
Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). 
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:41 PM: I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) was (Author: fsainz): I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. 
Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). 
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:40 PM: I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) was (Author: fsainz): I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I use the current In pushdown) > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. 
Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). 
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:38 PM: I'm going ahead with something. I just realized that the In in the pushdown predicates is not the one exposed in predicates but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation so as not to duplicate code) and adding a new method to Column, isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (This is where I get the values in order to push down.) was (Author: fsainz): I'm going ahead with something. I just realized that the In in the pushdown predicates is not the one exposed in predicates but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation so as not to duplicate code) and adding a new method to Column, isInBroadcastCollection. > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 2.4.0, 3.0.0 > Reporter: Florentino Sainz > Priority: Minor
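The comments in this thread propose adding an isInBroadcastCollection method to Column, backed by a Catalyst expression that delegates to InSet. As a rough approximation of the intended call-site ergonomics, here is a sketch written as an enrichment class over the public API; it uses a broadcast-backed UDF rather than the real InSet delegation (so it gains nothing at the Catalyst/pushdown level), and the names `BroadcastInSyntax`/`BroadcastInColumn` are illustrative, not part of any patch.

```scala
// Sketch of the proposed call-site shape for isInBroadcastCollection, as an
// enrichment of Column. The actual proposal delegates to Catalyst's InSet;
// this approximation substitutes a broadcast-backed UDF.
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.udf

object BroadcastInSyntax {
  implicit class BroadcastInColumn(private val c: Column) extends AnyVal {
    // Only the small Broadcast handle is captured in the task closure;
    // b.value is resolved lazily on the executor when the predicate runs.
    def isInBroadcastCollection(b: Broadcast[Set[String]]): Column =
      udf((v: String) => b.value.contains(v)).apply(c)
  }
}
```

With `import BroadcastInSyntax._` in scope, `df.filter(col("id").isInBroadcastCollection(bcast))` mirrors the familiar `isin` call site while keeping the serialized task payload small.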
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:38 PM: I'm going ahead with something. I just realized that the In in the pushdown predicates is not the one exposed in predicates but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation so as not to duplicate code) and adding a new method to Column, isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (This is where I get the values in order to push down, at least if I use the current In pushdown.) was (Author: fsainz): I'm going ahead with something. I just realized that the In in the pushdown predicates is not the one exposed in predicates but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation so as not to duplicate code) and adding a new method to Column, isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (This is where I get the values in order to push down.)
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:26 PM: I'm going ahead with something. I just realized that the In in the pushdown predicates is not the one exposed in predicates but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation so as not to duplicate code) and adding a new method to Column, isInBroadcastCollection. was (Author: fsainz): I'm going ahead with something. I just realized that the In in the pushdown predicates is not the one exposed in predicates but a post-translation, so adding a new class is not a drama :). I'm basing it on InSet (actually delegating everything to the InSet implementation so as not to duplicate code) and adding a new method to Column, isInBroadcastCollection.
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:08 PM: I'm going ahead with something. I just realized that the In in the pushdown predicates is not the one exposed in predicates but a post-translation, so adding a new class is not a drama :). I'm basing it on InSet (actually delegating everything to the InSet implementation so as not to duplicate code) and adding a new method to Column, isInBroadcastCollection. was (Author: fsainz): I'm going ahead with something. I just realized that the In in the pushdown predicates is not the one exposed in predicates but a post-translation, so adding a new class is not a drama :). I'm basing it on InSet (actually delegating everything to the InSet implementation) and adding a new method to Column, isInBroadcastCollection.