[jira] [Created] (SPARK-31477) Dump codegen and compile time in BenchmarkQueryTest
Xiao Li created SPARK-31477: --- Summary: Dump codegen and compile time in BenchmarkQueryTest Key: SPARK-31477 URL: https://issues.apache.org/jira/browse/SPARK-31477 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Xiao Li Assignee: Xiao Li It would be nice to measure the codegen and compilation time costs in TPC-DS queries. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
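As a rough, hedged sketch of how such timings could be dumped (not the actual BenchmarkQueryTest change), assuming the CodegenMetrics source that Spark's code generator updates when it compiles generated classes; metric names and units may differ across versions:

{code:scala}
// Hedged sketch: print accumulated codegen/compilation metrics after running a query.
// Assumes org.apache.spark.metrics.source.CodegenMetrics (a Dropwizard metrics source
// updated by Spark's Janino compilation path); not the actual BenchmarkQueryTest patch.
import org.apache.spark.metrics.source.CodegenMetrics
import org.apache.spark.sql.SparkSession

object CodegenTimingDump {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("codegen-timing").getOrCreate()

    // Run something that exercises whole-stage codegen.
    spark.range(0, 1000000L).selectExpr("sum(id * 2)").collect()

    val compile = CodegenMetrics.METRIC_COMPILATION_TIME
    println(s"compiled classes:       ${compile.getCount}")
    println(s"mean compile time:      ${compile.getSnapshot.getMean}")
    println(s"max compile time:       ${compile.getSnapshot.getMax}")
    println(s"mean generated source:  ${CodegenMetrics.METRIC_SOURCE_CODE_SIZE.getSnapshot.getMean}")

    spark.stop()
  }
}
{code}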
[jira] [Updated] (SPARK-31459) When using the insert overwrite directory syntax, if the target path is an existing file, the final run result is incorrect
[ https://issues.apache.org/jira/browse/SPARK-31459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-31459: -- Affects Version/s: (was: 3) 3.0.0 > When using the insert overwrite directory syntax, if the target path is an > existing file, the final run result is incorrect > --- > > Key: SPARK-31459 > URL: https://issues.apache.org/jira/browse/SPARK-31459 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.5, 3.0.0 > Environment: spark2.4.5 >Reporter: mcdull_zhang >Priority: Major > Labels: sql > > When using the insert overwrite directory syntax, if the target path is an > existing file, the final operation result is incorrect. > At present, Spark will not delete the existing files. After the calculation > is completed, one of the result files will be renamed to the result path. > This is different from hive's behavior. Hive will delete the existing target > file. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
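A rough, hedged reproduction sketch of the scenario described above; the path, view name and file format are placeholders, not taken from the ticket:

{code:scala}
// Hedged reproduction sketch; '/tmp/target' is a placeholder path.
// Precondition: /tmp/target already exists as a plain *file* (e.g. created with `touch`),
// not a directory. Per the report, Hive deletes such a file before writing, while Spark
// leaves it in place and renames one result file onto the path, giving a wrong result.
import org.apache.spark.sql.SparkSession

object InsertOverwriteDirectoryRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    spark.range(10).createOrReplaceTempView("t")
    // Datasource form of the syntax; the Hive-serde form (STORED AS ...) is analogous.
    spark.sql("INSERT OVERWRITE DIRECTORY '/tmp/target' USING parquet SELECT * FROM t")

    spark.stop()
  }
}
{code}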
[jira] [Resolved] (SPARK-31390) Document Window Function
[ https://issues.apache.org/jira/browse/SPARK-31390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Takeshi Yamamuro resolved SPARK-31390. -- Fix Version/s: 3.0.0 Assignee: Huaxin Gao Resolution: Fixed Resolved by https://github.com/apache/spark/pull/28220 > Document Window Function > > > Key: SPARK-31390 > URL: https://issues.apache.org/jira/browse/SPARK-31390 > Project: Spark > Issue Type: Sub-task > Components: Documentation, SQL >Affects Versions: 3.0.0 >Reporter: Huaxin Gao >Assignee: Huaxin Gao >Priority: Major > Fix For: 3.0.0 > > > Document Window Function -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31476) Add an ExpressionInfo entry for EXTRACT
[ https://issues.apache.org/jira/browse/SPARK-31476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Takeshi Yamamuro updated SPARK-31476: - Description: This ticket intends to add an ExpressionInfo entry for EXTRACT for better documentation. > Add an ExpressionInfo entry for EXTRACT > --- > > Key: SPARK-31476 > URL: https://issues.apache.org/jira/browse/SPARK-31476 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Takeshi Yamamuro >Priority: Major > > This ticket intends to add an ExpressionInfo entry for EXTRACT for better > documentation. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31476) Add an ExpressionInfo entry for EXTRACT
Takeshi Yamamuro created SPARK-31476: Summary: Add an ExpressionInfo entry for EXTRACT Key: SPARK-31476 URL: https://issues.apache.org/jira/browse/SPARK-31476 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Takeshi Yamamuro -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
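As a hedged illustration of what the ticket is about: a function's ExpressionInfo is what DESCRIBE FUNCTION surfaces, so once an entry exists for EXTRACT its usage and examples should show up the same way they do for other built-ins. The query below is ordinary Spark SQL, not part of the proposed change:

{code:scala}
// Hedged check: DESCRIBE FUNCTION EXTENDED prints a function's ExpressionInfo
// (usage, arguments, examples). For EXTRACT this only shows full documentation once
// the entry proposed by this ticket is in place.
import org.apache.spark.sql.SparkSession

object DescribeExtract {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    spark.sql("DESCRIBE FUNCTION EXTENDED extract").show(numRows = 50, truncate = false)
    spark.stop()
  }
}
{code}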
[jira] [Commented] (SPARK-29302) dynamic partition overwrite with speculation enabled
[ https://issues.apache.org/jira/browse/SPARK-29302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086124#comment-17086124 ] Mridul Muralidharan commented on SPARK-29302: - I agree with [~feiwang], it looks like newTaskTempFile is not robust to speculative execution and task failures when dynamicPartitionOverwrite is enabled IMO. This will need to be fixed - it is currently using the same path irrespective of which attempt it is. > dynamic partition overwrite with speculation enabled > > > Key: SPARK-29302 > URL: https://issues.apache.org/jira/browse/SPARK-29302 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4 >Reporter: feiwang >Priority: Major > Attachments: screenshot-1.png, screenshot-2.png > > > Now, for a dynamic partition overwrite operation, the filename of a task > output is determinable. > So, if speculation is enabled, would a task conflict with its relative > speculation task? > Would the two tasks concurrent write a same file? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
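To make the concern concrete, here is a simplified, hypothetical illustration (not Spark's actual code) of why a task and its speculative attempt can target the same staging file when dynamicPartitionOverwrite is enabled:

{code:scala}
// Hypothetical, simplified illustration only: the staging file name below depends on the
// job id and the split id but not on the attempt number, so two attempts of the same task
// resolve to the same path. The real logic lives in HadoopMapReduceCommitProtocol.newTaskTempFile.
object StagingPathCollision {
  def stagingFile(stagingDir: String, jobId: String, split: Int, partitionDir: String): String =
    f"$stagingDir/.spark-staging-$jobId/$partitionDir/part-$split%05d-$jobId.parquet"

  def main(args: Array[String]): Unit = {
    val attempt0 = stagingFile("/warehouse/t", "job-0001", split = 7, partitionDir = "dt=2020-04-17")
    val attempt1 = stagingFile("/warehouse/t", "job-0001", split = 7, partitionDir = "dt=2020-04-17")
    assert(attempt0 == attempt1) // both attempts would write/rename the same file under speculation
    println(attempt0)
  }
}
{code}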
[jira] [Issue Comment Deleted] (SPARK-29302) dynamic partition overwrite with speculation enabled
[ https://issues.apache.org/jira/browse/SPARK-29302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated SPARK-29302: Comment: was deleted (was: Drive by observations: * Speculative execution does not run the speculative task concurrently on the same node running the task (so definitely not same executor). * Do we have additional details about the tasks which are failing ? Did they fail before (for unrelated reasons) before failing for path conflict ? ) > dynamic partition overwrite with speculation enabled > > > Key: SPARK-29302 > URL: https://issues.apache.org/jira/browse/SPARK-29302 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4 >Reporter: feiwang >Priority: Major > Attachments: screenshot-1.png, screenshot-2.png > > > Now, for a dynamic partition overwrite operation, the filename of a task > output is determinable. > So, if speculation is enabled, would a task conflict with its relative > speculation task? > Would the two tasks concurrent write a same file? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-29302) dynamic partition overwrite with speculation enabled
[ https://issues.apache.org/jira/browse/SPARK-29302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086117#comment-17086117 ] Mridul Muralidharan commented on SPARK-29302: - Drive-by observations: * Speculative execution does not run the speculative task concurrently on the same node running the task (so definitely not the same executor). * Do we have additional details about the tasks which are failing? Did they fail earlier (for unrelated reasons) before failing on the path conflict? > dynamic partition overwrite with speculation enabled > > > Key: SPARK-29302 > URL: https://issues.apache.org/jira/browse/SPARK-29302 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4 >Reporter: feiwang >Priority: Major > Attachments: screenshot-1.png, screenshot-2.png > > > Now, for a dynamic partition overwrite operation, the filename of a task > output is determinable. > So, if speculation is enabled, would a task conflict with its relative > speculation task? > Would the two tasks concurrent write a same file? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31420) Infinite timeline redraw in job details page
[ https://issues.apache.org/jira/browse/SPARK-31420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-31420: -- Fix Version/s: (was: 3.1.0) (was: 2.4.5) 2.4.6 > Infinite timeline redraw in job details page > > > Key: SPARK-31420 > URL: https://issues.apache.org/jira/browse/SPARK-31420 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.4, 2.4.5, 3.0.0, 3.1.0 >Reporter: Gengliang Wang >Assignee: Kousuke Saruta >Priority: Major > Fix For: 3.0.0, 2.4.6 > > Attachments: timeline.mov > > > In the job page, the timeline section keeps changing the position style and > shaking. We can see that there is a warning "infinite loop in redraw" from > the console, which can be related to > https://github.com/visjs/vis-timeline/issues/17 > I am using the history server with the events under > "core/src/test/resources/spark-events" to reproduce. > I have also uploaded a screen recording. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Florentino Sainz updated SPARK-31417: - Description: *I would like some help to explore if the following feature makes sense and what's the best way to implement it. Allow users to use "isin" (or similar) predicates with huge Lists (which might be broadcasted).* *Abandoning. I think maybe the best option without changing all the architecture is to deal with it inside the RDD/RelationProvider: when it finds a big list to be pushed down (as long as unhandledFilters makes it disappear), it can just broadcast it and then use it. The only problem is that this might leave non-cleaned broadcast variables, unless there's a way (or someone implements it) to clean up such broadcasts after an action.* Read the last comment for more info :) As of now (AFAIK), users can only provide a list/sequence of elements which will be sent as part of the "In" predicate, which is part of the task, to the executors. So when this list is huge, this causes tasks to be "huge" (especially when the task number is big). I'm coming from https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list (I'm the creator of the post, and also the 2nd answer). I know this is not common, but it can be useful for people reading from "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my concrete case, querying Kudu with 200.000 elements in the isin takes ~2 minutes (+1 minute "calculating plan" to send the BIG tasks to the executors) versus 30 minutes doing a full scan (even if semi-filtered, most of the data has the same nature and thus is hard to filter, unless I use this "200.000" list) on my table. More or less I have a clear view (explained in stackoverflow) of what to modify if I wanted to create my own "BroadcastIn" in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, however with my limited "idea", anyone who implemented In pushdown would have to re-adapt it to the new "BroadcastIn". I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 100% sure if there's an alternative there (I'm stuck in 2.4 anyway, working for a corporation which uses that version). Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an expert in Scala syntax/syntactic sugar. *Does anyone have an idea on how to extend the current case class "In" while keeping compatibility with PushDown implementations (creating a new Predicate is not allowed, I think), so that the case class holds not only Seq[Expression] but also allows Broadcast[Seq[Expression]]?* This is what makes the tasks huge; I made a test run with a predicate which receives a Broadcast variable and uses the value inside, and it works much better. was: *I would like some help to explore if the following feature makes sense and what's the best way to implement it. Allow users to use "isin" (or similar) predicates with huge Lists (which might be broadcasted).* Abandoning, I think maybe the best option without changing all the architecture is to deal with it inside the RDD/RelationProvider, when it finds a big list to be pushed-down (as long as unhandledFilters makes it disappear), it can just broadcast it and then use it. 
Only problem is that this might leave non-cleaned broadcast variables Read last comment for more info :) As of now (AFAIK), users can only provide a list/sequence of elements which will be sent as part of the "In" predicate, which is part of the task, to the executors. So when this list is huge, this causes tasks to be "huge" (specially when the task number is big). I'm coming from https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list (I'm the creator of the post, and also the 2nd answer). I know this is not common, but can be useful for people reading from "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my concrete case, querying Kudu with 200.000 elements in the isin takes ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the executors) versus 30 minutes doing a full-scan (even if semi-filtered, most of the data has the same nature and thus is hard to filter, unless I use this "200.000" list) on my table. More or less I have a clear view (explained in stackoverflow) on what to modify If I wanted to create my own "BroadcastIn" in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, however with my limited "idea", anyone who implemented In pushdown would have to re-adapt it to the new "BroadcastIn" I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working for a corporation w
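For readers hitting the same problem, a hedged sketch of the workaround discussed above and in the linked Stack Overflow post: broadcast the big list once and filter through a UDF evaluated on the executors, instead of embedding the list in the In predicate of every task. Unlike isin, a UDF filter is opaque to the optimizer and cannot be pushed down to the source, so this only helps with task size:

{code:scala}
// Hedged sketch of the broadcast-based alternative to col.isin(hugeList: _*).
// The list is shipped once per executor via a broadcast variable instead of being
// serialized into every task; the trade-off is that the UDF cannot be pushed down.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object BroadcastIsinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    val hugeList: Set[Long] = (0L until 200000L).toSet   // stands in for the 200.000 keys
    val bcKeys = spark.sparkContext.broadcast(hugeList)  // shipped once per executor, not per task

    val df = spark.range(0, 10000000L).toDF("id")
    val inBroadcastedSet = udf((id: Long) => bcKeys.value.contains(id))

    df.filter(inBroadcastedSet(col("id"))).show()

    bcKeys.destroy()  // explicit cleanup of the broadcast once it is no longer needed
    spark.stop()
  }
}
{code}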
[jira] [Updated] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Florentino Sainz updated SPARK-31417: - Description: *I would like some help to explore if the following feature makes sense and what's the best way to implement it. Allow users to use "isin" (or similar) predicates with huge Lists (which might be broadcasted).* Abandoning, I think maybe the best option without changing all the architecture is to deal with it inside the RDD/RelationProvider, when it finds a big list to be pushed-down (as long as unhandledFilters makes it disappear), it can just broadcast it and then use it. Only problem is that this might leave non-cleaned broadcast variables Read last comment for more info :) As of now (AFAIK), users can only provide a list/sequence of elements which will be sent as part of the "In" predicate, which is part of the task, to the executors. So when this list is huge, this causes tasks to be "huge" (specially when the task number is big). I'm coming from https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list (I'm the creator of the post, and also the 2nd answer). I know this is not common, but can be useful for people reading from "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my concrete case, querying Kudu with 200.000 elements in the isin takes ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the executors) versus 30 minutes doing a full-scan (even if semi-filtered, most of the data has the same nature and thus is hard to filter, unless I use this "200.000" list) on my table. More or less I have a clear view (explained in stackoverflow) on what to modify If I wanted to create my own "BroadcastIn" in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, however with my limited "idea", anyone who implemented In pushdown would have to re-adapt it to the new "BroadcastIn" I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working for a corporation which uses that version). Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend current case class "In" while keeping the compatibility with PushDown implementations (aka creating a new Predicate not allowed I think), but not having only Seq[Expression] in the case class, but also allow Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a test-run with a predicate which receives and Broadcast variable and uses the value inside, and it works much better. was: *I would like some help to explore if the following feature makes sense and what's the best way to implement it. Allow users to use "isin" (or similar) predicates with huge Lists (which might be broadcasted).* Read last comment :) As of now (AFAIK), users can only provide a list/sequence of elements which will be sent as part of the "In" predicate, which is part of the task, to the executors. So when this list is huge, this causes tasks to be "huge" (specially when the task number is big). I'm coming from https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list (I'm the creator of the post, and also the 2nd answer). I know this is not common, but can be useful for people reading from "servers" which are not in the same nodes than Spark (maybe S3/cloud?). 
In my concrete case, querying Kudu with 200.000 elements in the isin takes ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the executors) versus 30 minutes doing a full-scan (even if semi-filtered, most of the data has the same nature and thus is hard to filter, unless I use this "200.000" list) on my table. More or less I have a clear view (explained in stackoverflow) on what to modify If I wanted to create my own "BroadcastIn" in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, however with my limited "idea", anyone who implemented In pushdown would have to re-adapt it to the new "BroadcastIn" I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working for a corporation which uses that version). Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend current case class "In" while keeping the compatibility with PushDown implementations (aka creating a new Predicate not allowed I think), but not having only Seq[Expression] in the case class, but also allow Broadcast[Seq[Expression]]?* This is what makes the tasks
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 9:26 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. *Anyways, I think maybe the best option is to deal with it inside the RDD/RelationProvider, when it finds a big list to be pushed-down (as long as unhandledFilters makes it disappear), it can just broadcast it and then use it.* was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > Read last comment :) > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-br
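Purely as a hypothetical sketch of the "source.BroadcastedIn" idea floated in this comment (no such filter exists in org.apache.spark.sql.sources): the point is that a relation implementing pushdown would dereference the broadcast only on the executor, inside the RDD's compute, so the serialized task stays small:

{code:scala}
// Hypothetical only: not an existing Spark API. Illustrates a pushdown filter that carries
// a Broadcast handle instead of the literal values; .value is only called executor-side.
import org.apache.spark.broadcast.Broadcast

case class BroadcastedIn(attribute: String, values: Broadcast[Set[Any]]) {
  // Would be invoked inside RDD.compute() on the executor, where the broadcast block is
  // fetched once per executor rather than shipped inside every serialized task.
  def matches(row: Map[String, Any]): Boolean =
    row.get(attribute).exists(values.value.contains)
}
{code}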
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 9:26 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. *Anyways, I think maybe the best option without changing all the architecture is to deal with it inside the RDD/RelationProvider, when it finds a big list to be pushed-down (as long as unhandledFilters makes it disappear), it can just broadcast it and then use it. Only problem is that this might leave non-cleaned broadcast variables* was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. *Anyways, I think maybe the best option is to deal with it inside the RDD/RelationProvider, when it finds a big list to be pushed-down (as long as unhandledFilters makes it disappear), it can just broadcast it and then use it.* > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > Read last comment :) > As of now (AFAI
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 9:00 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > Read last comment :) > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task numbe
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:57 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates wit
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:56 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predica
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:55 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ). I "lost" half a day, but did learn quite a bit/confirmed self-assumptions on how Spark works in the inside :). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > Read last comment :) > As of now (AFAIK), users can only
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:54 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can make them a little complex, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters at the underlying technology until the compute method is called...). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* If anyone is interested on me implementing it, or just providing the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code of one Spark's Strategy and also reimplemented the underlying relation which will pushdown the filter) just ping me. PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > Read last comment :) > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the
[jira] [Updated] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Florentino Sainz updated SPARK-31417: - Description: *I would like some help to explore if the following feature makes sense and what's the best way to implement it. Allow users to use "isin" (or similar) predicates with huge Lists (which might be broadcasted).* Read last comment :) As of now (AFAIK), users can only provide a list/sequence of elements which will be sent as part of the "In" predicate, which is part of the task, to the executors. So when this list is huge, this causes tasks to be "huge" (specially when the task number is big). I'm coming from https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list (I'm the creator of the post, and also the 2nd answer). I know this is not common, but can be useful for people reading from "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my concrete case, querying Kudu with 200.000 elements in the isin takes ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the executors) versus 30 minutes doing a full-scan (even if semi-filtered, most of the data has the same nature and thus is hard to filter, unless I use this "200.000" list) on my table. More or less I have a clear view (explained in stackoverflow) on what to modify If I wanted to create my own "BroadcastIn" in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, however with my limited "idea", anyone who implemented In pushdown would have to re-adapt it to the new "BroadcastIn" I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working for a corporation which uses that version). Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend current case class "In" while keeping the compatibility with PushDown implementations (aka creating a new Predicate not allowed I think), but not having only Seq[Expression] in the case class, but also allow Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a test-run with a predicate which receives and Broadcast variable and uses the value inside, and it works much better. was: Read last comment :) *I would like some help to explore if the following feature makes sense and what's the best way to implement it. Allow users to use "isin" (or similar) predicates with huge Lists (which might be broadcasted).* As of now (AFAIK), users can only provide a list/sequence of elements which will be sent as part of the "In" predicate, which is part of the task, to the executors. So when this list is huge, this causes tasks to be "huge" (specially when the task number is big). I'm coming from https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list (I'm the creator of the post, and also the 2nd answer). I know this is not common, but can be useful for people reading from "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my concrete case, querying Kudu with 200.000 elements in the isin takes ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the executors) versus 30 minutes doing a full-scan (even if semi-filtered, most of the data has the same nature and thus is hard to filter, unless I use this "200.000" list) on my table. 
More or less I have a clear view (explained in stackoverflow) on what to modify If I wanted to create my own "BroadcastIn" in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, however with my limited "idea", anyone who implemented In pushdown would have to re-adapt it to the new "BroadcastIn" I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working for a corporation which uses that version). Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend current case class "In" while keeping the compatibility with PushDown implementations (aka creating a new Predicate not allowed I think), but not having only Seq[Expression] in the case class, but also allow Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a test-run with a predicate which receives and Broadcast variable and uses the value inside, and it works much better. > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 >
[jira] [Resolved] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Florentino Sainz resolved SPARK-31417. -- Resolution: Abandoned > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
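For reference, a rough sketch of what the Catalyst-side extension asked about in the description might have looked like. This is purely hypothetical (the ticket was abandoned and no such expression exists in Spark): it targets the Spark 2.4/3.0 expression API referenced in the ticket, skips code generation via CodegenFallback, and ignores filter pushdown entirely, which is exactly the part the reporter found hard:

{code:scala}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.expressions.{Expression, Predicate, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback

// Hypothetical: an In-like predicate whose values live behind a Broadcast handle,
// so the task closure carries the handle instead of thousands of literal expressions.
case class BroadcastIn(child: Expression, broadcastValues: Broadcast[Set[Any]])
  extends UnaryExpression with Predicate with CodegenFallback {

  // The broadcast is only dereferenced at evaluation time, i.e. on the executor.
  override protected def nullSafeEval(value: Any): Any =
    broadcastValues.value.contains(value)

  override def toString: String = s"$child BROADCAST_IN (<broadcast ${broadcastValues.id}>)"
}
{code}

Even with something like this in place, a pushed-down filter would still be translated into org.apache.spark.sql.sources.In with the values fully materialized, which is the compatibility problem discussed in the comment below.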
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:45 PM: Well... I just finished my PoC by adding a custom Predicate + a custom Strategy which transforms the relations/scans before the provided one does (@ Spark 2.4). It seems that making this work for the pushdown cases is the hard part (without pushdown, task size does improve quite a lot :P). When the filter gets pushed down, the change I made is useless, basically because what gets pushed into the task is the pushed filter, aka Source.In. In order to implement this without breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?), and those who use it would have to know it's a broadcasted variable, so that when they implement the pushdown they actually call the broadcast's ".value" only when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it. I know how to implement it, but I think the change is quite big and I'm not sure it's worth it (adding a new Source and also adapting all the RDDs/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters in the underlying technology until the compute method is called...). Especially, I don't know whether this can be useful for anything apart from big user-provided "isins" (maybe some join-pushdown optimization I don't realize?)* If anyone is interested in me implementing it, or just in the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code from one of Spark's Strategies and reimplement the relation), just ping me. PS: In my software I just improved the behaviour with a coalesce(X) so there are fewer big tasks around :) [after pushing down the filter when the size is suitable; otherwise I just go with the left-join strategy/almost full scan, which takes like 10x the time in Kudu; the amount of data I get from that table fits in memory "perfectly"]
was (Author: fsainz): Well... I just finished my PoC by adding a custom Predicate + a custom Strategy which transforms the relations/scans before the provided one does (@ Spark 2.4). It seems that making this work for the pushdown cases is the hard part (without pushdown, task size does improve quite a lot :P). When the filter gets pushed down, the change I made is useless, basically because what gets pushed into the task is the pushed filter, aka Source.In. In order to implement this without breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?), and those who use it would have to know it's a broadcasted variable, so that when they implement the pushdown they actually call the broadcast's ".value" only when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it. I know how to implement it, but I think the change is quite big and I'm not sure it's worth it (adding a new Source and also adapting all the RDDs/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters in the underlying technology until the compute method is called...). Especially, I don't know whether this can be useful for anything apart from big user-provided "isins" (maybe some join-pushdown optimization I don't realize?)* If anyone is interested in me implementing it, or just in the ideas/hacky way (it can be injected into Spark, but you have to copy quite a bit of code and reimplement the relation), just ping me. PS: In my software I just improved the behaviour with a coalesce(X) so there are fewer big tasks around :) [after pushing down the filter when the size is suitable; otherwise I just go with the left-join strategy/almost full scan, which takes like 10x the time in Kudu; the amount of data I get from that table fits in memory "perfectly"]
> Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackove
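The central point of the comment above, that a pushed-down BroadcastedIn would only call ".value" once the code is running on the executor, can be illustrated with plain RDDs (hypothetical sizes and names; this is the pattern a broadcast-aware connector would follow inside its scan, not the connector code itself):

{code:scala}
import org.apache.spark.sql.SparkSession

object BroadcastFilterOnExecutors {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("broadcast-filter-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical large key set: shipped once per executor as a broadcast,
    // instead of being serialized into every task closure.
    val keys = sc.broadcast((1L to 200000L).toSet)

    val rows = sc.parallelize(1L to 10000000L, numSlices = 200)

    // Each task only captures the lightweight broadcast handle; keys.value is
    // resolved inside the partition computation, i.e. on the executor.
    val filtered = rows.mapPartitions { iter =>
      val keySet = keys.value
      iter.filter(keySet.contains)
    }

    println(filtered.count())
    spark.stop()
  }
}
{code}

The coalesce(X) workaround mentioned at the end of the comment attacks the same cost from the other side: fewer tasks means the large pushed-down In filter is serialized and shipped fewer times.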
[jira] [Updated] (SPARK-31446) Make html elements for a paged table possible to have different id attribute.
[ https://issues.apache.org/jira/browse/SPARK-31446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-31446: -- Fix Version/s: (was: 3.1.0) 3.0.0 > Make html elements for a paged table possible to have different id attribute. > - > > Key: SPARK-31446 > URL: https://issues.apache.org/jira/browse/SPARK-31446 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 3.0.0, 3.1.0 >Reporter: Kousuke Saruta >Assignee: Kousuke Saruta >Priority: Minor > Fix For: 3.0.0 > > > Some pages have a paged table with page navigations above and below the table. > But the corresponding HTML elements in the two page navigations for a table > have the same id attribute. Every id attribute should be unique. > For example, there are two elements with the `form-completedJob-table-page` id on JobsPage. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
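A minimal sketch of the direction this fix points in, using a hypothetical helper rather than the actual Spark Web UI code: giving the duplicated navigation forms a position suffix keeps every DOM id on the page unique.

{code:scala}
// Hypothetical helper illustrating unique ids for the top/bottom navigation forms
// of a paged table; the real fix lives in Spark's Web UI code.
object PagedTableIds {
  def navigationFormId(tableTag: String, position: String): String =
    s"form-$tableTag-table-page-$position"

  def main(args: Array[String]): Unit = {
    println(navigationFormId("completedJob", "top"))     // form-completedJob-table-page-top
    println(navigationFormId("completedJob", "bottom"))  // form-completedJob-table-page-bottom
  }
}
{code}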
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:42 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard, basically delaying the creation of those filters at the underlying technology until the compute method is called...). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter,
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:42 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently can be a little hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (ex
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:41 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:41 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). 
Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:40 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think?* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relation which doesn't seem so easy so the only place where value is already accessed is in the compute method). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) 
What you guys think?* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:40 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relations to explode it efficiently and only access data at compute method looks hard). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) 
What you guys think?* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" lis
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:39 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... *For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and also adapting all the RDD/Relation which doesn't seem so easy so the only place where value is already accessed is in the compute method). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think?* PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think? 
PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in st
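The workaround mentioned in the comment (push the filter down only when the list is small enough, and use {{coalesce}} to limit the number of big tasks) might look roughly like this. The function name, the {{id}} column, and the thresholds are illustrative, not taken from the ticket.
{code}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Illustrative sketch of the workaround, not code from the ticket.
def filteredScan(spark: SparkSession, table: DataFrame, keys: Seq[String],
                 pushdownLimit: Int, partitions: Int): DataFrame = {
  import spark.implicits._
  if (keys.size <= pushdownLimit) {
    // The isin filter is pushed down to the source (e.g. Kudu); coalesce keeps
    // the number of tasks carrying the big In predicate low.
    table.filter(col("id").isin(keys: _*)).coalesce(partitions)
  } else {
    // Fall back to the "left join" strategy: close to a full scan on the source,
    // filtered by joining against the key set.
    table.join(keys.toDF("id"), Seq("id"), "left_semi")
  }
}
{code}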
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:15 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/fullscan, which takes like 10x the time, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. 
Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:15 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes like 10x the time in Kudu, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, otherwise I just go leftjoin strategy/fullscan, which takes like 10x the time, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. 
Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/sc
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:12 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). Specially, I don't know if this can be useful for something apart from big user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. 
So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 any
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:11 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter, aka Source.In. In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast when inside the executor (aka in the RDD implementation)... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter (which is "saved" inside the provider). In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyo
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:10 PM: Well... just finished my PoC by adding a custom Predicate + a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter (which is "saved" inside the provider). In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter (which is "saved" inside the provider). In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case clas
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:09 PM: Well... just finished my PoC by adding a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter (which is "saved" inside the provider). In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) was (Author: fsainz): Well... just finished my PoC by adding a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a bit :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter (which is "saved" inside the provider). In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast... > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. 
> More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:09 PM: Well... just finished my PoC by adding a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter (which is "saved" inside the provider). In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter (which is "saved" inside the provider). In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). 
In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:09 PM: Well... just finished my PoC by adding a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter (which is "saved" inside the provider). In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter when the size is suitable, the amount of data I get from that table fits in memory "perfectly"] was (Author: fsainz): Well... just finished my PoC by adding a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems that making this work (at least for pushdown cases, without pushdown, task size does improve quite a lot :P ) When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter (which is "saved" inside the provider). In order to implement this, unless breaking backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it have know its a broadcasted variable, so when they implement the PushDown, they actually call the ".value" of the broadcast... For now I'm abandoning it, I know how to implement it, but I think the change is quite big and not sure if it's worth it (adding new Source and all). What you guys think? PD: In my software I just improved the behaviour with a coalesce(X) so there are less big tasks around :) [after pushing down the filter, the amount of data I get from that table fits in memory "perfectly"] > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with Pus
[jira] [Commented] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039 ] Florentino Sainz commented on SPARK-31417: -- Well... just finished my PoC by adding a custom Strategy which will transform the relations/scans before the provided one does (@ spark 2.4). It seems hard to make this work (at least for pushdown cases; without pushdown, task size does improve quite a bit :P ). When the filter gets pushed down, the change I did is useless, basically because what gets pushed in the task is the pushed filter (which is "saved" inside the provider). In order to implement this, unless we break backwards compatibility with current BaseRelations, Spark would need another source.In (source.BroadcastedIn?) and those who use it would have to know it's a broadcast variable, so when they implement the pushdown they actually call the ".value" of the broadcast... > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore whether the following feature makes sense and > what's the best way to implement it: allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (especially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but it can be useful for people reading from > "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2 minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full scan (even if semi-filtered; most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify if I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn". > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyway, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar. *Does anyone have an idea on how to extend > the current case class "In" while keeping compatibility with PushDown > implementations (creating a new Predicate is not allowed, I think), so that it does not > hold only Seq[Expression] in the case class, but can also hold > Broadcast[Seq[Expression]]?* This is what makes the tasks huge; I made a > test run with a predicate which receives a Broadcast variable and uses the > value inside, and it works much better.
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-27249) Developers API for Transformers beyond UnaryTransformer
[ https://issues.apache.org/jira/browse/SPARK-27249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086029#comment-17086029 ] Nick Afshartous edited comment on SPARK-27249 at 4/17/20, 7:54 PM: --- [~enrush] Hi Everett, Using the {{Iterator}} approach would seem to deviate from the existing {{Transformer}} contract. More specifically, the {{transform}} function should output a new {{DataFrame}} object. What I would propose is adding a new function {{Transformer.compose}} to allow the composition of {{Transformers}}. {code} def compose(other: Transformer): Transformer = { new Transformer { override def transform(dataset: Dataset[_]): DataFrame = { other.transform(this.transform(dataset)); } ... {code} Then one could {{compose}} {{Transformers}} which effectively would enable multi-column transformations. {code} val dataFrame = ... val transformers = List(transformer1, transformer2, transformer3) val multiColumnTransformer = transformers.reduce((x, y) => x.compose(y)) multiColumnTransformer.transform(dataFrame) {code} I'd be happy to submit a PR if this meets your requirments. was (Author: nafshartous): [~enrush] Hi Everett, Using the {{Iterator}} approach would seem to deviate from the existing {{Transformer}} contract. More specifically, the {{transform}} function should output a new {{DataFrame}} object. What I would propose is adding a new function {{Transformer.compose}} to allow the composition of {{Transformers}}. {code} def compose(other: Transformer): Transformer = { new Transformer { override def transform(dataset: Dataset[_]): DataFrame = { other.transform(this.transform(dataset)); } ... {code} Then one could {{compose}} {{Transformers}} which effectively would enable multi-column transformations. {code} val dataFrame = ... val transformers = List(transformer1, transformer2, transformer3) val multiColumnTransformer = transformers.reduce((x, y) => x.compose(y)) multiColumnTransformer.transform(dataFrame) {code} I'd be happy to submit a PR if this meets your requirments. > Developers API for Transformers beyond UnaryTransformer > --- > > Key: SPARK-27249 > URL: https://issues.apache.org/jira/browse/SPARK-27249 > Project: Spark > Issue Type: New Feature > Components: ML >Affects Versions: 3.1.0 >Reporter: Everett Rush >Priority: Minor > Labels: starter > Attachments: Screen Shot 2020-01-17 at 4.20.57 PM.png > > Original Estimate: 96h > Remaining Estimate: 96h > > It would be nice to have a developers' API for dataset transformations that > need more than one column from a row (ie UnaryTransformer inputs one column > and outputs one column) or that contain objects too expensive to initialize > repeatedly in a UDF such as a database connection. > > Design: > Abstract class PartitionTransformer extends Transformer and defines the > partition transformation function as Iterator[Row] => Iterator[Row] > NB: This parallels the UnaryTransformer createTransformFunc method > > When developers subclass this transformer, they can provide their own schema > for the output Row in which case the PartitionTransformer creates a row > encoder and executes the transformation. Alternatively the developer can set > output Datatype and output col name. Then the PartitionTransformer class will > create a new schema, a row encoder, and execute the transformation. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
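The {{compose}} snippet in the comment above is elided with "..."; a more complete, self-contained sketch of the same idea is below. It is hypothetical ({{compose}} is not part of the Spark ML API) and is written as a standalone helper rather than a method on {{Transformer}} so it compiles as-is in a spark-shell session.
{code}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

object TransformerOps {
  // Hypothetical helper: chains two Transformers into one, as proposed in the comment.
  def compose(first: Transformer, second: Transformer): Transformer = new Transformer {
    override val uid: String = Identifiable.randomUID("composedTransformer")

    // Apply "first", then "second", exactly like the proposed Transformer.compose.
    override def transform(dataset: Dataset[_]): DataFrame =
      second.transform(first.transform(dataset))

    override def transformSchema(schema: StructType): StructType =
      second.transformSchema(first.transformSchema(schema))

    // Good enough for a sketch; a real implementation would also copy params.
    override def copy(extra: ParamMap): Transformer = this
  }
}

// Usage, mirroring the comment above:
// val multiColumnTransformer = transformers.reduce((x, y) => TransformerOps.compose(x, y))
{code}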
[jira] [Commented] (SPARK-27249) Developers API for Transformers beyond UnaryTransformer
[ https://issues.apache.org/jira/browse/SPARK-27249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086029#comment-17086029 ] Nick Afshartous commented on SPARK-27249: - [~enrush] Hi Everett, Using the {{Iterator}} approach would seem to deviate from the existing {{Transformer}} contract. More specifically, the {{transform}} function should output a new {{DataFrame}} object. What I would propose is adding a new function {{Transformer.compose}} to allow the composition of {{Transformers}}.
{code}
def compose(other: Transformer): Transformer = {
  new Transformer {
    override def transform(dataset: Dataset[_]): DataFrame = {
      other.transform(this.transform(dataset))
    }
    ...
{code}
Then one could {{compose}} {{Transformers}}, which effectively would enable multi-column transformations.
{code}
val dataFrame = ...
val transformers = List(transformer1, transformer2, transformer3)
val multiColumnTransformer = transformers.reduce((x, y) => x.compose(y))
multiColumnTransformer.transform(dataFrame)
{code}
I'd be happy to submit a PR if this meets your requirements.
> Developers API for Transformers beyond UnaryTransformer > --- > > Key: SPARK-27249 > URL: https://issues.apache.org/jira/browse/SPARK-27249 > Project: Spark > Issue Type: New Feature > Components: ML >Affects Versions: 3.1.0 >Reporter: Everett Rush >Priority: Minor > Labels: starter > Attachments: Screen Shot 2020-01-17 at 4.20.57 PM.png > > Original Estimate: 96h > Remaining Estimate: 96h > > It would be nice to have a developers' API for dataset transformations that > need more than one column from a row (i.e. UnaryTransformer inputs one column > and outputs one column) or that contain objects too expensive to initialize > repeatedly in a UDF, such as a database connection. > > Design: > Abstract class PartitionTransformer extends Transformer and defines the > partition transformation function as Iterator[Row] => Iterator[Row] > NB: This parallels the UnaryTransformer createTransformFunc method > > When developers subclass this transformer, they can provide their own schema > for the output Row, in which case the PartitionTransformer creates a row > encoder and executes the transformation. Alternatively, the developer can set > the output DataType and output column name. Then the PartitionTransformer class will > create a new schema, a row encoder, and execute the transformation. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
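For reference, a minimal sketch of the {{PartitionTransformer}} design described in the ticket (an {{Iterator[Row] => Iterator[Row]}} transform plus a subclass-provided output schema) could look like the following; the class and method names beyond what the description states are assumptions, not an agreed design.
{code}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

// Hypothetical PartitionTransformer sketch: the subclass supplies a partition-level
// function and the schema of the rows it emits; this class builds the row encoder.
abstract class PartitionTransformer(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("partitionTransformer"))

  /** Parallels UnaryTransformer.createTransformFunc, but over whole partitions. */
  protected def createTransformFunc: Iterator[Row] => Iterator[Row]

  /** Schema of the rows produced by createTransformFunc. */
  protected def outputSchema(inputSchema: StructType): StructType

  override def transformSchema(schema: StructType): StructType = outputSchema(schema)

  override def transform(dataset: Dataset[_]): DataFrame = {
    val schema = outputSchema(dataset.schema)
    val encoder = RowEncoder(schema) // row encoder built from the subclass's schema
    dataset.toDF().mapPartitions(createTransformFunc)(encoder)
  }

  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
}
{code}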
[jira] [Created] (SPARK-31475) Broadcast stage in AQE did not timeout
Wei Xue created SPARK-31475: --- Summary: Broadcast stage in AQE did not timeout Key: SPARK-31475 URL: https://issues.apache.org/jira/browse/SPARK-31475 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Wei Xue -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31474) Consistancy between dayofweek/dow in extract expression and dayofweek function
[ https://issues.apache.org/jira/browse/SPARK-31474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kent Yao updated SPARK-31474: - Summary: Consistancy between dayofweek/dow in extract expression and dayofweek function (was: Consistancy betwwen dayofweek/dow in extract expression and dayofweek function) > Consistancy between dayofweek/dow in extract expression and dayofweek function > -- > > Key: SPARK-31474 > URL: https://issues.apache.org/jira/browse/SPARK-31474 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Kent Yao >Priority: Major > > {code:sql} > spark-sql> SELECT extract(dayofweek from '2009-07-26'); > 1 > spark-sql> SELECT extract(dow from '2009-07-26'); > 0 > spark-sql> SELECT extract(isodow from '2009-07-26'); > 7 > spark-sql> SELECT dayofweek('2009-07-26'); > 1 > spark-sql> SELECT weekday('2009-07-26'); > 6 > {code} > Currently, there are 4 types of day-of-week range: > the function dayofweek(2.3.0) and extracting dayofweek(2.4.0) result as of > Sunday(1) to Saturday(7) > extracting dow(3.0.0) results as of Sunday(0) to Saturday(6) > extracting isodow (3.0.0) results as of Monday(1) to Sunday(7) > the function weekday(2.4.0) results as of Monday(0) to Sunday(6) > Actually, extracting dayofweek and dow are both derived from PostgreSQL but > have different meanings. > https://issues.apache.org/jira/browse/SPARK-23903 > https://issues.apache.org/jira/browse/SPARK-28623 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31474) Consistancy betwwen dayofweek/dow in extract expression and dayofweek function
Kent Yao created SPARK-31474: Summary: Consistancy betwwen dayofweek/dow in extract expression and dayofweek function Key: SPARK-31474 URL: https://issues.apache.org/jira/browse/SPARK-31474 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Kent Yao {code:sql} spark-sql> SELECT extract(dayofweek from '2009-07-26'); 1 spark-sql> SELECT extract(dow from '2009-07-26'); 0 spark-sql> SELECT extract(isodow from '2009-07-26'); 7 spark-sql> SELECT dayofweek('2009-07-26'); 1 spark-sql> SELECT weekday('2009-07-26'); 6 {code} Currently, there are 4 types of day-of-week range: the function dayofweek(2.3.0) and extracting dayofweek(2.4.0) result as of Sunday(1) to Saturday(7) extracting dow(3.0.0) results as of Sunday(0) to Saturday(6) extracting isodow (3.0.0) results as of Monday(1) to Sunday(7) the function weekday(2.4.0) results as of Monday(0) to Sunday(6) Actually, extracting dayofweek and dow are both derived from PostgreSQL but have different meanings. https://issues.apache.org/jira/browse/SPARK-23903 https://issues.apache.org/jira/browse/SPARK-28623 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
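Since the four conventions listed above differ only by a fixed offset, they can be mapped onto one another; a small illustrative check (assuming a {{spark}} session and the 3.0.0 behaviour quoted in the description):
{code}
// Illustrative only: dayofweek/dow both count from Sunday but start at 1 vs 0,
// and isodow/weekday both count from Monday but start at 1 vs 0.
spark.sql("""
  SELECT
    extract(dow FROM '2009-07-26') + 1 AS dow_plus_one,      -- 1, same as dayofweek
    dayofweek('2009-07-26')            AS dayofweek_value,   -- 1
    weekday('2009-07-26') + 1          AS weekday_plus_one,  -- 7, same as isodow
    extract(isodow FROM '2009-07-26')  AS isodow_value       -- 7
""").show()
{code}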
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:59 PM: I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) Just confirmed that all the plans execute in the driver, as expected (this is the first time I check "deeply" inside spark sources). Maybe the list can be passed as a function returning a list instead of a list (so Scala serializes just the function with the broadcast variable instead of the whole list), API would be compatible I think, but that forces not-to-use case classes + 99% breaks binary compatibility :(, ima try a few things before continuing. was (Author: fsainz): I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case). Maybe the list can be passed as a function returning a list instead of a list (so Scala serializes just the function with the broadcast variable instead of the whole list), API would be compatible I think, but that forces not-to-use case classes + 99% breaks binary compatibility :( > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown >
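To make the trade-off described in this issue concrete, here is a minimal, self-contained sketch of the two approaches being compared: a plain isin over a large in-memory list versus the broadcast-plus-UDF workaround discussed in the linked StackOverflow thread. It uses only public Spark APIs; the table, column name and key list are placeholders, and the UDF variant only illustrates the "broadcast the list" idea, it is not the proposed BroadcastIn predicate (which would additionally keep source pushdown).
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object IsinBroadcastSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("isin-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq("k1", "k2", "k3").toDF("id")               // stand-in for the real table
    val keys: Seq[String] = (1 to 200000).map(i => s"k$i")  // the huge filter list

    // 1) Plain isin: the whole list is embedded in the In predicate, so it is
    //    shipped inside every task (and may be pushed down to the data source).
    val viaIsin = df.filter(col("id").isin(keys: _*))

    // 2) Broadcast + UDF workaround: only the broadcast handle travels with the
    //    tasks; executors fetch the set once. Tasks stay small, but no pushdown.
    val bc = spark.sparkContext.broadcast(keys.toSet)
    val inBroadcast = udf((v: String) => v != null && bc.value.contains(v))
    val viaBroadcast = df.filter(inBroadcast(col("id")))

    viaIsin.show()
    viaBroadcast.show()
    spark.stop()
  }
}
{code}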
[jira] [Created] (SPARK-31473) AQE should set active session during execution
Wei Xue created SPARK-31473: --- Summary: AQE should set active session during execution Key: SPARK-31473 URL: https://issues.apache.org/jira/browse/SPARK-31473 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Wei Xue -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:15 PM: I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case). Maybe the list can be passed as a function returning a list instead of a list (so Scala serializes just the function with the broadcast variable instead of the whole list), API would be compatible I think, but that forces not-to-use case classes + 99% breaks binary compatibility :( was (Author: fsainz): I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case). Maybe the list can be passed as a function returning a list instead of a list, API would be compatible I think, but that forces not-to-use case classes + 99% breaks binary compatibility :( > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). 
In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq
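The "function returning a list" idea floated in the comment above can be illustrated with a small, hypothetical sketch; the case classes below are not Spark's In/InSet expressions, they exist only to show what gets serialized. A predicate that stores the values directly drags the whole list along whenever it is serialized into a task, whereas a predicate that stores a () => Set accessor closing over a Broadcast serializes only the broadcast handle.
{code:scala}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import org.apache.spark.sql.SparkSession

object InByFunctionSketch {
  // Direct values: the whole list travels with anything that serializes this object.
  final case class InDirect(values: Seq[String]) {
    def eval(v: String): Boolean = values.contains(v)
  }

  // Values behind a function: only the closure (here, a Broadcast handle) is
  // serialized; an executor would fetch the set from the broadcast on first use.
  final case class InLazy(values: () => Set[String]) {
    def eval(v: String): Boolean = values().contains(v)
  }

  // Rough proxy for task payload size: Java-serialize the predicate object.
  private def serializedSize(o: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(o)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("in-by-function").getOrCreate()
    val big = (1 to 200000).map(i => s"k$i")
    val bc  = spark.sparkContext.broadcast(big.toSet)

    val direct = InDirect(big)          // payload grows with the list
    val lazyIn = InLazy(() => bc.value) // payload is roughly the broadcast reference

    println(s"direct: ${serializedSize(direct)} bytes, lazy: ${serializedSize(lazyIn)} bytes")
    println(direct.eval("k42") == lazyIn.eval("k42"))
    spark.stop()
  }
}
{code}
As the comment notes, retrofitting a function-typed parameter onto Spark's existing In case class would change its public constructor, which is why it would break binary (and case-class) compatibility.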
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:14 PM: I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case). Maybe the list can be passed as a function returning a list instead of a list, API would be compatible I think, but that forces not-to-use case classes + 99% breaks binary compatibility :( was (Author: fsainz): I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case). Maybe the list can be passed as a function returning a list instead of a list, API would be compatible I think, but that forces not-to-use case classes + 99% binary compatibility :( > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). 
In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:14 PM: I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case). Maybe the list can be passed as a function returning a list instead of a list, API would be compatible I think, but that forces not-to-use case classes + 99% binary compatibility :( was (Author: fsainz): I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case), passing the list as a function returning a list, can work and API would be compatible I think, but that breaks case classes + 99% binary compatibility :( > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). 
In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses th
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:13 PM: I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) If that's the case (this is the first time I check "deeply" inside spark sources, but I guess that's the case), passing the list as a function returning a list, can work and API would be compatible I think, but that breaks case classes + 99% binary compatibility :( was (Author: fsainz): I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. 
> More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:08 PM: I'm going ahead with something (@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) was (Author: fsainz): I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. 
> More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
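For reference, the API shape proposed in the comments (an isInBroadcastCollection method on Column) might look roughly like the sketch below. The method name comes from the comment and is not an existing Spark API; this stand-in is built on a plain UDF over public APIs, whereas the linked branch delegates to Catalyst's InSet, which is what would make source pushdown possible.
{code:scala}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.udf

object BroadcastColumnOps {
  implicit class ColumnBroadcastOps(val c: Column) extends AnyVal {
    // Name mirrors the proposal; this stand-in uses a plain UDF, so unlike an
    // InSet-based implementation it would not participate in pushdown.
    def isInBroadcastCollection(bc: Broadcast[Set[String]]): Column = {
      // Executors resolve bc.value locally; only the handle ships with the plan.
      val member = udf((v: String) => v != null && bc.value.contains(v))
      member(c)
    }
  }
}

// Hypothetical usage, assuming a DataFrame `df` with a string column "id" and
// a broadcast set built with spark.sparkContext.broadcast(keys.toSet):
//   import BroadcastColumnOps._
//   val filtered = df.filter(col("id").isInBroadcastCollection(bcKeys))
{code}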
[jira] [Commented] (SPARK-1537) Add integration with Yarn's Application Timeline Server
[ https://issues.apache.org/jira/browse/SPARK-1537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085922#comment-17085922 ] Daniel Templeton commented on SPARK-1537: - [~vanzin], is that because the community has invested instead in making SHS that central metrics store and UI? What about clusters with mixed workloads? > Add integration with Yarn's Application Timeline Server > --- > > Key: SPARK-1537 > URL: https://issues.apache.org/jira/browse/SPARK-1537 > Project: Spark > Issue Type: New Feature > Components: YARN >Reporter: Marcelo Masiero Vanzin >Priority: Major > Attachments: SPARK-1537.txt, spark-1573.patch > > > It would be nice to have Spark integrate with Yarn's Application Timeline > Server (see YARN-321, YARN-1530). This would allow users running Spark on > Yarn to have a single place to go for all their history needs, and avoid > having to manage a separate service (Spark's built-in server). > At the moment, there's a working version of the ATS in the Hadoop 2.4 branch, > although there is still some ongoing work. But the basics are there, and I > wouldn't expect them to change (much) at this point. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:43 PM: I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is File/DataSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) was (Author: fsainz): I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). 
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:41 PM: I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors which is what I'm trying to check now). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) was (Author: fsainz): I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). 
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:40 PM: I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I want to reuse the current In pushdown) was (Author: fsainz): I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I use the current In pushdown) > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). 
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:38 PM: I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown) was (Author: fsainz): I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). 
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:38 PM: I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown, at least If I use the current In pushdown) was (Author: fsainz): I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. Is FileSourceStrategy executed in the driver? (this is where I get values in order to pushdown) > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). 
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-30495) How to disable 'spark.security.credentials.${service}.enabled' in Structured streaming while connecting to a kafka cluster
[ https://issues.apache.org/jira/browse/SPARK-30495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085914#comment-17085914 ] Eric commented on SPARK-30495: -- Anyone knows when this fix will make it to a public build? I see that it is part of the RC1 tag, but I can't find any RC1 binaries in maven. I am using 3.0.0 preview2 and this bug is a blocker for us. Any idea when 3.0.0 will be officially out? > How to disable 'spark.security.credentials.${service}.enabled' in Structured > streaming while connecting to a kafka cluster > -- > > Key: SPARK-30495 > URL: https://issues.apache.org/jira/browse/SPARK-30495 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: act_coder >Assignee: Gabor Somogyi >Priority: Major > Fix For: 3.0.0 > > > Trying to read data from a secured Kafka cluster using spark structured > streaming. Also, using the below library to read the data - > +*"spark-sql-kafka-0-10_2.12":"3.0.0-preview"*+ since it has the feature to > specify our custom group id (instead of spark setting its own custom group > id) > +*Dependency used in code:*+ > org.apache.spark > spark-sql-kafka-0-10_2.12 > 3.0.0-preview > > +*Logs:*+ > Getting the below error - even after specifying the required JAAS > configuration in spark options. > Caused by: java.lang.IllegalArgumentException: requirement failed: > *Delegation token must exist for this connector*. at > scala.Predef$.require(Predef.scala:281) at > org.apache.spark.kafka010.KafkaTokenUtil$.isConnectorUsingCurrentToken(KafkaTokenUtil.scala:299) > at > > org.apache.spark.sql.kafka010.KafkaDataConsumer.getOrRetrieveConsumer(KafkaDataConsumer.scala:533) > at > > org.apache.spark.sql.kafka010.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:275) > > +*Spark configuration used to read from Kafka:*+ > val kafkaDF = sparkSession.readStream > .format("kafka") > .option("kafka.bootstrap.servers", bootStrapServer) > .option("subscribe", kafkaTopic ) > > //Setting JAAS Configuration > .option("kafka.sasl.jaas.config", KAFKA_JAAS_SASL) > .option("kafka.sasl.mechanism", "PLAIN") > .option("kafka.security.protocol", "SASL_SSL") > // Setting custom consumer group id > .option("kafka.group.id", "test_cg") > .load() > > Following document specifies that we can disable the feature of obtaining > delegation token - > > [https://spark.apache.org/docs/3.0.0-preview/structured-streaming-kafka-integration.html] > Tried setting this property *spark.security.credentials.kafka.enabled to* > *false in spark config,* but it is still failing with the same error. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
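For context, the reader described in this issue, with the delegation-token service switched off through spark.security.credentials.kafka.enabled, looks roughly like the sketch below. Broker address, topic, group id and the JAAS string are placeholders, and master/deploy settings are assumed to come from spark-submit. Per the description, setting this flag on 3.0.0-preview2 still produced the "Delegation token must exist for this connector" error, which is what this ticket addressed.
{code:scala}
import org.apache.spark.sql.SparkSession

object KafkaNoDelegationToken {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-sasl-plain")
      // Disable Spark's Kafka delegation-token service; JAAS config is supplied directly below.
      .config("spark.security.credentials.kafka.enabled", "false")
      .getOrCreate()

    val kafkaDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker-1:9093")   // placeholder
      .option("subscribe", "my_topic")                       // placeholder
      .option("kafka.sasl.jaas.config",
        """org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="secret";""") // placeholder
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.group.id", "test_cg")                   // custom consumer group id from the report
      .load()

    // Minimal sink just to start the stream.
    val query = kafkaDF.writeStream.format("console").start()
    query.awaitTermination()
  }
}
{code}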
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:26 PM: I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :) (unless this post-translation is also transferred from the driver to the executors). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. was (Author: fsainz): I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). 
> Maybe I can go ahead with a PR (slowly, in my free time); however, I'm not an > expert in Scala syntax/syntactic sugar. *Does anyone have an idea on how to extend > the current case class "In" while keeping compatibility with PushDown > implementations (creating a new Predicate is not allowed, I think), so that the case > class accepts not only Seq[Expression] but also > Broadcast[Seq[Expression]]?* This is what makes the tasks huge; I made a > test run with a predicate which receives a Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
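Until something like the proposed isInBroadcastCollection lands, a common interim pattern is to broadcast the lookup set and filter with a UDF. This is only a hedged sketch of that workaround (column and variable names are made up), and unlike the BroadcastIn expression discussed here it does not push the filter down to the data source.
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().appName("broadcast-isin-sketch").getOrCreate()
import spark.implicits._

// Hypothetical key list that is too large to inline into an isin(...) call.
val bigKeyList: Seq[String] = (1 to 200000).map(i => s"key_$i")
val keysBc = spark.sparkContext.broadcast(bigKeyList.toSet)

// Membership check against the broadcast set, evaluated on the executors.
val inBroadcastSet = udf((k: String) => k != null && keysBc.value.contains(k))

val df = Seq("key_1", "other").toDF("id")
// Behaves like isin, but the set travels once per executor via the broadcast
// rather than inside every serialized task; no source-side pushdown happens.
df.filter(inBroadcastSet(col("id"))).show()
{code}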
[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:08 PM: I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :). I'm basing it on InSet (actually delegating everything to the InSet implementation not to duplicate code) and adding a new method to the Column isInBroadcastCollection. was (Author: fsainz): I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :). I'm basing it on InSet (actually delegating everything to the InSet implementation) and adding a new method to the Column isInBroadcastCollection. > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. 
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-31417) Allow broadcast variables when using isin code
[ https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900 ] Florentino Sainz commented on SPARK-31417: -- I'm going ahead with something. Just realized the In in the pushdown predicates it's not the one exposed in predicates, but a post-translation, so adding a new class is not a drama :). I'm basing it on InSet (actually delegating everything to the InSet implementation) and adding a new method to the Column isInBroadcastCollection. > Allow broadcast variables when using isin code > -- > > Key: SPARK-31417 > URL: https://issues.apache.org/jira/browse/SPARK-31417 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: Florentino Sainz >Priority: Minor > > *I would like some help to explore if the following feature makes sense and > what's the best way to implement it. Allow users to use "isin" (or similar) > predicates with huge Lists (which might be broadcasted).* > As of now (AFAIK), users can only provide a list/sequence of elements which > will be sent as part of the "In" predicate, which is part of the task, to the > executors. So when this list is huge, this causes tasks to be "huge" > (specially when the task number is big). > I'm coming from > https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list > (I'm the creator of the post, and also the 2nd answer). > I know this is not common, but can be useful for people reading from > "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my > concrete case, querying Kudu with 200.000 elements in the isin takes > ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the > executors) versus 30 minutes doing a full-scan (even if semi-filtered, most > of the data has the same nature and thus is hard to filter, unless I use this > "200.000" list) on my table. > More or less I have a clear view (explained in stackoverflow) on what to > modify If I wanted to create my own "BroadcastIn" in > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala, > however with my limited "idea", anyone who implemented In pushdown would > have to re-adapt it to the new "BroadcastIn" > I've looked at Spark 3.0 sources and nothing has changed there, but I'm not > 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working > for a corporation which uses that version). > Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an > expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend > current case class "In" while keeping the compatibility with PushDown > implementations (aka creating a new Predicate not allowed I think), but not > having only Seq[Expression] in the case class, but also allow > Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a > test-run with a predicate which receives and Broadcast variable and uses the > value inside, and it works much better. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27913) Spark SQL's native ORC reader implements its own schema evolution
[ https://issues.apache.org/jira/browse/SPARK-27913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085852#comment-17085852 ] Giri Dandu commented on SPARK-27913: [~viirya] Sorry for late reply. I re-ran the same test in spark 2.4.5 and it *is NOT working* but it works in 2.3.0. I get the same error in spark 2.4.5 {code:java} Caused by: java.lang.ArrayIndexOutOfBoundsException: 1Caused by: java.lang.ArrayIndexOutOfBoundsException: 1 at org.apache.orc.mapred.OrcStruct.getFieldValue(OrcStruct.java:49) at org.apache.spark.sql.execution.datasources.orc.OrcDeserializer$$anonfun$org$apache$spark$sql$execution$datasources$orc$OrcDeserializer$$newWriter$14.apply(OrcDeserializer.scala:133) at org.apache.spark.sql.execution.datasources.orc.OrcDeserializer$$anonfun$org$apache$spark$sql$execution$datasources$orc$OrcDeserializer$$newWriter$14.apply(OrcDeserializer.scala:123) at org.apache.spark.sql.execution.datasources.orc.OrcDeserializer$$anonfun$2$$anonfun$apply$1.apply(OrcDeserializer.scala:51) at org.apache.spark.sql.execution.datasources.orc.OrcDeserializer$$anonfun$2$$anonfun$apply$1.apply(OrcDeserializer.scala:51) at org.apache.spark.sql.execution.datasources.orc.OrcDeserializer.deserialize(OrcDeserializer.scala:64) at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anonfun$buildReaderWithPartitionValues$2$$anonfun$apply$8.apply(OrcFileFormat.scala:234) at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anonfun$buildReaderWithPartitionValues$2$$anonfun$apply$8.apply(OrcFileFormat.scala:233) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:104) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) {code} > Spark SQL's native ORC reader implements its own schema evolution > - > > Key: SPARK-27913 > URL: https://issues.apache.org/jira/browse/SPARK-27913 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.3 >Reporter: Owen O'Malley >Priority: Major > > ORC's reader handles a wide range of schema evolution, but the Spark SQL > native ORC bindings do not provide the desired schema to the ORC reader. This > causes a regression when moving spark.sql.orc.impl from 'hive' to 'native'. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
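As a rough illustration of the kind of evolved-schema read being discussed (not the reporter's exact job), the sketch below writes an ORC file with a struct column and reads it back with a schema that has gained an extra struct field. Paths and names are made up, and whether this actually reproduces the ArrayIndexOutOfBoundsException will depend on the Spark and ORC versions involved.
{code:scala}
import org.apache.spark.sql.types._
import spark.implicits._

// Use the native ORC reader, the implementation this ticket is about.
spark.conf.set("spark.sql.orc.impl", "native")

val path = "/tmp/orc_evolution_demo"  // illustrative path
Seq((1, ("a", 10))).toDF("id", "s").write.mode("overwrite").orc(path)

// Evolved schema: the struct gained a third field after the data was written.
val evolvedSchema = new StructType()
  .add("id", IntegerType)
  .add("s", new StructType()
    .add("_1", StringType)
    .add("_2", IntegerType)
    .add("_3", StringType))  // added later, absent from the old files

spark.read.schema(evolvedSchema).orc(path).show()
{code}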
[jira] [Resolved] (SPARK-31468) Null types should be implicitly casted to Decimal types
[ https://issues.apache.org/jira/browse/SPARK-31468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-31468. - Fix Version/s: 3.0.0 Resolution: Fixed Issue resolved by pull request 28241 [https://github.com/apache/spark/pull/28241] > Null types should be implicitly casted to Decimal types > --- > > Key: SPARK-31468 > URL: https://issues.apache.org/jira/browse/SPARK-31468 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Takeshi Yamamuro >Assignee: Takeshi Yamamuro >Priority: Major > Fix For: 3.0.0 > > > A query below fails in master/branch-3.0 and passed in v2.4.5; > {code} > scala> Seq(BigDecimal(10)).toDF("v1").selectExpr("v1 = NULL").explain(true) > org.apache.spark.sql.AnalysisException: cannot resolve '(`v1` = NULL)' due to > data type mismatch: differing types in '(`v1` = NULL)' (decimal(38,18) and > null).; line 1 pos 0; > 'Project [(v1#5 = null) AS (v1 = NULL)#7] > +- Project [value#2 AS v1#5] >+- LocalRelation [value#2] > ... > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
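Until the implicit NullType-to-DecimalType cast is back, an explicit cast on the literal side-steps the analysis error. A minimal sketch, run in a shell where the query from the description fails:
{code:scala}
import spark.implicits._

// Casting the NULL literal explicitly avoids relying on the implicit cast
// that this ticket restores.
Seq(BigDecimal(10)).toDF("v1")
  .selectExpr("v1 = CAST(NULL AS DECIMAL(38,18)) AS eq")
  .show()
{code}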
[jira] [Assigned] (SPARK-31468) Null types should be implicitly casted to Decimal types
[ https://issues.apache.org/jira/browse/SPARK-31468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-31468: --- Assignee: Takeshi Yamamuro > Null types should be implicitly casted to Decimal types > --- > > Key: SPARK-31468 > URL: https://issues.apache.org/jira/browse/SPARK-31468 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Takeshi Yamamuro >Assignee: Takeshi Yamamuro >Priority: Major > > A query below fails in master/branch-3.0 and passed in v2.4.5; > {code} > scala> Seq(BigDecimal(10)).toDF("v1").selectExpr("v1 = NULL").explain(true) > org.apache.spark.sql.AnalysisException: cannot resolve '(`v1` = NULL)' due to > data type mismatch: differing types in '(`v1` = NULL)' (decimal(38,18) and > null).; line 1 pos 0; > 'Project [(v1#5 = null) AS (v1 = NULL)#7] > +- Project [value#2 AS v1#5] >+- LocalRelation [value#2] > ... > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31472) allGather() may return null messages
wuyi created SPARK-31472: Summary: allGather() may return null messages Key: SPARK-31472 URL: https://issues.apache.org/jira/browse/SPARK-31472 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.0.0 Reporter: wuyi {code:java} [info] BarrierTaskContextSuite: [info] - share messages with allGather() call *** FAILED *** (18 seconds, 705 milliseconds) [info] org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(0, 2) finished unsuccessfully. [info] java.lang.NullPointerException [info] at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:204) [info] at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:204) [info] at scala.collection.IndexedSeqOptimized.toList(IndexedSeqOptimized.scala:285) [info] at scala.collection.IndexedSeqOptimized.toList$(IndexedSeqOptimized.scala:284) [info] at scala.collection.mutable.ArrayOps$ofRef.toList(ArrayOps.scala:198) [info] at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$new$4(BarrierTaskContextSuite.scala:68) [info] at org.apache.spark.rdd.RDDBarrier.$anonfun$mapPartitions$2(RDDBarrier.scala:51) [info] at org.apache.spark.rdd.RDDBarrier.$anonfun$mapPartitions$2$adapted(RDDBarrier.scala:51) [info] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) [info] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) [info] at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) [info] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) [info] at org.apache.spark.scheduler.Task.run(Task.scala:127) [info] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:460) [info] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) [info] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:463) [info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [info] at java.lang.Thread.run(Thread.java:748) [info] at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2094) [info] at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2043) [info] at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2042) [info] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) [info] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) [info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) [info] at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2042) [info] at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1831) [info] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2271) [info] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2223) [info] at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2212) [info] at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) [info] at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:822) [info] at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108) [info] at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129) [info] at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2148) [info] at org.apache.spark.SparkContext.runJob(SparkContext.scala:2173) [info] at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030) [info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) [info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) [info] at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) [info] at org.apache.spark.rdd.RDD.collect(RDD.scala:1029) [info] at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$new$3(BarrierTaskContextSuite.scala:71) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) [info] at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186) [info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:151) [info] at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSui
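For context on what the failing test exercises, here is a hedged sketch of a barrier-stage job calling allGather(); the partition count and messages are illustrative. On builds affected by this bug, some of the returned messages can come back null, which is what triggers the NullPointerException in the log above.
{code:scala}
import org.apache.spark.BarrierTaskContext

// Assumes a running SparkContext `sc` with at least 4 cores available,
// since every barrier task must be able to run at the same time.
val rdd = sc.parallelize(1 to 4, numSlices = 4)

val gathered = rdd.barrier().mapPartitions { iter =>
  val ctx = BarrierTaskContext.get()
  // Each task contributes one message; allGather returns all of them to every task.
  val messages: Array[String] = ctx.allGather(s"hello from partition ${ctx.partitionId()}")
  Iterator(messages.toList)  // on affected builds, entries here may unexpectedly be null
}.collect()

gathered.foreach(println)
{code}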
[jira] [Resolved] (SPARK-31469) Make extract interval field ANSI compliance
[ https://issues.apache.org/jira/browse/SPARK-31469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-31469. - Fix Version/s: 3.0.0 Resolution: Fixed Issue resolved by pull request 28242 [https://github.com/apache/spark/pull/28242] > Make extract interval field ANSI compliance > --- > > Key: SPARK-31469 > URL: https://issues.apache.org/jira/browse/SPARK-31469 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0, 3.1.0 >Reporter: Kent Yao >Assignee: Kent Yao >Priority: Major > Fix For: 3.0.0 > > > Currently, we can extract > millennium/century/decade/year/quarter/month/week/day/hour/minute/second(with > fractions)//millisecond/microseconds and epoch from interval values > millennium/century/decade/year means how many the interval months part can be > converted to that unit-value. > while month/day and so on, the integral remainder of the previous unit. > while getting epoch we have treat month as 30 days which varies the natural > Calendar rules we use. > To avoid ambiguity, I suggest we should only support those extract field > defined ANSI with their abbreviations. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
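A hedged sketch of ANSI-style field extraction from interval values, run through spark.sql in a shell; the exact set of accepted field names and abbreviations is what the linked PR settles, so treat the literals and fields below as illustrative only.
{code:scala}
// ANSI-defined fields extracted from interval values (illustrative literals).
spark.sql("SELECT EXTRACT(YEAR FROM INTERVAL '3 years 2 months')").show()
spark.sql("SELECT EXTRACT(MONTH FROM INTERVAL '3 years 2 months')").show()
spark.sql("SELECT EXTRACT(DAY FROM INTERVAL '15 days 10 hours')").show()
spark.sql("SELECT EXTRACT(HOUR FROM INTERVAL '15 days 10 hours')").show()
{code}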
[jira] [Assigned] (SPARK-31469) Make extract interval field ANSI compliance
[ https://issues.apache.org/jira/browse/SPARK-31469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-31469: --- Assignee: Kent Yao > Make extract interval field ANSI compliance > --- > > Key: SPARK-31469 > URL: https://issues.apache.org/jira/browse/SPARK-31469 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0, 3.1.0 >Reporter: Kent Yao >Assignee: Kent Yao >Priority: Major > > Currently, we can extract > millennium/century/decade/year/quarter/month/week/day/hour/minute/second(with > fractions)//millisecond/microseconds and epoch from interval values > millennium/century/decade/year means how many the interval months part can be > converted to that unit-value. > while month/day and so on, the integral remainder of the previous unit. > while getting epoch we have treat month as 30 days which varies the natural > Calendar rules we use. > To avoid ambiguity, I suggest we should only support those extract field > defined ANSI with their abbreviations. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26902) Support java.time.Instant as an external type of TimestampType
[ https://issues.apache.org/jira/browse/SPARK-26902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085759#comment-17085759 ] Jorge Machado commented on SPARK-26902: --- what about Supporting the interface Temporal ? > Support java.time.Instant as an external type of TimestampType > -- > > Key: SPARK-26902 > URL: https://issues.apache.org/jira/browse/SPARK-26902 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Major > Fix For: 3.0.0 > > > Currently, Spark supports the java.sql.Date and java.sql.Timestamp types as > external types for Catalyst's DateType and TimestampType. It accepts and > produces values of such types. Since Java 8, base classes for dates and > timestamps are java.time.Instant, java.time.LocalDate/LocalDateTime, and > java.time.ZonedDateTime. Need to add new converters from/to Instant. > The Instant type holds epoch seconds (and nanoseconds), and directly reflects > to Catalyst's TimestampType. > Main motivations for the changes: > - Smoothly support Java 8 time API > - Avoid inconsistency of calendars used inside Spark 3.0 (Proleptic Gregorian > calendar) and inside of java.sql.Timestamp (hybrid calendar - Julian + > Gregorian). > - Make conversion independent from current system timezone. > In case of collecting values of Date/TimestampType, the following SQL config > can control types of returned values: > - spark.sql.catalyst.timestampType with supported values > "java.sql.Timestamp" (by default) and "java.time.Instant" -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
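A hedged sketch of what collecting Instant values could look like; the config key below is the one proposed in this ticket's description, and the name that actually shipped may differ, so treat it as an assumption rather than the released API.
{code:scala}
import java.time.Instant
import spark.implicits._

// Proposed in the ticket: select java.time.Instant as the external timestamp type.
// The released config name may differ from this proposal.
spark.conf.set("spark.sql.catalyst.timestampType", "java.time.Instant")

val df = Seq(Instant.parse("2020-04-17T00:00:00Z")).toDF("ts")

// When the Java 8 time API is in effect, collected rows carry java.time.Instant
// values instead of java.sql.Timestamp.
df.collect().foreach(println)
{code}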
[jira] [Resolved] (SPARK-31436) MinHash keyDistance optimization
[ https://issues.apache.org/jira/browse/SPARK-31436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean R. Owen resolved SPARK-31436. -- Fix Version/s: 3.1.0 Resolution: Fixed Issue resolved by pull request 28206 [https://github.com/apache/spark/pull/28206] > MinHash keyDistance optimization > > > Key: SPARK-31436 > URL: https://issues.apache.org/jira/browse/SPARK-31436 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 3.1.0 >Reporter: zhengruifeng >Assignee: zhengruifeng >Priority: Minor > Fix For: 3.1.0 > > > current implementation is based on set operation, it is inefficient -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-31436) MinHash keyDistance optimization
[ https://issues.apache.org/jira/browse/SPARK-31436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean R. Owen reassigned SPARK-31436: Assignee: zhengruifeng > MinHash keyDistance optimization > > > Key: SPARK-31436 > URL: https://issues.apache.org/jira/browse/SPARK-31436 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 3.1.0 >Reporter: zhengruifeng >Assignee: zhengruifeng >Priority: Minor > > current implementation is based on set operation, it is inefficient -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31471) Add a script to run multiple benchmarks
Maxim Gekk created SPARK-31471: -- Summary: Add a script to run multiple benchmarks Key: SPARK-31471 URL: https://issues.apache.org/jira/browse/SPARK-31471 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 3.1.0 Reporter: Maxim Gekk Add a python script to run multiple benchmarks. The script can be taken from [https://github.com/apache/spark/pull/27078] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31448) Difference in Storage Levels used in cache() and persist() for pyspark dataframes
[ https://issues.apache.org/jira/browse/SPARK-31448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Abhishek Dixit updated SPARK-31448: --- Labels: pys (was: ) > Difference in Storage Levels used in cache() and persist() for pyspark > dataframes > - > > Key: SPARK-31448 > URL: https://issues.apache.org/jira/browse/SPARK-31448 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 >Reporter: Abhishek Dixit >Priority: Major > Labels: pys > > There is a difference in default storage level *MEMORY_AND_DISK* in pyspark > and scala. > *Scala*: StorageLevel(true, true, false, true) > *Pyspark:* StorageLevel(True, True, False, False) > > *Problem Description:* > Calling *df.cache()* for pyspark dataframe directly invokes Scala method > cache() and Storage Level used is StorageLevel(true, true, false, true). > But calling *df.persist()* for pyspark dataframe sets the > newStorageLevel=StorageLevel(true, true, false, false) inside pyspark and > then invokes Scala function persist(newStorageLevel). > *Possible Fix:* > Invoke pyspark function persist inside pyspark function cache instead of > calling the scala function directly. > I can raise a PR for this fix if someone can confirm that this is a bug and > the possible fix is the correct approach. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31448) Difference in Storage Levels used in cache() and persist() for pyspark dataframes
[ https://issues.apache.org/jira/browse/SPARK-31448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Abhishek Dixit updated SPARK-31448: --- Labels: (was: pys) > Difference in Storage Levels used in cache() and persist() for pyspark > dataframes > - > > Key: SPARK-31448 > URL: https://issues.apache.org/jira/browse/SPARK-31448 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 >Reporter: Abhishek Dixit >Priority: Major > > There is a difference in default storage level *MEMORY_AND_DISK* in pyspark > and scala. > *Scala*: StorageLevel(true, true, false, true) > *Pyspark:* StorageLevel(True, True, False, False) > > *Problem Description:* > Calling *df.cache()* for pyspark dataframe directly invokes Scala method > cache() and Storage Level used is StorageLevel(true, true, false, true). > But calling *df.persist()* for pyspark dataframe sets the > newStorageLevel=StorageLevel(true, true, false, false) inside pyspark and > then invokes Scala function persist(newStorageLevel). > *Possible Fix:* > Invoke pyspark function persist inside pyspark function cache instead of > calling the scala function directly. > I can raise a PR for this fix if someone can confirm that this is a bug and > the possible fix is the correct approach. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-31423) DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC
[ https://issues.apache.org/jira/browse/SPARK-31423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-31423. - Resolution: Not A Problem > DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC > -- > > Key: SPARK-31423 > URL: https://issues.apache.org/jira/browse/SPARK-31423 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0, 3.1.0 >Reporter: Bruce Robbins >Priority: Major > > There is a range of days (1582-10-05 to 1582-10-14) for which DATEs and > TIMESTAMPS are changed when stored in ORC. The value is off by 10 days. > For example: > {noformat} > scala> val df = sql("select cast('1582-10-14' as DATE) dt") > df: org.apache.spark.sql.DataFrame = [dt: date] > scala> df.show // seems fine > +--+ > |dt| > +--+ > |1582-10-14| > +--+ > scala> df.write.mode("overwrite").orc("/tmp/funny_orc_date") > scala> spark.read.orc("/tmp/funny_orc_date").show // off by 10 days > +--+ > |dt| > +--+ > |1582-10-24| > +--+ > scala> > {noformat} > ORC has the same issue with TIMESTAMPS: > {noformat} > scala> val df = sql("select cast('1582-10-14 00:00:00' as TIMESTAMP) ts") > df: org.apache.spark.sql.DataFrame = [ts: timestamp] > scala> df.show // seems fine > +---+ > | ts| > +---+ > |1582-10-14 00:00:00| > +---+ > scala> df.write.mode("overwrite").orc("/tmp/funny_orc_timestamp") > scala> spark.read.orc("/tmp/funny_orc_timestamp").show(truncate=false) // off > by 10 days > +---+ > |ts | > +---+ > |1582-10-24 00:00:00| > +---+ > scala> > {noformat} > However, when written to Parquet or Avro, DATES and TIMESTAMPs for this range > do not change. > {noformat} > scala> val df = sql("select cast('1582-10-14' as DATE) dt") > df: org.apache.spark.sql.DataFrame = [dt: date] > scala> df.write.mode("overwrite").parquet("/tmp/funny_parquet_date") > scala> spark.read.parquet("/tmp/funny_parquet_date").show // reflects > original value > +--+ > |dt| > +--+ > |1582-10-14| > +--+ > scala> val df = sql("select cast('1582-10-14' as DATE) dt") > df: org.apache.spark.sql.DataFrame = [dt: date] > scala> df.write.mode("overwrite").format("avro").save("/tmp/funny_avro_date") > scala> spark.read.format("avro").load("/tmp/funny_avro_date").show // > reflects original value > +--+ > |dt| > +--+ > |1582-10-14| > +--+ > scala> > {noformat} > It's unclear to me whether ORC is behaving correctly or not, as this is how > Spark 2.4 works with DATEs and TIMESTAMPs in general (and also how Spark 3.x > works with DATEs and TIMESTAMPs in general when > {{spark.sql.legacy.timeParserPolicy}} is set to {{LEGACY}}). In Spark 2.4, > DATEs and TIMESTAMPs in this range don't exist: > {noformat} > scala> sql("select cast('1582-10-14' as DATE) dt").show // the same cast done > in Spark 2.4 > +--+ > |dt| > +--+ > |1582-10-24| > +--+ > scala> > {noformat} > I assume the following snippet is relevant (from the Wikipedia entry on the > Gregorian calendar): > {quote}To deal with the 10 days' difference (between calendar and > reality)[Note 2] that this drift had already reached, the date was advanced > so that 4 October 1582 was followed by 15 October 1582 > {quote} > Spark 3.x should treat DATEs and TIMESTAMPS in this range consistently, and > probably based on spark.sql.legacy.timeParserPolicy (or some other config) > rather than file format. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-31423) DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC
[ https://issues.apache.org/jira/browse/SPARK-31423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085585#comment-17085585 ] Wenchen Fan commented on SPARK-31423: - I'm closing it as "not a bug". This happens because we "rebase" the datetime values for the calendar changes. There are some dates exist in one calendar but not another. What we can do is to move to the next valid date. Eventually every place should use the standard calendar so we won't have this problem. > DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC > -- > > Key: SPARK-31423 > URL: https://issues.apache.org/jira/browse/SPARK-31423 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0, 3.1.0 >Reporter: Bruce Robbins >Priority: Major > > There is a range of days (1582-10-05 to 1582-10-14) for which DATEs and > TIMESTAMPS are changed when stored in ORC. The value is off by 10 days. > For example: > {noformat} > scala> val df = sql("select cast('1582-10-14' as DATE) dt") > df: org.apache.spark.sql.DataFrame = [dt: date] > scala> df.show // seems fine > +--+ > |dt| > +--+ > |1582-10-14| > +--+ > scala> df.write.mode("overwrite").orc("/tmp/funny_orc_date") > scala> spark.read.orc("/tmp/funny_orc_date").show // off by 10 days > +--+ > |dt| > +--+ > |1582-10-24| > +--+ > scala> > {noformat} > ORC has the same issue with TIMESTAMPS: > {noformat} > scala> val df = sql("select cast('1582-10-14 00:00:00' as TIMESTAMP) ts") > df: org.apache.spark.sql.DataFrame = [ts: timestamp] > scala> df.show // seems fine > +---+ > | ts| > +---+ > |1582-10-14 00:00:00| > +---+ > scala> df.write.mode("overwrite").orc("/tmp/funny_orc_timestamp") > scala> spark.read.orc("/tmp/funny_orc_timestamp").show(truncate=false) // off > by 10 days > +---+ > |ts | > +---+ > |1582-10-24 00:00:00| > +---+ > scala> > {noformat} > However, when written to Parquet or Avro, DATES and TIMESTAMPs for this range > do not change. > {noformat} > scala> val df = sql("select cast('1582-10-14' as DATE) dt") > df: org.apache.spark.sql.DataFrame = [dt: date] > scala> df.write.mode("overwrite").parquet("/tmp/funny_parquet_date") > scala> spark.read.parquet("/tmp/funny_parquet_date").show // reflects > original value > +--+ > |dt| > +--+ > |1582-10-14| > +--+ > scala> val df = sql("select cast('1582-10-14' as DATE) dt") > df: org.apache.spark.sql.DataFrame = [dt: date] > scala> df.write.mode("overwrite").format("avro").save("/tmp/funny_avro_date") > scala> spark.read.format("avro").load("/tmp/funny_avro_date").show // > reflects original value > +--+ > |dt| > +--+ > |1582-10-14| > +--+ > scala> > {noformat} > It's unclear to me whether ORC is behaving correctly or not, as this is how > Spark 2.4 works with DATEs and TIMESTAMPs in general (and also how Spark 3.x > works with DATEs and TIMESTAMPs in general when > {{spark.sql.legacy.timeParserPolicy}} is set to {{LEGACY}}). 
In Spark 2.4, > DATEs and TIMESTAMPs in this range don't exist: > {noformat} > scala> sql("select cast('1582-10-14' as DATE) dt").show // the same cast done > in Spark 2.4 > +--+ > |dt| > +--+ > |1582-10-24| > +--+ > scala> > {noformat} > I assume the following snippet is relevant (from the Wikipedia entry on the > Gregorian calendar): > {quote}To deal with the 10 days' difference (between calendar and > reality)[Note 2] that this drift had already reached, the date was advanced > so that 4 October 1582 was followed by 15 October 1582 > {quote} > Spark 3.x should treat DATEs and TIMESTAMPS in this range consistently, and > probably based on spark.sql.legacy.timeParserPolicy (or some other config) > rather than file format. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31470) Introduce SORTED BY clause in CREATE TABLE statement
Yuming Wang created SPARK-31470: --- Summary: Introduce SORTED BY clause in CREATE TABLE statement Key: SPARK-31470 URL: https://issues.apache.org/jira/browse/SPARK-31470 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Yuming Wang We usually sort on frequently filtered columns when writing data to improve query performance. But this information is not recorded in the table metadata. {code:sql} CREATE TABLE t(day INT, hour INT, year INT, month INT) USING parquet PARTITIONED BY (year, month) SORTED BY (day, hour); {code} Impala supports this clause: https://issues.apache.org/jira/browse/IMPALA-4166 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31469) Make extract interval field ANSI compliance
Kent Yao created SPARK-31469: Summary: Make extract interval field ANSI compliance Key: SPARK-31469 URL: https://issues.apache.org/jira/browse/SPARK-31469 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0, 3.1.0 Reporter: Kent Yao Currently, we can extract millennium/century/decade/year/quarter/month/week/day/hour/minute/second (with fractions)/millisecond/microseconds and epoch from interval values. For millennium/century/decade/year, the result is how much of the interval's months part can be converted to that unit; for month/day and so on, it is the integral remainder of the previous unit. When computing epoch we have to treat a month as 30 days, which deviates from the natural calendar rules we use elsewhere. To avoid ambiguity, I suggest we only support the extract fields defined by ANSI, together with their abbreviations. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31467) Fix test issue with table named `test` in hive/SQLQuerySuite
[ https://issues.apache.org/jira/browse/SPARK-31467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] feiwang updated SPARK-31467: Description: If we add a unit test in hive/SQLQuerySuite that uses a table named `test`, we may hit these exceptions. {code:java} org.apache.spark.sql.AnalysisException: Inserting into an RDD-based table is not allowed.;; [info] 'InsertIntoTable Project [_1#1403 AS key#1406, _2#1404 AS value#1407], Map(name -> Some(n1)), true, false [info] +- Project [col1#3850] [info]+- LocalRelation [col1#3850] {code} {code:java} org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: Table or view 'test' already exists in database 'default'; [info] at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateTable$1.apply$mcV$sp(HiveExternalCatalog.scala:226) [info] at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateTable$1.apply(HiveExternalCatalog.scala:216) [info] at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateTable$1.apply(HiveExternalCatalog.scala:216) [info] at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97) [info] at org.apache.spark.sql.hive.HiveExternalCatalog.doCreateTable(HiveExternalCatalog.scala:216) {code} > Fix test issue with table named `test` in hive/SQLQuerySuite > > > Key: SPARK-31467 > URL: https://issues.apache.org/jira/browse/SPARK-31467 > Project: Spark > Issue Type: Bug > Components: Tests >Affects Versions: 3.1.0 >Reporter: feiwang >Priority: Major > > If we add a unit test in hive/SQLQuerySuite that uses a table named `test`, we may hit > these exceptions. > {code:java} > org.apache.spark.sql.AnalysisException: Inserting into an RDD-based table is > not allowed.;; > [info] 'InsertIntoTable Project [_1#1403 AS key#1406, _2#1404 AS value#1407], > Map(name -> Some(n1)), true, false > [info] +- Project [col1#3850] > [info]+- LocalRelation [col1#3850] > {code} > {code:java} > org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: Table or > view 'test' already exists in database 'default'; > [info] at > org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateTable$1.apply$mcV$sp(HiveExternalCatalog.scala:226) > [info] at > org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateTable$1.apply(HiveExternalCatalog.scala:216) > [info] at > org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateTable$1.apply(HiveExternalCatalog.scala:216) > [info] at > org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97) > [info] at > org.apache.spark.sql.hive.HiveExternalCatalog.doCreateTable(HiveExternalCatalog.scala:216) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
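A hedged sketch of how such a test can avoid the clash, assuming the usual helpers (test, sql, withTable, checkAnswer) are available to hive/SQLQuerySuite; the table name and values are made up:
{code:scala}
import org.apache.spark.sql.Row

// Using a name other than `test`, and cleaning up with withTable, avoids
// colliding with the pre-registered `test` table in the suite.
test("SPARK-31467: query against a uniquely named Hive table") {
  withTable("spark_31467_tbl") {
    sql("CREATE TABLE spark_31467_tbl (key INT, value STRING) USING hive")
    sql("INSERT INTO spark_31467_tbl VALUES (1, 'a'), (2, 'b')")
    checkAnswer(
      sql("SELECT key FROM spark_31467_tbl ORDER BY key"),
      Seq(Row(1), Row(2)))
  }
}
{code}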
[jira] [Created] (SPARK-31468) Null types should be implicitly casted to Decimal types
Takeshi Yamamuro created SPARK-31468: Summary: Null types should be implicitly casted to Decimal types Key: SPARK-31468 URL: https://issues.apache.org/jira/browse/SPARK-31468 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Takeshi Yamamuro The query below fails in master/branch-3.0 but passes in v2.4.5; {code} scala> Seq(BigDecimal(10)).toDF("v1").selectExpr("v1 = NULL").explain(true) org.apache.spark.sql.AnalysisException: cannot resolve '(`v1` = NULL)' due to data type mismatch: differing types in '(`v1` = NULL)' (decimal(38,18) and null).; line 1 pos 0; 'Project [(v1#5 = null) AS (v1 = NULL)#7] +- Project [value#2 AS v1#5] +- LocalRelation [value#2] ... {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31467) Fix test issue with table named `test` in hive/SQLQuerySuite
feiwang created SPARK-31467: --- Summary: Fix test issue with table named `test` in hive/SQLQuerySuite Key: SPARK-31467 URL: https://issues.apache.org/jira/browse/SPARK-31467 Project: Spark Issue Type: Bug Components: Tests Affects Versions: 3.1.0 Reporter: feiwang -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-30913) Add version information to the configuration of Tests.scala
[ https://issues.apache.org/jira/browse/SPARK-30913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-30913: Assignee: jiaan.geng > Add version information to the configuration of Tests.scala > --- > > Key: SPARK-30913 > URL: https://issues.apache.org/jira/browse/SPARK-30913 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: jiaan.geng >Assignee: jiaan.geng >Priority: Major > Fix For: 3.0.0, 3.1.0 > > > core/src/main/scala/org/apache/spark/internal/config/Tests.scala -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org