[jira] [Created] (SPARK-31477) Dump codegen and compile time in BenchmarkQueryTest

2020-04-17 Thread Xiao Li (Jira)
Xiao Li created SPARK-31477:
---

 Summary: Dump codegen and compile time in BenchmarkQueryTest
 Key: SPARK-31477
 URL: https://issues.apache.org/jira/browse/SPARK-31477
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Xiao Li
Assignee: Xiao Li


It would be nice to measure the codegen and compilation time costs in TPC-DS queries.
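A minimal sketch of one way to surface these numbers, assuming the existing 
CodegenMetrics source is used to read the accumulated codegen compilation 
statistics (the exact hook used in BenchmarkQueryTest may differ; the helper 
name is hypothetical):

{code:scala}
import org.apache.spark.metrics.source.CodegenMetrics
import org.apache.spark.sql.SparkSession

// Hypothetical helper: run a TPC-DS query and dump the process-wide codegen
// compilation metrics accumulated so far.
def runAndDumpCodegen(spark: SparkSession, query: String): Unit = {
  spark.sql(query).collect()
  val compileTime = CodegenMetrics.METRIC_COMPILATION_TIME
  val sourceSize  = CodegenMetrics.METRIC_SOURCE_CODE_SIZE
  println(s"codegen compilations: ${compileTime.getCount}, " +
    s"mean compile time: ${compileTime.getSnapshot.getMean} ms, " +
    s"mean generated source size: ${sourceSize.getSnapshot.getMean} bytes")
}
{code}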



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31459) When using the insert overwrite directory syntax, if the target path is an existing file, the final run result is incorrect

2020-04-17 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-31459:
--
Affects Version/s: (was: 3)
   3.0.0

> When using the insert overwrite directory syntax, if the target path is an 
> existing file, the final run result is incorrect
> ---
>
> Key: SPARK-31459
> URL: https://issues.apache.org/jira/browse/SPARK-31459
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.5, 3.0.0
> Environment: spark2.4.5
>Reporter: mcdull_zhang
>Priority: Major
>  Labels: sql
>
> When using the insert overwrite directory syntax, if the target path is an 
> existing file, the final operation result is incorrect.
> At present, Spark does not delete the existing file. After the computation 
> completes, one of the result files is renamed to the result path.
> This differs from Hive's behavior: Hive deletes the existing target file.
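A minimal reproduction sketch of the scenario described above (the output path 
and source table are illustrative; the key precondition is that the target path 
already exists as a plain file rather than a directory):

{code:scala}
// Assumes /tmp/out already exists as a plain file. Hive removes the existing
// target before writing; Spark, as described above, leaves it in place and
// renames one result file onto the path, producing an incorrect result.
spark.sql(
  """
    |INSERT OVERWRITE DIRECTORY '/tmp/out'
    |USING parquet
    |SELECT * FROM some_table
  """.stripMargin)
{code}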



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-31390) Document Window Function

2020-04-17 Thread Takeshi Yamamuro (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Takeshi Yamamuro resolved SPARK-31390.
--
Fix Version/s: 3.0.0
 Assignee: Huaxin Gao
   Resolution: Fixed

Resolved by https://github.com/apache/spark/pull/28220

> Document Window Function
> 
>
> Key: SPARK-31390
> URL: https://issues.apache.org/jira/browse/SPARK-31390
> Project: Spark
>  Issue Type: Sub-task
>  Components: Documentation, SQL
>Affects Versions: 3.0.0
>Reporter: Huaxin Gao
>Assignee: Huaxin Gao
>Priority: Major
> Fix For: 3.0.0
>
>
> Document Window Function



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31476) Add an ExpressionInfo entry for EXTRACT

2020-04-17 Thread Takeshi Yamamuro (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Takeshi Yamamuro updated SPARK-31476:
-
Description: This ticket intends to add an ExpressionInfo entry for EXTRACT for 
better documentation.
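For context, ExpressionInfo entries back the SQL documentation commands; 
assuming the entry is registered under the function name extract, the effect 
would be visible with something like the following (a sketch, not the actual 
patch):

{code:scala}
// Once an ExpressionInfo entry exists for EXTRACT, its usage, arguments and
// examples show up in the function documentation commands.
spark.sql("DESCRIBE FUNCTION EXTENDED extract").show(truncate = false)
{code}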

> Add an ExpressionInfo entry for EXTRACT
> ---
>
> Key: SPARK-31476
> URL: https://issues.apache.org/jira/browse/SPARK-31476
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Takeshi Yamamuro
>Priority: Major
>
> This ticket intends to add an ExpressionInfo entry for EXTRACT for better 
> documentation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-31476) Add an ExpressionInfo entry for EXTRACT

2020-04-17 Thread Takeshi Yamamuro (Jira)
Takeshi Yamamuro created SPARK-31476:


 Summary: Add an ExpressionInfo entry for EXTRACT
 Key: SPARK-31476
 URL: https://issues.apache.org/jira/browse/SPARK-31476
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Takeshi Yamamuro






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-29302) dynamic partition overwrite with speculation enabled

2020-04-17 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086124#comment-17086124
 ] 

Mridul Muralidharan commented on SPARK-29302:
-

I agree with [~feiwang]; it looks like newTaskTempFile is not robust to 
speculative execution and task failures when dynamicPartitionOverwrite is 
enabled.
This will need to be fixed - it currently uses the same path irrespective 
of which attempt is writing.
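To illustrate the concern: the output file name is derived only from the 
partition (split) id and the job id, so two attempts of the same task target 
the same path under the staging directory. A hedged sketch of the kind of 
change being suggested (names follow HadoopMapReduceCommitProtocol in 
simplified form; this is not the actual patch):

{code:scala}
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Current behaviour (simplified): the attempt number is not part of the name,
// so a speculative attempt writes to exactly the same file as the original.
def getFilename(taskContext: TaskAttemptContext, jobId: String, ext: String): String = {
  val split = taskContext.getTaskAttemptID.getTaskID.getId
  f"part-$split%05d-$jobId$ext"
}

// One possible fix: include the attempt id so concurrent attempts cannot collide.
def getFilenamePerAttempt(taskContext: TaskAttemptContext, jobId: String, ext: String): String = {
  val attemptId = taskContext.getTaskAttemptID
  val split = attemptId.getTaskID.getId
  f"part-$split%05d-${attemptId.getId}-$jobId$ext"
}
{code}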

> dynamic partition overwrite with speculation enabled
> 
>
> Key: SPARK-29302
> URL: https://issues.apache.org/jira/browse/SPARK-29302
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4
>Reporter: feiwang
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> Currently, for a dynamic partition overwrite operation, the filename of a 
> task's output is deterministic.
> So, if speculation is enabled, could a task conflict with its corresponding 
> speculative task?
> Could the two tasks concurrently write the same file?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-29302) dynamic partition overwrite with speculation enabled

2020-04-17 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-29302:

Comment: was deleted

(was: Drive by observations:

* Speculative execution does not run the speculative task concurrently on the 
same node as the original task (so definitely not the same executor).
* Do we have additional details about the tasks that are failing? Did they 
fail earlier (for unrelated reasons) before failing on the path conflict?
)

> dynamic partition overwrite with speculation enabled
> 
>
> Key: SPARK-29302
> URL: https://issues.apache.org/jira/browse/SPARK-29302
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4
>Reporter: feiwang
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> Currently, for a dynamic partition overwrite operation, the filename of a 
> task's output is deterministic.
> So, if speculation is enabled, could a task conflict with its corresponding 
> speculative task?
> Could the two tasks concurrently write the same file?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-29302) dynamic partition overwrite with speculation enabled

2020-04-17 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086117#comment-17086117
 ] 

Mridul Muralidharan commented on SPARK-29302:
-

Drive by observations:

* Speculative execution does not run the speculative task concurrently on the 
same node as the original task (so definitely not the same executor).
* Do we have additional details about the tasks that are failing? Did they 
fail earlier (for unrelated reasons) before failing on the path conflict?


> dynamic partition overwrite with speculation enabled
> 
>
> Key: SPARK-29302
> URL: https://issues.apache.org/jira/browse/SPARK-29302
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4
>Reporter: feiwang
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> Currently, for a dynamic partition overwrite operation, the filename of a 
> task's output is deterministic.
> So, if speculation is enabled, could a task conflict with its corresponding 
> speculative task?
> Could the two tasks concurrently write the same file?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31420) Infinite timeline redraw in job details page

2020-04-17 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-31420:
--
Fix Version/s: (was: 3.1.0)
   (was: 2.4.5)
   2.4.6

> Infinite timeline redraw in job details page
> 
>
> Key: SPARK-31420
> URL: https://issues.apache.org/jira/browse/SPARK-31420
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.4, 2.4.5, 3.0.0, 3.1.0
>Reporter: Gengliang Wang
>Assignee: Kousuke Saruta
>Priority: Major
> Fix For: 3.0.0, 2.4.6
>
> Attachments: timeline.mov
>
>
> In the job page, the timeline section keeps changing its position style and 
> shaking. We can see a warning "infinite loop in redraw" in the console, which 
> may be related to 
> https://github.com/visjs/vis-timeline/issues/17
> I am using the history server with the events under 
> "core/src/test/resources/spark-events" to reproduce.
> I have also uploaded a screen recording.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florentino Sainz updated SPARK-31417:
-
Description: 
*I would like some help exploring whether the following feature makes sense 
and what the best way to implement it would be: allow users to use "isin" (or 
similar) predicates with huge Lists (which might be broadcast).*

*Abandoning. I think maybe the best option, without changing all the 
architecture, is to deal with it inside the RDD/RelationProvider: when it finds 
a big list to be pushed down (as long as unhandledFilters makes it disappear), 
it can just broadcast it and then use it. The only problem is that this might 
leave non-cleaned broadcast variables, unless there's a way (or someone 
implements one) to clean up such broadcasts after an action.*

Read the last comment for more info :)

As of now (AFAIK), users can only provide a list/sequence of elements which 
will be sent as part of the "In" predicate, which is part of the task, to the 
executors. So when this list is huge, it causes tasks to be "huge" (especially 
when the number of tasks is big).

I'm coming from 
https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
(I'm the creator of the post, and also of the 2nd answer). 

I know this is not common, but it can be useful for people reading from 
"servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
concrete case, querying Kudu with 200,000 elements in the isin takes ~2 minutes 
(+1 minute "calculating the plan" to send the BIG tasks to the executors) 
versus 30 minutes doing a full scan (even if semi-filtered, most of the data 
has the same nature and thus is hard to filter unless I use this "200,000" 
list) on my table.

More or less I have a clear view (explained on stackoverflow) of what to modify 
if I wanted to create my own "BroadcastIn" in 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
 however, with my limited "idea", anyone who implemented In pushdown would have 
to re-adapt it to the new "BroadcastIn".

I've looked at the Spark 3.0 sources and nothing has changed there, but I'm not 
100% sure whether there's an alternative there (I'm stuck on 2.4 anyway, 
working for a corporation which uses that version).

Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
expert in Scala syntax/syntactic sugar. *Does anyone have an idea of how to 
extend the current case class "In" while keeping compatibility with PushDown 
implementations (creating a new Predicate is not allowed, I think), but not 
having only Seq[Expression] in the case class, and also allowing 
Broadcast[Seq[Expression]]?* This is what makes the tasks huge; I made a 
test run with a predicate which receives a Broadcast variable and uses the 
value inside, and it works much better.
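For context, a minimal sketch of the two approaches being compared (the table 
name and the 200,000-element list are illustrative): a plain isin, where the 
whole list is embedded in the In predicate and shipped with every task, versus 
broadcasting the list and semi-joining against it.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, col}

val spark = SparkSession.builder().appName("isin-vs-broadcast").getOrCreate()
import spark.implicits._

val bigList: Seq[String] = (1 to 200000).map(i => s"key-$i")  // illustrative
val df = spark.read.table("my_table")                         // hypothetical table

// 1) Plain isin: the whole list becomes part of the In predicate, so it is
//    serialized into every task (this is what makes the tasks "huge").
val viaIsin = df.filter(col("id").isin(bigList: _*))

// 2) Workaround: broadcast the list as a DataFrame and semi-join against it;
//    only the broadcast variable travels to executors, not one copy per task.
val keys = bigList.toDF("id")
val viaJoin = df.join(broadcast(keys), Seq("id"), "left_semi")
{code}

Note that the join variant avoids the huge tasks but gives up the source-side 
pushdown that makes isin fast against Kudu, which is exactly the tension 
described in this ticket.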

  was:
*I would like some help to explore if the following feature makes sense and 
what's the best way to implement it. Allow users to use "isin" (or similar) 
predicates with huge Lists (which might be broadcasted).*

Abandoning, I think maybe the best option without changing all the architecture 
is to deal with it inside the RDD/RelationProvider, when it finds a big list to 
be pushed-down (as long as unhandledFilters makes it disappear), it can just 
broadcast it and then use it. Only problem is that this might leave non-cleaned 
broadcast variables

Read last comment for more info :)

As of now (AFAIK), users can only provide a list/sequence of elements which 
will be sent as part of the "In" predicate, which is part of the task, to the 
executors. So when this list is huge, this causes tasks to be "huge" (specially 
when the task number is big).

I'm coming from 
https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
(I'm the creator of the post, and also the 2nd answer). 

I know this is not common, but can be useful for people reading from "servers" 
which are not in the same nodes than Spark (maybe S3/cloud?). In my concrete 
case, querying Kudu with 200.000 elements in the isin takes ~2minutes (+1 
minute "calculating plan" to send the BIG tasks to the executors) versus 30 
minutes doing a full-scan (even if semi-filtered, most of the data has the same 
nature and thus is hard to filter, unless I use this "200.000" list) on my 
table.

More or less I have a clear view (explained in stackoverflow) on what to modify 
If I wanted to create my own "BroadcastIn" in 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
 however with my limited "idea", anyone who implemented In pushdown would have 
to re-adapt it to the new "BroadcastIn"

I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
for a corporation w

[jira] [Updated] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florentino Sainz updated SPARK-31417:
-
Description: 
*I would like some help exploring whether the following feature makes sense 
and what the best way to implement it would be: allow users to use "isin" (or 
similar) predicates with huge Lists (which might be broadcast).*

Abandoning. I think maybe the best option, without changing all the 
architecture, is to deal with it inside the RDD/RelationProvider: when it finds 
a big list to be pushed down (as long as unhandledFilters makes it disappear), 
it can just broadcast it and then use it. The only problem is that this might 
leave non-cleaned broadcast variables.

Read the last comment for more info :)

As of now (AFAIK), users can only provide a list/sequence of elements which 
will be sent as part of the "In" predicate, which is part of the task, to the 
executors. So when this list is huge, it causes tasks to be "huge" (especially 
when the number of tasks is big).

I'm coming from 
https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
(I'm the creator of the post, and also of the 2nd answer). 

I know this is not common, but it can be useful for people reading from 
"servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
concrete case, querying Kudu with 200,000 elements in the isin takes ~2 minutes 
(+1 minute "calculating the plan" to send the BIG tasks to the executors) 
versus 30 minutes doing a full scan (even if semi-filtered, most of the data 
has the same nature and thus is hard to filter unless I use this "200,000" 
list) on my table.

More or less I have a clear view (explained on stackoverflow) of what to modify 
if I wanted to create my own "BroadcastIn" in 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
 however, with my limited "idea", anyone who implemented In pushdown would have 
to re-adapt it to the new "BroadcastIn".

I've looked at the Spark 3.0 sources and nothing has changed there, but I'm not 
100% sure whether there's an alternative there (I'm stuck on 2.4 anyway, 
working for a corporation which uses that version).

Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
expert in Scala syntax/syntactic sugar. *Does anyone have an idea of how to 
extend the current case class "In" while keeping compatibility with PushDown 
implementations (creating a new Predicate is not allowed, I think), but not 
having only Seq[Expression] in the case class, and also allowing 
Broadcast[Seq[Expression]]?* This is what makes the tasks huge; I made a 
test run with a predicate which receives a Broadcast variable and uses the 
value inside, and it works much better.

  was:
*I would like some help to explore if the following feature makes sense and 
what's the best way to implement it. Allow users to use "isin" (or similar) 
predicates with huge Lists (which might be broadcasted).*


Read last comment :)

As of now (AFAIK), users can only provide a list/sequence of elements which 
will be sent as part of the "In" predicate, which is part of the task, to the 
executors. So when this list is huge, this causes tasks to be "huge" (specially 
when the task number is big).

I'm coming from 
https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
(I'm the creator of the post, and also the 2nd answer). 

I know this is not common, but can be useful for people reading from "servers" 
which are not in the same nodes than Spark (maybe S3/cloud?). In my concrete 
case, querying Kudu with 200.000 elements in the isin takes ~2minutes (+1 
minute "calculating plan" to send the BIG tasks to the executors) versus 30 
minutes doing a full-scan (even if semi-filtered, most of the data has the same 
nature and thus is hard to filter, unless I use this "200.000" list) on my 
table.

More or less I have a clear view (explained in stackoverflow) on what to modify 
If I wanted to create my own "BroadcastIn" in 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
 however with my limited "idea", anyone who implemented In pushdown would have 
to re-adapt it to the new "BroadcastIn"

I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
for a corporation which uses that version).

Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
current case class "In" while keeping the compatibility with PushDown 
implementations (aka creating a new Predicate not allowed I think), but not 
having only Seq[Expression] in the case class, but also allow 
Broadcast[Seq[Expression]]?* This is what makes the tasks 

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 9:26 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom Strategy 
which transforms the relations/scans before the provided one does (on Spark 
2.4). It seems that making this work for pushdown cases is not straightforward 
(without pushdown, task size does improve quite a lot). I "lost" half a day, 
but I did learn quite a bit/confirmed my assumptions about how Spark works on 
the inside :).

When the filter gets pushed down, the change I made is useless, basically 
because what gets pushed into the task is the pushed filter, aka Source.In.

In order to implement this without breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it is a 
broadcast variable, so that when they implement the pushdown they call 
".value" on the broadcast only once inside the executor (i.e., in the RDD 
implementation)...


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently can make them a 
little complex, basically delaying the creation of those filters in the 
underlying technology until the compute method is called...). Especially since 
I don't know whether this can be useful for anything apart from big 
user-provided "isins" (maybe some join-pushdown optimization I don't 
realize?).*

If anyone is interested in me implementing it, or just in the ideas/hacky way 
(it can be injected into Spark, but you have to copy quite a bit of code from 
one of Spark's Strategies and also reimplement the underlying relation which 
will push down the filter), just ping me.

*Anyway, I think maybe the best option is to deal with it inside the 
RDD/RelationProvider: when it finds a big list to be pushed down (as long as 
unhandledFilters makes it disappear), it can just broadcast it and then use it.*
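A rough sketch of that last idea, assuming a custom data source built on the 
DataSource V1 API (the relation class is hypothetical): the relation accepts 
the In filter in buildScan, claims it via unhandledFilters so it disappears 
from the plan, and broadcasts the value list instead of letting it ride inside 
every task.

{code:scala}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, In, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

// Hypothetical relation: a big In filter is broadcast once instead of being
// serialized into every task.
class MyBroadcastingRelation(
    override val sqlContext: SQLContext,
    override val schema: StructType)
  extends BaseRelation with PrunedFilteredScan {

  // Report In filters as handled, so Spark drops them from the plan and the
  // value list no longer travels inside each task ("makes it disappear").
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(_.isInstanceOf[In])

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    val broadcastIn = filters.collectFirst { case In(attr, values) =>
      attr -> sqlContext.sparkContext.broadcast(values)
    }
    // A real implementation would hand `broadcastIn` to its RDD and call
    // `.value` only inside compute(), on the executor. Elided here.
    sqlContext.sparkContext.emptyRDD[Row]
  }
}
{code}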




was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.


> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> Read last comment :)
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-br

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 9:26 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom Strategy 
which transforms the relations/scans before the provided one does (on Spark 
2.4). It seems that making this work for pushdown cases is not straightforward 
(without pushdown, task size does improve quite a lot). I "lost" half a day, 
but I did learn quite a bit/confirmed my assumptions about how Spark works on 
the inside :).

When the filter gets pushed down, the change I made is useless, basically 
because what gets pushed into the task is the pushed filter, aka Source.In.

In order to implement this without breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it is a 
broadcast variable, so that when they implement the pushdown they call 
".value" on the broadcast only once inside the executor (i.e., in the RDD 
implementation)...


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently can make them a 
little complex, basically delaying the creation of those filters in the 
underlying technology until the compute method is called...). Especially since 
I don't know whether this can be useful for anything apart from big 
user-provided "isins" (maybe some join-pushdown optimization I don't 
realize?).*

If anyone is interested in me implementing it, or just in the ideas/hacky way 
(it can be injected into Spark, but you have to copy quite a bit of code from 
one of Spark's Strategies and also reimplement the underlying relation which 
will push down the filter), just ping me.

*Anyway, I think maybe the best option without changing all the architecture 
is to deal with it inside the RDD/RelationProvider: when it finds a big list 
to be pushed down (as long as unhandledFilters makes it disappear), it can 
just broadcast it and then use it. The only problem is that this might leave 
non-cleaned broadcast variables.*




was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

*Anyways, I think maybe the best option is to deal with it inside the 
RDD/RelationProvider, when it finds a big list to be pushed-down (as long as 
unhandledFilters makes it disappear), it can just broadcast it and then use it.*



> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> Read last comment :)
> As of now (AFAI

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 9:00 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom Strategy 
which transforms the relations/scans before the provided one does (on Spark 
2.4). It seems that making this work for pushdown cases is not straightforward 
(without pushdown, task size does improve quite a lot). I "lost" half a day, 
but I did learn quite a bit/confirmed my assumptions about how Spark works on 
the inside :).

When the filter gets pushed down, the change I made is useless, basically 
because what gets pushed into the task is the pushed filter, aka Source.In.

In order to implement this without breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it is a 
broadcast variable, so that when they implement the pushdown they call 
".value" on the broadcast only once inside the executor (i.e., in the RDD 
implementation)...


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently can make them a 
little complex, basically delaying the creation of those filters in the 
underlying technology until the compute method is called...). Especially since 
I don't know whether this can be useful for anything apart from big 
user-provided "isins" (maybe some join-pushdown optimization I don't 
realize?).*

If anyone is interested in me implementing it, or just in the ideas/hacky way 
(it can be injected into Spark, but you have to copy quite a bit of code from 
one of Spark's Strategies and also reimplement the underlying relation which 
will push down the filter), just ping me.



was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> Read last comment :)
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task numbe

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:57 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom Strategy 
which transforms the relations/scans before the provided one does (on Spark 
2.4). It seems that making this work for pushdown cases is not straightforward 
(without pushdown, task size does improve quite a lot). I "lost" half a day, 
but I did learn quite a bit/confirmed my assumptions about how Spark works on 
the inside :).

When the filter gets pushed down, the change I made is useless, basically 
because what gets pushed into the task is the pushed filter, aka Source.In.

In order to implement this without breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it is a 
broadcast variable, so that when they implement the pushdown they call 
".value" on the broadcast only once inside the executor (i.e., in the RDD 
implementation)...


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently can make them a 
little complex, basically delaying the creation of those filters in the 
underlying technology until the compute method is called...). Especially since 
I don't know whether this can be useful for anything apart from big 
user-provided "isins" (maybe some join-pushdown optimization I don't 
realize?).*

If anyone is interested in me implementing it, or just in the ideas/hacky way 
(it can be injected into Spark, but you have to copy quite a bit of code from 
one of Spark's Strategies and also reimplement the underlying relation which 
will push down the filter), just ping me.

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"].


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates wit

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:56 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom Strategy 
which transforms the relations/scans before the provided one does (on Spark 
2.4). It seems that making this work for pushdown cases is not straightforward 
(without pushdown, task size does improve quite a lot). I "lost" half a day, 
but I did learn quite a bit/confirmed my assumptions about how Spark works on 
the inside :).

When the filter gets pushed down, the change I made is useless, basically 
because what gets pushed into the task is the pushed filter, aka Source.In.

In order to implement this without breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it is a 
broadcast variable, so that when they implement the pushdown they call 
".value" on the broadcast only once inside the executor (i.e., in the RDD 
implementation)...


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently can make them a 
little complex, basically delaying the creation of those filters in the 
underlying technology until the compute method is called...). Especially since 
I don't know whether this can be useful for anything apart from big 
user-provided "isins" (maybe some join-pushdown optimization I don't 
realize?).*

If anyone is interested in me implementing it, or just in the ideas/hacky way 
(it can be injected into Spark, but you have to copy quite a bit of code from 
one of Spark's Strategies and also reimplement the underlying relation which 
will push down the filter), just ping me.

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"].


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P ). I "lost" half a day, but did 
learn quite a bit/confirmed self-assumptions on how Spark works in the inside 
:).

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predica

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:55 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom Strategy 
which transforms the relations/scans before the provided one does (on Spark 
2.4). It seems that making this work for pushdown cases is not straightforward 
(without pushdown, task size does improve quite a lot :P). I "lost" half a day, 
but I did learn quite a bit/confirmed my assumptions about how Spark works on 
the inside :).

When the filter gets pushed down, the change I made is useless, basically 
because what gets pushed into the task is the pushed filter, aka Source.In.

In order to implement this without breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it is a 
broadcast variable, so that when they implement the pushdown they call 
".value" on the broadcast only once inside the executor (i.e., in the RDD 
implementation)...


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently can make them a 
little complex, basically delaying the creation of those filters in the 
underlying technology until the compute method is called...). Especially since 
I don't know whether this can be useful for anything apart from big 
user-provided "isins" (maybe some join-pushdown optimization I don't 
realize?).*

If anyone is interested in me implementing it, or just in the ideas/hacky way 
(it can be injected into Spark, but you have to copy quite a bit of code from 
one of Spark's Strategies and also reimplement the underlying relation which 
will push down the filter), just ping me.

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"].


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can make them a little complex, 
basically delaying the creation of those filters at the underlying technology 
until the compute method is called...). Specially, I don't know if this can be 
useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> Read last comment :)
> As of now (AFAIK), users can only 

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:54 PM:


Well... I just finished my PoC by adding a custom Predicate + a custom Strategy 
which transforms the relations/scans before the provided one does (on Spark 
2.4). It seems that making this work for pushdown cases is not straightforward 
(without pushdown, task size does improve quite a lot :P).

When the filter gets pushed down, the change I made is useless, basically 
because what gets pushed into the task is the pushed filter, aka Source.In.

In order to implement this without breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it is a 
broadcast variable, so that when they implement the pushdown they call 
".value" on the broadcast only once inside the executor (i.e., in the RDD 
implementation)...


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDDs/Relations to exploit it efficiently can make them a 
little complex, basically delaying the creation of those filters in the 
underlying technology until the compute method is called...). Especially since 
I don't know whether this can be useful for anything apart from big 
user-provided "isins" (maybe some join-pushdown optimization I don't 
realize?).*

If anyone is interested in me implementing it, or just in the ideas/hacky way 
(it can be injected into Spark, but you have to copy quite a bit of code from 
one of Spark's Strategies and also reimplement the underlying relation which 
will push down the filter), just ping me.

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with the left-join strategy/almost full scan, 
which takes about 10x the time in Kudu; the amount of data I get from that 
table fits in memory "perfectly"].


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and also reimplemented the underlying 
relation which will pushdown the filter) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> Read last comment :)
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 

[jira] [Updated] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florentino Sainz updated SPARK-31417:
-
Description: 
*I would like some help to explore if the following feature makes sense and 
what's the best way to implement it. Allow users to use "isin" (or similar) 
predicates with huge Lists (which might be broadcasted).*


Read last comment :)

As of now (AFAIK), users can only provide a list/sequence of elements which 
will be sent as part of the "In" predicate, which is part of the task, to the 
executors. So when this list is huge, this causes tasks to be "huge" (specially 
when the task number is big).

I'm coming from 
https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
(I'm the creator of the post, and also the 2nd answer). 

I know this is not common, but can be useful for people reading from "servers" 
which are not in the same nodes than Spark (maybe S3/cloud?). In my concrete 
case, querying Kudu with 200.000 elements in the isin takes ~2minutes (+1 
minute "calculating plan" to send the BIG tasks to the executors) versus 30 
minutes doing a full-scan (even if semi-filtered, most of the data has the same 
nature and thus is hard to filter, unless I use this "200.000" list) on my 
table.

More or less I have a clear view (explained in stackoverflow) on what to modify 
If I wanted to create my own "BroadcastIn" in 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
 however with my limited "idea", anyone who implemented In pushdown would have 
to re-adapt it to the new "BroadcastIn"

I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
for a corporation which uses that version).

Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
expert in Scala syntax/syntactic sugar. *Does anyone have an idea of how to 
extend the current case class "In" while keeping compatibility with PushDown 
implementations (creating a new Predicate is not allowed, I think), so that the 
case class accepts not only Seq[Expression] but also 
Broadcast[Seq[Expression]]?* The Seq is what makes the tasks huge; I made a 
test run with a predicate which receives a Broadcast variable and uses the 
value inside, and it works much better.
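For illustration, a user-level sketch of that test run (the DataFrame and the 
key list below are toy stand-ins; a plain UDF like this keeps the task small 
but, unlike source.In, it cannot be pushed down to the source):
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder.appName("broadcast-isin-sketch").getOrCreate()
val df = spark.range(0, 1000000).selectExpr("cast(id as string) as id")
val hugeList: Seq[String] = (0 until 200000).map(_.toString)

// Plain isin: every element ends up inside the In predicate, which is
// serialized with each task.
val withIsin = df.filter(col("id").isin(hugeList: _*))

// Broadcast variant: only the broadcast handle travels with the task; the
// value is fetched once per executor and used inside the predicate.
val keys = spark.sparkContext.broadcast(hugeList.toSet)
val inBroadcast = udf((id: String) => keys.value.contains(id))
val withBroadcast = df.filter(inBroadcast(col("id")))
{code}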

  was:
Read last comment :)

*I would like some help to explore if the following feature makes sense and 
what's the best way to implement it. Allow users to use "isin" (or similar) 
predicates with huge Lists (which might be broadcasted).*

As of now (AFAIK), users can only provide a list/sequence of elements which 
will be sent as part of the "In" predicate, which is part of the task, to the 
executors. So when this list is huge, this causes tasks to be "huge" (specially 
when the task number is big).

I'm coming from 
https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
(I'm the creator of the post, and also the 2nd answer). 

I know this is not common, but can be useful for people reading from "servers" 
which are not in the same nodes than Spark (maybe S3/cloud?). In my concrete 
case, querying Kudu with 200.000 elements in the isin takes ~2minutes (+1 
minute "calculating plan" to send the BIG tasks to the executors) versus 30 
minutes doing a full-scan (even if semi-filtered, most of the data has the same 
nature and thus is hard to filter, unless I use this "200.000" list) on my 
table.

More or less I have a clear view (explained in stackoverflow) on what to modify 
If I wanted to create my own "BroadcastIn" in 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
 however with my limited "idea", anyone who implemented In pushdown would have 
to re-adapt it to the new "BroadcastIn"

I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
for a corporation which uses that version).

Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
current case class "In" while keeping the compatibility with PushDown 
implementations (aka creating a new Predicate not allowed I think), but not 
having only Seq[Expression] in the case class, but also allow 
Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
test-run with a predicate which receives and Broadcast variable and uses the 
value inside, and it works much better.


> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
>   

[jira] [Resolved] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florentino Sainz resolved SPARK-31417.
--
Resolution: Abandoned

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (especially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but it can be useful for people reading from 
> "servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2 minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) of what to 
> modify if I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn".
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar. *Does anyone have an idea of how to 
> extend the current case class "In" while keeping compatibility with PushDown 
> implementations (creating a new Predicate is not allowed, I think), so that 
> the case class accepts not only Seq[Expression] but also 
> Broadcast[Seq[Expression]]?* The Seq is what makes the tasks huge; I made a 
> test run with a predicate which receives a Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florentino Sainz updated SPARK-31417:
-
Description: 
Read last comment :)

*I would like some help to explore if the following feature makes sense and 
what's the best way to implement it. Allow users to use "isin" (or similar) 
predicates with huge Lists (which might be broadcasted).*

As of now (AFAIK), users can only provide a list/sequence of elements which 
will be sent as part of the "In" predicate, which is part of the task, to the 
executors. So when this list is huge, this causes tasks to be "huge" 
(especially when the task number is big).

I'm coming from 
https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
(I'm the creator of the post, and also the 2nd answer). 

I know this is not common, but it can be useful for people reading from 
"servers" which are not on the same nodes as Spark (maybe S3/cloud?). In my 
concrete case, querying Kudu with 200.000 elements in the isin takes ~2 minutes 
(+1 minute "calculating plan" to send the BIG tasks to the executors) versus 30 
minutes doing a full scan (even if semi-filtered, most of the data has the same 
nature and thus is hard to filter, unless I use this "200.000" list) on my 
table.

More or less I have a clear view (explained in stackoverflow) of what to modify 
if I wanted to create my own "BroadcastIn" in 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
however with my limited "idea", anyone who implemented In pushdown would have 
to re-adapt it to the new "BroadcastIn".

I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
for a corporation which uses that version).

Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
expert in Scala syntax/syntactic sugar. *Does anyone have an idea of how to 
extend the current case class "In" while keeping compatibility with PushDown 
implementations (creating a new Predicate is not allowed, I think), so that the 
case class accepts not only Seq[Expression] but also 
Broadcast[Seq[Expression]]?* The Seq is what makes the tasks huge; I made a 
test run with a predicate which receives a Broadcast variable and uses the 
value inside, and it works much better.

  was:
*I would like some help to explore if the following feature makes sense and 
what's the best way to implement it. Allow users to use "isin" (or similar) 
predicates with huge Lists (which might be broadcasted).*

As of now (AFAIK), users can only provide a list/sequence of elements which 
will be sent as part of the "In" predicate, which is part of the task, to the 
executors. So when this list is huge, this causes tasks to be "huge" (specially 
when the task number is big).

I'm coming from 
https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
(I'm the creator of the post, and also the 2nd answer). 

I know this is not common, but can be useful for people reading from "servers" 
which are not in the same nodes than Spark (maybe S3/cloud?). In my concrete 
case, querying Kudu with 200.000 elements in the isin takes ~2minutes (+1 
minute "calculating plan" to send the BIG tasks to the executors) versus 30 
minutes doing a full-scan (even if semi-filtered, most of the data has the same 
nature and thus is hard to filter, unless I use this "200.000" list) on my 
table.

More or less I have a clear view (explained in stackoverflow) on what to modify 
If I wanted to create my own "BroadcastIn" in 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
 however with my limited "idea", anyone who implemented In pushdown would have 
to re-adapt it to the new "BroadcastIn"

I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
for a corporation which uses that version).

Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
current case class "In" while keeping the compatibility with PushDown 
implementations (aka creating a new Predicate not allowed I think), but not 
having only Seq[Expression] in the case class, but also allow 
Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
test-run with a predicate which receives and Broadcast variable and uses the 
value inside, and it works much better.


> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
> 

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:45 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ Spark 
2.4). It seems that making this work for the pushdown cases is the hard part 
(without pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it's a 
broadcasted variable, so when they implement the pushdown they actually call 
the ".value" of the broadcast only when inside the executor (aka in the RDD 
implementation)... 
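Purely as a sketch of that idea (this BroadcastedIn does not exist in Spark, 
and no built-in relation would recognize it; the names are made up):
{code:scala}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.sources.Filter

// Hypothetical pushdown filter carrying a broadcast handle instead of the
// materialized value array. A relation that opted in to it would call
// values.value only inside its RDD code, i.e. on the executor, so the task
// binary stays small.
case class BroadcastedIn(attribute: String, values: Broadcast[Set[Any]])
  extends Filter {
  override def references: Array[String] = Array(attribute)
}
{code}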


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDD/Relations to exploit it efficiently can be a little hard, 
basically delaying the creation of those filters in the underlying technology 
until the compute method is called...). In particular, I don't know if this can 
be useful for something apart from big user-provided "isins" (maybe some 
join-pushdown optimization I don't realize?)*

If anyone is interested in me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code from one of Spark's Strategies and reimplement the relation), just 
ping me.

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with a leftjoin strategy/almost full scan, which 
takes like 10x the time in Kudu; the amount of data I get from that table fits 
in memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code and reimplemented the relation) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackove

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:45 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ Spark 
2.4). It seems that making this work for the pushdown cases is the hard part 
(without pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.
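For context, this is roughly the shape of the pushdown path being discussed 
(ExampleRelation is just a placeholder): the relation receives sources.In with 
the values already materialized, and whatever it builds from them travels 
inside the tasks of the scan RDD it returns.
{code:scala}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.{BaseRelation, Filter, In, PrunedFilteredScan}

abstract class ExampleRelation extends BaseRelation with PrunedFilteredScan {
  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    filters.foreach {
      case In(attribute, values) =>
        // `values` already holds every element of the isin list at this point.
        println(s"pushed In($attribute) with ${values.length} values")
      case _ => // other pushed filters
    }
    ??? // build the source-specific scan RDD here
  }
}
{code}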

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it's a 
broadcasted variable, so when they implement the pushdown they actually call 
the ".value" of the broadcast only when inside the executor (aka in the RDD 
implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDD/Relations to exploit it efficiently can be a little hard, 
basically delaying the creation of those filters in the underlying technology 
until the compute method is called...). In particular, I don't know if this can 
be useful for something apart from big user-provided "isins" (maybe some 
join-pushdown optimization I don't realize?)*

If anyone is interested in me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code from one of Spark's Strategies and also reimplement the underlying 
relation which will push down the filter), just ping me.
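The injection route looks roughly like this (BroadcastInScanStrategy and the 
commented-out match are placeholders; the real body is where all the copied 
planning code would have to go):
{code:scala}
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A custom strategy registered through the experimental API is consulted
// before the built-in strategies when planning the logical plan.
object BroadcastInScanStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // case Filter(condition, relation) if hasBroadcastedIn(condition) =>
    //   ... re-implemented scan that resolves the broadcast on the executor ...
    case _ => Nil
  }
}

val spark = SparkSession.builder.getOrCreate()
spark.experimental.extraStrategies = Seq(BroadcastInScanStrategy)
{code}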

PS: In my software I just improved the behaviour with a coalesce(X) so there 
are fewer big tasks around :) [after pushing down the filter when the size is 
suitable; otherwise I just go with a leftjoin strategy/almost full scan, which 
takes like 10x the time in Kudu; the amount of data I get from that table fits 
in memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code of one Spark's Strategy and reimplemented the relation) just ping 
me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (speci

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:45 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it's a 
broadcasted variable, so when they implement the pushdown they actually call 
the ".value" of the broadcast only when inside the executor (aka in the RDD 
implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDD/Relations to exploit it efficiently can be a little hard, 
basically delaying the creation of those filters in the underlying technology 
until the compute method is called...). In particular, I don't know if this can 
be useful for something apart from big user-provided "isins" (maybe some 
join-pushdown optimization I don't realize?)*
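A minimal sketch of the "only touch the broadcast value inside compute" part, 
with made-up names (a real version would build the source-specific scanner 
here instead of filtering a parent RDD):
{code:scala}
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// Only the Broadcast handle is part of the RDD's serialized state; the key set
// is dereferenced in compute(), i.e. on the executor, so tasks stay small.
class BroadcastFilteredRDD(parent: RDD[String], keys: Broadcast[Set[String]])
  extends RDD[String](parent) {

  override protected def getPartitions: Array[Partition] = parent.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val wanted = keys.value // resolved lazily, at task execution time
    parent.iterator(split, context).filter(wanted.contains)
  }
}
{code}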

If anyone is interested on me implementing it, or just providing the 
ideas/hacky way (it can be injected into Spark, but you have to copy quite a 
bit of code and reimplemented the relation) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the ideas (it 
can be injected into Spark, but you have to copy quite a bit of code and 
reimplemented the relation) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:44 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it's a 
broadcasted variable, so when they implement the pushdown they actually call 
the ".value" of the broadcast only when inside the executor (aka in the RDD 
implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDD/Relations to exploit it efficiently can be a little hard, 
basically delaying the creation of those filters in the underlying technology 
until the compute method is called...). In particular, I don't know if this can 
be useful for something apart from big user-provided "isins" (maybe some 
join-pushdown optimization I don't realize?)*

If anyone is interested on me implementing it, or just providing the ideas (it 
can be injected into Spark, but you have to copy quite a bit of code and 
reimplemented the relation) just ping me.

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard, basically 
delaying the creation of those filters at the underlying technology until the 
compute method is called...). Specially, I don't know if this can be useful for 
something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?)*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than 

[jira] [Updated] (SPARK-31446) Make html elements for a paged table possible to have different id attribute.

2020-04-17 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-31446:
--
Fix Version/s: (was: 3.1.0)
   3.0.0

> Make html elements for a paged table possible to have different id attribute.
> -
>
> Key: SPARK-31446
> URL: https://issues.apache.org/jira/browse/SPARK-31446
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 3.0.0, 3.1.0
>Reporter: Kousuke Saruta
>Assignee: Kousuke Saruta
>Priority: Minor
> Fix For: 3.0.0
>
>
> Some pages have a paged table and page navigations above / below the table.
> But the corresponding HTML elements between the two page navigations for a 
> table have the same id attribute. Every id attribute should be unique.
> For example, there are two `form-completedJob-table-page` ids in JobsPage.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:42 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it's a 
broadcasted variable, so when they implement the pushdown they actually call 
the ".value" of the broadcast only when inside the executor (aka in the RDD 
implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDD/Relations to exploit it efficiently can be a little hard, 
basically delaying the creation of those filters in the underlying technology 
until the compute method is called...). In particular, I don't know if this can 
be useful for something apart from big user-provided "isins" (maybe some 
join-pushdown optimization I don't realize?)*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently can be a little hard). 
Specially, I don't know if this can be useful for something apart from big 
user-provided "isins" (maybe some join-pushdown-optimization I don't realize?)*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter,

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:42 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it's a 
broadcasted variable, so when they implement the pushdown they actually call 
the ".value" of the broadcast only when inside the executor (aka in the RDD 
implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDD/Relations to exploit it efficiently can be a little hard). 
In particular, I don't know if this can be useful for something apart from big 
user-provided "isins" (maybe some join-pushdown optimization I don't realize?)*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode it efficiently and only access data at compute 
method looks hard). Specially, I don't know if this can be useful for something 
apart from big user-provided "isins" (maybe some join-pushdown-optimization I 
don't realize?)*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (ex

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:41 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it's a 
broadcasted variable, so when they implement the pushdown they actually call 
the ".value" of the broadcast only when inside the executor (aka in the RDD 
implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDD/Relations to exploit it efficiently, and only accessing 
the data in the compute method, looks hard). In particular, I don't know if 
this can be useful for something apart from big user-provided "isins" (maybe 
some join-pushdown optimization I don't realize?)*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode 
 it efficiently and only access data at compute method looks hard). Specially, 
I don't know if this can be useful for something apart from big user-provided 
"isins" (maybe some join-pushdown-optimization I don't realize?)*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:41 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it's a 
broadcasted variable, so when they implement the pushdown they actually call 
the ".value" of the broadcast only when inside the executor (aka in the RDD 
implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDD/Relations to exploit it efficiently, and only accessing 
the data in the compute method, looks hard). In particular, I don't know if 
this can be useful for something apart from big user-provided "isins" (maybe 
some join-pushdown optimization I don't realize?)*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode 
 it efficiently and only access data at compute method looks hard). Specially, 
I don't know if this can be useful for something apart from big user-provided 
"isins" (maybe some join-pushdown-optimization I don't realize?)*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More 

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:40 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?), and those who use it would have to know it's a 
broadcasted variable, so when they implement the pushdown they actually call 
the ".value" of the broadcast only when inside the executor (aka in the RDD 
implementation)... 


*For now I'm abandoning it. I know how to implement it, but I think the change 
is quite big and I'm not sure it's worth it (adding a new Source and also 
adapting all the RDD/Relations to exploit it efficiently, and only accessing 
the data in the compute method, looks hard). In particular, I don't know if 
this can be useful for something apart from big user-provided "isins" (maybe 
some join-pushdown optimization I don't realize?) What do you guys think?*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have know its a broadcasted 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relation which doesn't seem so easy so the only place where value 
is already accessed is in the compute method). Specially, I don't know if this 
can be useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?) What you guys think?*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard 

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:40 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode 
 it efficiently and only access data at compute method looks hard). Especially, 
I don't know if this can be useful for something apart from big user-provided 
"isins" (maybe some join-pushdown-optimization I don't realize?)*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relations to explode 
 it efficiently and only access data at compute method looks hard). Specially, 
I don't know if this can be useful for something apart from big user-provided 
"isins" (maybe some join-pushdown-optimization I don't realize?) What you guys 
think?*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" lis

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:39 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


*For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and also adapting 
all the RDD/Relation which doesn't seem so easy so the only place where value 
is already accessed is in the compute method). Specially, I don't know if this 
can be useful for something apart from big user-provided "isins" (maybe some 
join-pushdown-optimization I don't realize?) What you guys think?*

PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). 
Specially, I don't know if this can be useful for something apart from big 
user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) 
What you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in st

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:15 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). 
Specially, I don't know if this can be useful for something apart from big 
user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) 
What you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/fullscan, which takes like 10x 
the time, the amount of data I get from that table fits in memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). 
Specially, I don't know if this can be useful for something apart from big 
user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) 
What you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, the amount of data I get from that table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:15 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). 
Specially, I don't know if this can be useful for something apart from big 
user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) 
What you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/almost fullscan, which takes 
like 10x the time in Kudu, the amount of data I get from that table fits in 
memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). 
Specially, I don't know if this can be useful for something apart from big 
user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) 
What you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, otherwise I just go leftjoin strategy/fullscan, which takes like 10x 
the time, the amount of data I get from that table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/sc

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:12 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). 
Specially, I don't know if this can be useful for something apart from big 
user-provided "isins" (maybe some join-pushdown-optimization I don't realize?) 
What you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, the amount of data I get from that table fits in memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). What 
you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, the amount of data I get from that table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 any

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:11 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter, aka Source.In.

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast when inside the executor (aka in the RDD implementation)... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). What 
you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, the amount of data I get from that table fits in memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). What 
you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, the amount of data I get from that table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyo

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:10 PM:


Well... just finished my PoC by adding a custom Predicate + a custom Strategy 
which will transform the relations/scans before the provided one does (@ spark 
2.4). It seems that making this work (at least for pushdown cases, without 
pushdown, task size does improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). What 
you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, the amount of data I get from that table fits in memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Strategy which will transform 
the relations/scans before the provided one does (@ spark 2.4). It seems that 
making this work (at least for pushdown cases, without pushdown, task size does 
improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). What 
you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, the amount of data I get from that table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case clas

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:09 PM:


Well... just finished my PoC by adding a custom Strategy which will transform 
the relations/scans before the provided one does (@ spark 2.4). It seems that 
making this work (at least for pushdown cases, without pushdown, task size does 
improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). What 
you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :)


was (Author: fsainz):
Well... just finished my PoC by adding a custom Strategy which will transform 
the relations/scans before the provided one does (@ spark 2.4). It seems that 
making this work (at least for pushdown cases, without pushdown, task size does 
improve quite a bit :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast... 


> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives and Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:09 PM:


Well... just finished my PoC by adding a custom Strategy which will transform 
the relations/scans before the provided one does (@ spark 2.4). It seems that 
making this work (at least for pushdown cases, without pushdown, task size does 
improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). What 
you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter, the amount of data 
I get from that table fits in memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Strategy which will transform 
the relations/scans before the provided one does (@ spark 2.4). It seems that 
making this work (at least for pushdown cases, without pushdown, task size does 
improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). What 
you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :)

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case 

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 8:09 PM:


Well... just finished my PoC by adding a custom Strategy which will transform 
the relations/scans before the provided one does (@ spark 2.4). It seems that 
making this work (at least for pushdown cases, without pushdown, task size does 
improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). What 
you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter when the size is 
suitable, the amount of data I get from that table fits in memory "perfectly"]


was (Author: fsainz):
Well... just finished my PoC by adding a custom Strategy which will transform 
the relations/scans before the provided one does (@ spark 2.4). It seems that 
making this work (at least for pushdown cases, without pushdown, task size does 
improve quite a lot :P )

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast... 


For now I'm abandoning it, I know how to implement it, but I think the change 
is quite big and not sure if it's worth it (adding new Source and all). What 
you guys think?


PD: In my software I just improved the behaviour with a coalesce(X) so there 
are less big tasks around :) [after pushing down the filter, the amount of data 
I get from that table fits in memory "perfectly"]

> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with Pus

[jira] [Commented] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086039#comment-17086039
 ] 

Florentino Sainz commented on SPARK-31417:
--

Well... just finished my PoC by adding a custom Strategy which will transform 
the relations/scans before the provided one does (@ spark 2.4). It seems that 
making this work (at least for the pushdown cases) is not easy; without 
pushdown, task size does improve quite a bit :P

When the filter gets pushed down, the change I did is useless, basically 
because what gets pushed in the task is the pushed filter (which is "saved" 
inside the provider).

In order to implement this, unless breaking backwards compatibility with 
current BaseRelations, Spark would need another source.In 
(source.BroadcastedIn?) and those who use it have to know it's a broadcast 
variable, so when they implement the PushDown, they actually call the ".value" 
of the broadcast... 


> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives and Broadcast variable and uses the 
> value inside, and it works much better.
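
To make the last point of the description concrete, here is a minimal 
user-side sketch of that broadcast test-run (column name, path and list 
contents are made up): broadcast the list once and check membership in a UDF, 
which keeps tasks small but gives up source-side pushdown.

{code}
// Illustrative workaround, not the proposed Spark change: keep the big list
// out of every task by broadcasting it and referencing it from a UDF.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().appName("broadcast-isin-sketch").getOrCreate()

val bigList: Seq[String] = (1 to 200000).map(i => s"key_$i")  // stand-in ids
val broadcastKeys = spark.sparkContext.broadcast(bigList.toSet)

val isInBroadcast = udf((k: String) => broadcastKeys.value.contains(k))

val df = spark.read.parquet("/path/to/table")        // hypothetical source/path
val filtered = df.filter(isInBroadcast(col("key")))  // small tasks, no pushdown
{code}

A coalesce(X) on the result, as mentioned in the comments above, then keeps 
the number and size of tasks manageable.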



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27249) Developers API for Transformers beyond UnaryTransformer

2020-04-17 Thread Nick Afshartous (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086029#comment-17086029
 ] 

Nick Afshartous edited comment on SPARK-27249 at 4/17/20, 7:54 PM:
---

[~enrush] Hi Everett,

Using the {{Iterator}} approach would seem to deviate from the existing 
{{Transformer}} contract.  More specifically, the {{transform}} function should 
output a new {{DataFrame}} object.  What I would propose is adding a new 
function {{Transformer.compose}} to allow the composition of {{Transformers}}.  

{code}
  def compose(other: Transformer): Transformer = {
    new Transformer {
      override def transform(dataset: Dataset[_]): DataFrame = {
        other.transform(this.transform(dataset));
      }
      ...
{code}

Then one could {{compose}} {{Transformers}} which effectively would enable 
multi-column transformations.  

{code}
 val dataFrame = ...
 val transformers = List(transformer1, transformer2, transformer3)
 val multiColumnTransformer = transformers.reduce((x, y) => x.compose(y))

 multiColumnTransformer.transform(dataFrame)
{code}

I'd be happy to submit a PR if this meets your requirements.


was (Author: nafshartous):
[~enrush] Hi Everett,

Using the {{Iterator}} approach would seem to deviate from the existing 
{{Transformer}} contract.  More specifically, the {{transform}} function should 
output a new {{DataFrame}} object.  What I would propose is adding a new 
function {{Transformer.compose}} to allow the composition of {{Transformers}}.  

{code}
  def compose(other: Transformer): Transformer = {
    new Transformer {
      override def transform(dataset: Dataset[_]): DataFrame = {
        other.transform(this.transform(dataset));
      }
      ...
{code}

Then one could {{compose}} {{Transformers}} which effectively would enable 
multi-column transformations.  

{code}
 val dataFrame = ...
 val transformers = List(transformer1, transformer2, transformer3)
 val multiColumnTransformer = transformers.reduce((x, y) => x.compose(y))

multiColumnTransformer.transform(dataFrame)
{code}

I'd be happy to submit a PR if this meets your requirements.

> Developers API for Transformers beyond UnaryTransformer
> ---
>
> Key: SPARK-27249
> URL: https://issues.apache.org/jira/browse/SPARK-27249
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Affects Versions: 3.1.0
>Reporter: Everett Rush
>Priority: Minor
>  Labels: starter
> Attachments: Screen Shot 2020-01-17 at 4.20.57 PM.png
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> It would be nice to have a developers' API for dataset transformations that 
> need more than one column from a row (ie UnaryTransformer inputs one column 
> and outputs one column) or that contain objects too expensive to initialize 
> repeatedly in a UDF such as a database connection. 
>  
> Design:
> Abstract class PartitionTransformer extends Transformer and defines the 
> partition transformation function as Iterator[Row] => Iterator[Row]
> NB: This parallels the UnaryTransformer createTransformFunc method
>  
> When developers subclass this transformer, they can provide their own schema 
> for the output Row in which case the PartitionTransformer creates a row 
> encoder and executes the transformation. Alternatively the developer can set 
> output Datatype and output col name. Then the PartitionTransformer class will 
> create a new schema, a row encoder, and execute the transformation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27249) Developers API for Transformers beyond UnaryTransformer

2020-04-17 Thread Nick Afshartous (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086029#comment-17086029
 ] 

Nick Afshartous commented on SPARK-27249:
-

[~enrush] Hi Everett,

Using the {{Iterator}} approach would seem to deviate from the existing 
{{Transformer}} contract.  More specifically, the {{transform}} function should 
output a new {{DataFrame}} object.  What I would propose is adding a new 
function {{Transformer.compose}} to allow the composition of {{Transformers}}.  

{code}
  def compose(other: Transformer): Transformer = {
    new Transformer {
      override def transform(dataset: Dataset[_]): DataFrame = {
        other.transform(this.transform(dataset));
      }
      ...
{code}

Then one could {{compose}} {{Transformers}} which effectively would enable 
multi-column transformations.  

{code}
 val dataFrame = ...
 val transformers = List(transformer1, transformer2, transformer3)
 val multiColumnTransformer = transformers.reduce((x, y) => x.compose(y))

multiColumnTransformer.transform(dataFrame)
{code}

I'd be happy to submit a PR if this meets your requirements.
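
As a concrete illustration, here is a fuller, self-contained sketch of such a {{compose}} helper. To be clear, this is a sketch of the proposal and not existing Spark API; the {{uid}}, {{transformSchema}} and {{copy}} members are filled in with minimal placeholder implementations only so the snippet compiles on its own.

{code}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

object TransformerCompose {

  // Hypothetical helper (not part of the current ML API): chains two
  // Transformers so the output of `first` becomes the input of `second`.
  def compose(first: Transformer, second: Transformer): Transformer = new Transformer {
    override val uid: String = Identifiable.randomUID("composedTransformer")

    override def transform(dataset: Dataset[_]): DataFrame =
      second.transform(first.transform(dataset))

    override def transformSchema(schema: StructType): StructType =
      second.transformSchema(first.transformSchema(schema))

    override def copy(extra: ParamMap): Transformer = this
  }
}

// Usage, as in the example above:
// val multiColumnTransformer =
//   transformers.reduce((x, y) => TransformerCompose.compose(x, y))
{code}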

> Developers API for Transformers beyond UnaryTransformer
> ---
>
> Key: SPARK-27249
> URL: https://issues.apache.org/jira/browse/SPARK-27249
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Affects Versions: 3.1.0
>Reporter: Everett Rush
>Priority: Minor
>  Labels: starter
> Attachments: Screen Shot 2020-01-17 at 4.20.57 PM.png
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> It would be nice to have a developers' API for dataset transformations that 
> need more than one column from a row (ie UnaryTransformer inputs one column 
> and outputs one column) or that contain objects too expensive to initialize 
> repeatedly in a UDF such as a database connection. 
>  
> Design:
> Abstract class PartitionTransformer extends Transformer and defines the 
> partition transformation function as Iterator[Row] => Iterator[Row]
> NB: This parallels the UnaryTransformer createTransformFunc method
>  
> When developers subclass this transformer, they can provide their own schema 
> for the output Row in which case the PartitionTransformer creates a row 
> encoder and executes the transformation. Alternatively the developer can set 
> output Datatype and output col name. Then the PartitionTransformer class will 
> create a new schema, a row encoder, and execute the transformation.
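
To make the description above concrete, here is a minimal Scala sketch of the proposed PartitionTransformer. The member names ({{outputSchema}}, {{createPartitionFunc}}) are only illustrative and the {{uid}}/{{copy}} handling is a placeholder; this is a sketch of the design, not a final API.

{code}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

// Sketch only: a Transformer that maps whole partitions, so expensive resources
// (e.g. a database connection) can be opened once per partition instead of per row.
abstract class PartitionTransformer extends Transformer {

  override val uid: String = Identifiable.randomUID("partitionTransformer")

  // Schema of the rows produced by the partition function (supplied by the subclass).
  def outputSchema: StructType

  // Analogous to UnaryTransformer.createTransformFunc, but over an entire partition.
  def createPartitionFunc: Iterator[Row] => Iterator[Row]

  override def transform(dataset: Dataset[_]): DataFrame = {
    implicit val enc: Encoder[Row] = RowEncoder(outputSchema)
    dataset.toDF().mapPartitions(createPartitionFunc)
  }

  override def transformSchema(schema: StructType): StructType = outputSchema

  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
}
{code}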



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-31475) Broadcast stage in AQE did not timeout

2020-04-17 Thread Wei Xue (Jira)
Wei Xue created SPARK-31475:
---

 Summary: Broadcast stage in AQE did not timeout
 Key: SPARK-31475
 URL: https://issues.apache.org/jira/browse/SPARK-31475
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Wei Xue






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31474) Consistancy between dayofweek/dow in extract expression and dayofweek function

2020-04-17 Thread Kent Yao (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kent Yao updated SPARK-31474:
-
Summary: Consistancy between dayofweek/dow in extract expression and 
dayofweek function  (was: Consistancy betwwen dayofweek/dow in extract 
expression and dayofweek function)

> Consistancy between dayofweek/dow in extract expression and dayofweek function
> --
>
> Key: SPARK-31474
> URL: https://issues.apache.org/jira/browse/SPARK-31474
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Kent Yao
>Priority: Major
>
> {code:sql}
> spark-sql> SELECT extract(dayofweek from '2009-07-26');
> 1
> spark-sql> SELECT extract(dow from '2009-07-26');
> 0
> spark-sql> SELECT extract(isodow from '2009-07-26');
> 7
> spark-sql> SELECT dayofweek('2009-07-26');
> 1
> spark-sql> SELECT weekday('2009-07-26');
> 6
> {code}
> Currently, there are 4 different day-of-week ranges:
> the function dayofweek (2.3.0) and extracting dayofweek (2.4.0) range from
> Sunday(1) to Saturday(7)
> extracting dow (3.0.0) ranges from Sunday(0) to Saturday(6)
> extracting isodow (3.0.0) ranges from Monday(1) to Sunday(7)
> the function weekday (2.4.0) ranges from Monday(0) to Sunday(6)
> Actually, extracting dayofweek and dow are both derived from PostgreSQL but 
> have different meanings.
> https://issues.apache.org/jira/browse/SPARK-23903
> https://issues.apache.org/jira/browse/SPARK-28623



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-31474) Consistancy betwwen dayofweek/dow in extract expression and dayofweek function

2020-04-17 Thread Kent Yao (Jira)
Kent Yao created SPARK-31474:


 Summary: Consistancy betwwen dayofweek/dow in extract expression 
and dayofweek function
 Key: SPARK-31474
 URL: https://issues.apache.org/jira/browse/SPARK-31474
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Kent Yao



{code:sql}
spark-sql> SELECT extract(dayofweek from '2009-07-26');
1
spark-sql> SELECT extract(dow from '2009-07-26');
0
spark-sql> SELECT extract(isodow from '2009-07-26');
7
spark-sql> SELECT dayofweek('2009-07-26');
1
spark-sql> SELECT weekday('2009-07-26');
6
{code}
Currently, there are 4 different day-of-week ranges:
the function dayofweek (2.3.0) and extracting dayofweek (2.4.0) range from
Sunday(1) to Saturday(7)
extracting dow (3.0.0) ranges from Sunday(0) to Saturday(6)
extracting isodow (3.0.0) ranges from Monday(1) to Sunday(7)
the function weekday (2.4.0) ranges from Monday(0) to Sunday(6)

Actually, extracting dayofweek and dow are both derived from PostgreSQL but 
have different meanings.
https://issues.apache.org/jira/browse/SPARK-23903
https://issues.apache.org/jira/browse/SPARK-28623
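
The same four behaviours can also be checked from the DataFrame API; a small sketch for reference (only dayofweek has a dedicated Scala function, so the other forms go through expr):

{code}
// Paste into spark-shell (3.0.0): `spark` is the session provided by the shell.
import org.apache.spark.sql.functions.{col, dayofweek, expr, lit, to_date}

val df = spark.range(1).select(to_date(lit("2009-07-26")).as("d"))

df.select(
  dayofweek(col("d")).as("dayofweek"),          // 1 = Sunday .. 7 = Saturday
  expr("extract(dow from d)").as("dow"),        // 0 = Sunday .. 6 = Saturday
  expr("extract(isodow from d)").as("isodow"),  // 1 = Monday .. 7 = Sunday
  expr("weekday(d)").as("weekday")              // 0 = Monday .. 6 = Sunday
).show()
{code}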






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:59 PM:


I'm going ahead with something
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). I just
realized that the In in the pushdown predicates is not the one exposed in the
public predicates, but a post-translation, so adding a new class is not a drama
:) (unless this post-translation is also transferred from the driver to the
executors, which is what I'm trying to check now). I'm basing it on InSet
(actually delegating everything to the InSet implementation so as not to
duplicate code) and adding a new method to Column, isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get the
values for the pushdown, at least if I want to reuse the current In pushdown)


I just confirmed that all the plans execute in the driver, as expected (this is
the first time I have looked "deeply" inside the Spark sources). Maybe the list
can be passed as a function returning a list instead of a plain list (so Scala
serializes just the function with the broadcast variable instead of the whole
list). The API would stay compatible I think, but that forces not using case
classes and 99% breaks binary compatibility :( I'm going to try a few things
before continuing.
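
For context, a minimal sketch of the kind of workaround available today, which this ticket tries to improve on: broadcast the big collection once and filter through a UDF, so each task serializes only the closure and a broadcast reference instead of a literal list inside an In predicate. The names below are illustrative; the obvious drawback is that a UDF filter cannot be pushed down to the source, which is the pushdown this ticket wants to keep.

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object BroadcastIsinWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("broadcast-isin-sketch").getOrCreate()
    import spark.implicits._

    // Illustrative data: in the real use case this collection has ~200.000 elements.
    val bigList: Set[String] = (1 to 200000).map(i => s"key_$i").toSet
    val broadcastKeys = spark.sparkContext.broadcast(bigList)

    // Each task carries only a reference to the broadcast variable.
    val isInBroadcast = udf((k: String) => k != null && broadcastKeys.value.contains(k))

    val df = Seq("key_1", "key_200001").toDF("k")
    df.filter(isInBroadcast(col("k"))).show()
  }
}
{code}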



was (Author: fsainz):
I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates it's not the one exposed in 
predicates, but a post-translation, so adding a new class is not a drama :) 
(unless this post-translation is also transferred from the driver to the 
executors which is what I'm trying to check now). I'm basing it on InSet 
(actually delegating everything to the InSet implementation not to duplicate 
code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)


If that's the case (this is the first time I check "deeply" inside spark 
sources, but I guess that's the case). Maybe the list can be passed as a 
function returning a list instead of a list (so Scala serializes just the 
function with the broadcast variable instead of the whole list), API would be 
compatible I think, but that forces not-to-use case classes + 99% breaks binary 
compatibility :(


> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
>

[jira] [Created] (SPARK-31473) AQE should set active session during execution

2020-04-17 Thread Wei Xue (Jira)
Wei Xue created SPARK-31473:
---

 Summary: AQE should set active session during execution
 Key: SPARK-31473
 URL: https://issues.apache.org/jira/browse/SPARK-31473
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Wei Xue






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:15 PM:


I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates it's not the one exposed in 
predicates, but a post-translation, so adding a new class is not a drama :) 
(unless this post-translation is also transferred from the driver to the 
executors which is what I'm trying to check now). I'm basing it on InSet 
(actually delegating everything to the InSet implementation not to duplicate 
code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)


If that's the case (this is the first time I check "deeply" inside spark 
sources, but I guess that's the case). Maybe the list can be passed as a 
function returning a list instead of a list (so Scala serializes just the 
function with the broadcast variable instead of the whole list), API would be 
compatible I think, but that forces not-to-use case classes + 99% breaks binary 
compatibility :(



was (Author: fsainz):
I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates it's not the one exposed in 
predicates, but a post-translation, so adding a new class is not a drama :) 
(unless this post-translation is also transferred from the driver to the 
executors which is what I'm trying to check now). I'm basing it on InSet 
(actually delegating everything to the InSet implementation not to duplicate 
code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)


If that's the case (this is the first time I check "deeply" inside spark 
sources, but I guess that's the case). Maybe the list can be passed as a 
function returning a list instead of a list, API would be compatible I think, 
but that forces not-to-use case classes + 99% breaks binary compatibility :(


> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:14 PM:


I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates it's not the one exposed in 
predicates, but a post-translation, so adding a new class is not a drama :) 
(unless this post-translation is also transferred from the driver to the 
executors which is what I'm trying to check now). I'm basing it on InSet 
(actually delegating everything to the InSet implementation not to duplicate 
code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)


If that's the case (this is the first time I check "deeply" inside spark 
sources, but I guess that's the case). Maybe the list can be passed as a 
function returning a list instead of a list, API would be compatible I think, 
but that forces not-to-use case classes + 99% breaks binary compatibility :(



was (Author: fsainz):
I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates it's not the one exposed in 
predicates, but a post-translation, so adding a new class is not a drama :) 
(unless this post-translation is also transferred from the driver to the 
executors which is what I'm trying to check now). I'm basing it on InSet 
(actually delegating everything to the InSet implementation not to duplicate 
code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)


If that's the case (this is the first time I check "deeply" inside spark 
sources, but I guess that's the case). Maybe the list can be passed as a 
function returning a list instead of a list, API would be compatible I think, 
but that forces not-to-use case classes + 99% binary compatibility :(


> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:14 PM:


I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates it's not the one exposed in 
predicates, but a post-translation, so adding a new class is not a drama :) 
(unless this post-translation is also transferred from the driver to the 
executors which is what I'm trying to check now). I'm basing it on InSet 
(actually delegating everything to the InSet implementation not to duplicate 
code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)


If that's the case (this is the first time I check "deeply" inside spark 
sources, but I guess that's the case). Maybe the list can be passed as a 
function returning a list instead of a list, API would be compatible I think, 
but that forces not-to-use case classes + 99% binary compatibility :(



was (Author: fsainz):
I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates it's not the one exposed in 
predicates, but a post-translation, so adding a new class is not a drama :) 
(unless this post-translation is also transferred from the driver to the 
executors which is what I'm trying to check now). I'm basing it on InSet 
(actually delegating everything to the InSet implementation not to duplicate 
code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)


If that's the case (this is the first time I check "deeply" inside spark 
sources, but I guess that's the case), passing the list as a function returning 
a list, can work and API would be compatible I think, but that breaks case 
classes + 99% binary compatibility :(


> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives a Broadcast variable and uses th

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:13 PM:


I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates it's not the one exposed in 
predicates, but a post-translation, so adding a new class is not a drama :) 
(unless this post-translation is also transferred from the driver to the 
executors which is what I'm trying to check now). I'm basing it on InSet 
(actually delegating everything to the InSet implementation not to duplicate 
code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)


If that's the case (this is the first time I check "deeply" inside spark 
sources, but I guess that's the case), passing the list as a function returning 
a list, can work and API would be compatible I think, but that breaks case 
classes + 99% binary compatibility :(



was (Author: fsainz):
I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates it's not the one exposed in 
predicates, but a post-translation, so adding a new class is not a drama :) 
(unless this post-translation is also transferred from the driver to the 
executors which is what I'm trying to check now). I'm basing it on InSet 
(actually delegating everything to the InSet implementation not to duplicate 
code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)






> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives a Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache

[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 5:08 PM:


I'm going ahead with something 
(@https://github.com/TinoSM/spark/tree/SPARK-31417_broadcast_isin). Just 
realized the In in the pushdown predicates it's not the one exposed in 
predicates, but a post-translation, so adding a new class is not a drama :) 
(unless this post-translation is also transferred from the driver to the 
executors which is what I'm trying to check now). I'm basing it on InSet 
(actually delegating everything to the InSet implementation not to duplicate 
code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)







was (Author: fsainz):
I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :) (unless this post-translation is also transferred from 
the driver to the executors which is what I'm trying to check now). I'm basing 
it on InSet (actually delegating everything to the InSet implementation not to 
duplicate code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)






> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives a Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1537) Add integration with Yarn's Application Timeline Server

2020-04-17 Thread Daniel Templeton (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-1537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085922#comment-17085922
 ] 

Daniel Templeton commented on SPARK-1537:
-

[~vanzin], is that because the community has invested instead in making SHS 
that central metrics store and UI?  What about clusters with mixed workloads?

> Add integration with Yarn's Application Timeline Server
> ---
>
> Key: SPARK-1537
> URL: https://issues.apache.org/jira/browse/SPARK-1537
> Project: Spark
>  Issue Type: New Feature
>  Components: YARN
>Reporter: Marcelo Masiero Vanzin
>Priority: Major
> Attachments: SPARK-1537.txt, spark-1573.patch
>
>
> It would be nice to have Spark integrate with Yarn's Application Timeline 
> Server (see YARN-321, YARN-1530). This would allow users running Spark on 
> Yarn to have a single place to go for all their history needs, and avoid 
> having to manage a separate service (Spark's built-in server).
> At the moment, there's a working version of the ATS in the Hadoop 2.4 branch, 
> although there is still some ongoing work. But the basics are there, and I 
> wouldn't expect them to change (much) at this point.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:43 PM:


I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :) (unless this post-translation is also transferred from 
the driver to the executors which is what I'm trying to check now). I'm basing 
it on InSet (actually delegating everything to the InSet implementation not to 
duplicate code) and adding a new method to the Column isInBroadcastCollection.



Is File/DataSourceStrategy executed in the driver? (this is where I get values 
in order to pushdown, at least If I want to reuse the current In pushdown)







was (Author: fsainz):
I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :) (unless this post-translation is also transferred from 
the driver to the executors which is what I'm trying to check now). I'm basing 
it on InSet (actually delegating everything to the InSet implementation not to 
duplicate code) and adding a new method to the Column isInBroadcastCollection.



Is FileSourceStrategy executed in the driver? (this is where I get values in 
order to pushdown, at least If I want to reuse the current In pushdown)






> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives a Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:41 PM:


I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :) (unless this post-translation is also transferred from 
the driver to the executors which is what I'm trying to check now). I'm basing 
it on InSet (actually delegating everything to the InSet implementation not to 
duplicate code) and adding a new method to the Column isInBroadcastCollection.



Is FileSourceStrategy executed in the driver? (this is where I get values in 
order to pushdown, at least If I want to reuse the current In pushdown)







was (Author: fsainz):
I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :) (unless this post-translation is also transferred from 
the driver to the executors). I'm basing it on InSet (actually delegating 
everything to the InSet implementation not to duplicate code) and adding a new 
method to the Column isInBroadcastCollection.



Is FileSourceStrategy executed in the driver? (this is where I get values in 
order to pushdown, at least If I want to reuse the current In pushdown)






> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives a Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:40 PM:


I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :) (unless this post-translation is also transferred from 
the driver to the executors). I'm basing it on InSet (actually delegating 
everything to the InSet implementation not to duplicate code) and adding a new 
method to the Column isInBroadcastCollection.



Is FileSourceStrategy executed in the driver? (this is where I get values in 
order to pushdown, at least If I want to reuse the current In pushdown)







was (Author: fsainz):
I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :) (unless this post-translation is also transferred from 
the driver to the executors). I'm basing it on InSet (actually delegating 
everything to the InSet implementation not to duplicate code) and adding a new 
method to the Column isInBroadcastCollection.



Is FileSourceStrategy executed in the driver? (this is where I get values in 
order to pushdown, at least If I use the current In pushdown)






> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives a Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:38 PM:


I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :) (unless this post-translation is also transferred from 
the driver to the executors). I'm basing it on InSet (actually delegating 
everything to the InSet implementation not to duplicate code) and adding a new 
method to the Column isInBroadcastCollection.



Is FileSourceStrategy executed in the driver? (this is where I get values in 
order to pushdown)







was (Author: fsainz):
I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :) (unless this post-translation is also transferred from 
the driver to the executors). I'm basing it on InSet (actually delegating 
everything to the InSet implementation not to duplicate code) and adding a new 
method to the Column isInBroadcastCollection.







> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives a Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:38 PM:


I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :) (unless this post-translation is also transferred from 
the driver to the executors). I'm basing it on InSet (actually delegating 
everything to the InSet implementation not to duplicate code) and adding a new 
method to the Column isInBroadcastCollection.



Is FileSourceStrategy executed in the driver? (this is where I get values in 
order to pushdown, at least If I use the current In pushdown)







was (Author: fsainz):
I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :) (unless this post-translation is also transferred from 
the driver to the executors). I'm basing it on InSet (actually delegating 
everything to the InSet implementation not to duplicate code) and adding a new 
method to the Column isInBroadcastCollection.



Is FileSourceStrategy executed in the driver? (this is where I get values in 
order to pushdown)






> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives and Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30495) How to disable 'spark.security.credentials.${service}.enabled' in Structured streaming while connecting to a kafka cluster

2020-04-17 Thread Eric (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085914#comment-17085914
 ] 

Eric commented on SPARK-30495:
--

Does anyone know when this fix will make it into a public build? I see that it is 
part of the RC1 tag, but I can't find any RC1 binaries in Maven. I am using 3.0.0 
preview2 and this bug is a blocker for us. Any idea when 3.0.0 will be officially 
released?

> How to disable 'spark.security.credentials.${service}.enabled' in Structured 
> streaming while connecting to a kafka cluster
> --
>
> Key: SPARK-30495
> URL: https://issues.apache.org/jira/browse/SPARK-30495
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: act_coder
>Assignee: Gabor Somogyi
>Priority: Major
> Fix For: 3.0.0
>
>
> Trying to read data from a secured Kafka cluster using spark structured
>  streaming. Also, using the below library to read the data -
>  +*"spark-sql-kafka-0-10_2.12":"3.0.0-preview"*+ since it has the feature to
>  specify our custom group id (instead of spark setting its own custom group
>  id)
> +*Dependency used in code:*+
>         org.apache.spark
>          spark-sql-kafka-0-10_2.12
>          3.0.0-preview
>  
> +*Logs:*+
> Getting the below error - even after specifying the required JAAS
>  configuration in spark options.
> Caused by: java.lang.IllegalArgumentException: requirement failed:
>  *Delegation token must exist for this connector*. at
>  scala.Predef$.require(Predef.scala:281) at
> org.apache.spark.kafka010.KafkaTokenUtil$.isConnectorUsingCurrentToken(KafkaTokenUtil.scala:299)
>  at
>  
> org.apache.spark.sql.kafka010.KafkaDataConsumer.getOrRetrieveConsumer(KafkaDataConsumer.scala:533)
>  at
>  
> org.apache.spark.sql.kafka010.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:275)
>  
> +*Spark configuration used to read from Kafka:*+
> val kafkaDF = sparkSession.readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", bootStrapServer)
>  .option("subscribe", kafkaTopic )
>  
> //Setting JAAS Configuration
> .option("kafka.sasl.jaas.config", KAFKA_JAAS_SASL)
>  .option("kafka.sasl.mechanism", "PLAIN")
>  .option("kafka.security.protocol", "SASL_SSL")
> // Setting custom consumer group id
> .option("kafka.group.id", "test_cg")
>  .load()
>  
> Following document specifies that we can disable the feature of obtaining
>  delegation token -
>  
> [https://spark.apache.org/docs/3.0.0-preview/structured-streaming-kafka-integration.html]
> Tried setting this property *spark.security.credentials.kafka.enabled to*
>  *false in spark config,* but it is still failing with the same error.
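
For anyone following along, a hedged sketch of where that flag has to be set (on 
the Spark/session configuration, not as a kafka.-prefixed source option); 
bootStrapServer, kafkaTopic and KAFKA_JAAS_SASL are the placeholders from the 
report above, and this is simply the shape of configuration the fix is expected 
to honour:

{code:scala}
import org.apache.spark.sql.SparkSession

// Sketch: disable Kafka delegation-token retrieval on the Spark configuration,
// then read the stream with the same SASL/PLAIN options as in the report.
val spark = SparkSession.builder()
  .appName("kafka-sasl-plain")
  .config("spark.security.credentials.kafka.enabled", "false")
  .getOrCreate()

val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServer)
  .option("subscribe", kafkaTopic)
  .option("kafka.sasl.jaas.config", KAFKA_JAAS_SASL)
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.group.id", "test_cg")
  .load()
{code}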



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:26 PM:


I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :) (unless this post-translation is also transferred from 
the driver to the executors). I'm basing it on InSet (actually delegating 
everything to the InSet implementation not to duplicate code) and adding a new 
method to the Column isInBroadcastCollection.








was (Author: fsainz):
I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :). I'm basing it on InSet (actually delegating everything 
to the InSet implementation not to duplicate code) and adding a new method to 
the Column isInBroadcastCollection.







> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives and Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz edited comment on SPARK-31417 at 4/17/20, 4:08 PM:


I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :). I'm basing it on InSet (actually delegating everything 
to the InSet implementation not to duplicate code) and adding a new method to 
the Column isInBroadcastCollection.








was (Author: fsainz):
I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :). I'm basing it on InSet (actually delegating everything 
to the InSet implementation) and adding a new method to the Column 
isInBroadcastCollection.







> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives and Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-31417) Allow broadcast variables when using isin code

2020-04-17 Thread Florentino Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085900#comment-17085900
 ] 

Florentino Sainz commented on SPARK-31417:
--

I'm going ahead with something. Just realized the In in the pushdown predicates 
it's not the one exposed in predicates, but a post-translation, so adding a new 
class is not a drama :). I'm basing it on InSet (actually delegating everything 
to the InSet implementation) and adding a new method to the Column 
isInBroadcastCollection.







> Allow broadcast variables when using isin code
> --
>
> Key: SPARK-31417
> URL: https://issues.apache.org/jira/browse/SPARK-31417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Florentino Sainz
>Priority: Minor
>
> *I would like some help to explore if the following feature makes sense and 
> what's the best way to implement it. Allow users to use "isin" (or similar) 
> predicates with huge Lists (which might be broadcasted).*
> As of now (AFAIK), users can only provide a list/sequence of elements which 
> will be sent as part of the "In" predicate, which is part of the task, to the 
> executors. So when this list is huge, this causes tasks to be "huge" 
> (specially when the task number is big).
> I'm coming from 
> https://stackoverflow.com/questions/6172/scala-spark-isin-broadcast-list 
> (I'm the creator of the post, and also the 2nd answer). 
> I know this is not common, but can be useful for people reading from 
> "servers" which are not in the same nodes than Spark (maybe S3/cloud?). In my 
> concrete case, querying Kudu with 200.000 elements in the isin takes 
> ~2minutes (+1 minute "calculating plan" to send the BIG tasks to the 
> executors) versus 30 minutes doing a full-scan (even if semi-filtered, most 
> of the data has the same nature and thus is hard to filter, unless I use this 
> "200.000" list) on my table.
> More or less I have a clear view (explained in stackoverflow) on what to 
> modify If I wanted to create my own "BroadcastIn" in 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala,
>  however with my limited "idea", anyone who implemented In pushdown would 
> have to re-adapt it to the new "BroadcastIn"
> I've looked at Spark 3.0 sources and nothing has changed there, but I'm not 
> 100% sure if there's an alternative there (I'm stuck in 2.4 anyways, working 
> for a corporation which uses that version).
> Maybe I can go ahead with a PR (slowly, in my free time), however I'm not an 
> expert in Scala syntax/syntactic sugar, *anyone has any idea on how to extend 
> current case class "In" while keeping the compatibility with PushDown 
> implementations (aka creating a new Predicate not allowed I think), but not 
> having only Seq[Expression] in the case class, but also allow 
> Broadcast[Seq[Expression]]?* This is what makes the tasks huge, I made a 
> test-run with a predicate which receives and Broadcast variable and uses the 
> value inside, and it works much better.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27913) Spark SQL's native ORC reader implements its own schema evolution

2020-04-17 Thread Giri Dandu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085852#comment-17085852
 ] 

Giri Dandu commented on SPARK-27913:


[~viirya] Sorry for the late reply.

I re-ran the same test on Spark 2.4.5 and it *is NOT working*, although it works 
in 2.3.0. I get the same error on Spark 2.4.5:

 
{code:java}
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1Caused by: 
java.lang.ArrayIndexOutOfBoundsException: 1 at 
org.apache.orc.mapred.OrcStruct.getFieldValue(OrcStruct.java:49) at 
org.apache.spark.sql.execution.datasources.orc.OrcDeserializer$$anonfun$org$apache$spark$sql$execution$datasources$orc$OrcDeserializer$$newWriter$14.apply(OrcDeserializer.scala:133)
 at 
org.apache.spark.sql.execution.datasources.orc.OrcDeserializer$$anonfun$org$apache$spark$sql$execution$datasources$orc$OrcDeserializer$$newWriter$14.apply(OrcDeserializer.scala:123)
 at 
org.apache.spark.sql.execution.datasources.orc.OrcDeserializer$$anonfun$2$$anonfun$apply$1.apply(OrcDeserializer.scala:51)
 at 
org.apache.spark.sql.execution.datasources.orc.OrcDeserializer$$anonfun$2$$anonfun$apply$1.apply(OrcDeserializer.scala:51)
 at 
org.apache.spark.sql.execution.datasources.orc.OrcDeserializer.deserialize(OrcDeserializer.scala:64)
 at 
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anonfun$buildReaderWithPartitionValues$2$$anonfun$apply$8.apply(OrcFileFormat.scala:234)
 at 
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anonfun$buildReaderWithPartitionValues$2$$anonfun$apply$8.apply(OrcFileFormat.scala:233)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:104)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) 
{code}
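
For context, a hedged reproduction sketch of the kind of schema evolution that 
trips the native reader (paths, column names and the added struct field are made 
up; the point is reading old ORC files with a schema that has gained a nested 
field):

{code:scala}
// Write "old" data whose struct has a single field.
spark.range(5)
  .selectExpr("id", "named_struct('a', id) AS s")
  .write.mode("overwrite").orc("/tmp/orc_evolution")

// Read it back with an "evolved" schema whose struct has gained a field `b`.
val evolved = "id BIGINT, s STRUCT<a: BIGINT, b: STRING>"
spark.read.schema(evolved).orc("/tmp/orc_evolution").show()
// With spark.sql.orc.impl=native this kind of read can hit
// ArrayIndexOutOfBoundsException in OrcStruct.getFieldValue, while
// spark.sql.orc.impl=hive tolerates the missing nested field.
{code}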

> Spark SQL's native ORC reader implements its own schema evolution
> -
>
> Key: SPARK-27913
> URL: https://issues.apache.org/jira/browse/SPARK-27913
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.3
>Reporter: Owen O'Malley
>Priority: Major
>
> ORC's reader handles a wide range of schema evolution, but the Spark SQL 
> native ORC bindings do not provide the desired schema to the ORC reader. This 
> causes a regression when moving spark.sql.orc.impl from 'hive' to 'native'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-31468) Null types should be implicitly casted to Decimal types

2020-04-17 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-31468.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

Issue resolved by pull request 28241
[https://github.com/apache/spark/pull/28241]

> Null types should be implicitly casted to Decimal types
> ---
>
> Key: SPARK-31468
> URL: https://issues.apache.org/jira/browse/SPARK-31468
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Takeshi Yamamuro
>Assignee: Takeshi Yamamuro
>Priority: Major
> Fix For: 3.0.0
>
>
> A query below fails in master/branch-3.0 and passed in v2.4.5;
> {code}
> scala> Seq(BigDecimal(10)).toDF("v1").selectExpr("v1 = NULL").explain(true)
> org.apache.spark.sql.AnalysisException: cannot resolve '(`v1` = NULL)' due to 
> data type mismatch: differing types in '(`v1` = NULL)' (decimal(38,18) and 
> null).; line 1 pos 0;
> 'Project [(v1#5 = null) AS (v1 = NULL)#7]
> +- Project [value#2 AS v1#5]
>+- LocalRelation [value#2]
> ...
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-31468) Null types should be implicitly casted to Decimal types

2020-04-17 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-31468:
---

Assignee: Takeshi Yamamuro

> Null types should be implicitly casted to Decimal types
> ---
>
> Key: SPARK-31468
> URL: https://issues.apache.org/jira/browse/SPARK-31468
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Takeshi Yamamuro
>Assignee: Takeshi Yamamuro
>Priority: Major
>
> A query below fails in master/branch-3.0 and passed in v2.4.5;
> {code}
> scala> Seq(BigDecimal(10)).toDF("v1").selectExpr("v1 = NULL").explain(true)
> org.apache.spark.sql.AnalysisException: cannot resolve '(`v1` = NULL)' due to 
> data type mismatch: differing types in '(`v1` = NULL)' (decimal(38,18) and 
> null).; line 1 pos 0;
> 'Project [(v1#5 = null) AS (v1 = NULL)#7]
> +- Project [value#2 AS v1#5]
>+- LocalRelation [value#2]
> ...
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-31472) allGather() may return null messages

2020-04-17 Thread wuyi (Jira)
wuyi created SPARK-31472:


 Summary: allGather() may return null messages 
 Key: SPARK-31472
 URL: https://issues.apache.org/jira/browse/SPARK-31472
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: wuyi


{code:java}
[info] BarrierTaskContextSuite:
[info] - share messages with allGather() call *** FAILED *** (18 seconds, 705 
milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: 
Could not recover from a failed barrier ResultStage. Most recent failure 
reason: Stage failed because barrier task ResultTask(0, 2) finished 
unsuccessfully.
[info] java.lang.NullPointerException
[info]  at 
scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:204)
[info]  at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:204)
[info]  at 
scala.collection.IndexedSeqOptimized.toList(IndexedSeqOptimized.scala:285)
[info]  at 
scala.collection.IndexedSeqOptimized.toList$(IndexedSeqOptimized.scala:284)
[info]  at scala.collection.mutable.ArrayOps$ofRef.toList(ArrayOps.scala:198)
[info]  at 
org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$new$4(BarrierTaskContextSuite.scala:68)
[info]  at 
org.apache.spark.rdd.RDDBarrier.$anonfun$mapPartitions$2(RDDBarrier.scala:51)
[info]  at 
org.apache.spark.rdd.RDDBarrier.$anonfun$mapPartitions$2$adapted(RDDBarrier.scala:51)
[info]  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
[info]  at org.apache.spark.scheduler.Task.run(Task.scala:127)
[info]  at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:460)
[info]  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:463)
[info]  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]  at java.lang.Thread.run(Thread.java:748)
[info]   at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2094)
[info]   at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2043)
[info]   at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2042)
[info]   at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
[info]   at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
[info]   at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2042)
[info]   at 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1831)
[info]   at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2271)
[info]   at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2223)
[info]   at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2212)
[info]   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
[info]   at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:822)
[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108)
[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129)
[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148)
[info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2173)
[info]   at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
[info]   at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info]   at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[info]   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
[info]   at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
[info]   at 
org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$new$3(BarrierTaskContextSuite.scala:71)
[info]   at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:151)
[info]   at 
org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSui

[jira] [Resolved] (SPARK-31469) Make extract interval field ANSI compliance

2020-04-17 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-31469.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

Issue resolved by pull request 28242
[https://github.com/apache/spark/pull/28242]

> Make extract interval field ANSI compliance
> ---
>
> Key: SPARK-31469
> URL: https://issues.apache.org/jira/browse/SPARK-31469
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0, 3.1.0
>Reporter: Kent Yao
>Assignee: Kent Yao
>Priority: Major
> Fix For: 3.0.0
>
>
> Currently, we can extract 
> millennium/century/decade/year/quarter/month/week/day/hour/minute/second(with 
> fractions)//millisecond/microseconds and epoch from interval values
> millennium/century/decade/year means how many the interval months part can be 
> converted to that unit-value.
> while month/day and so on, the integral remainder of the previous unit.
> while getting epoch we have treat month as 30 days which varies the natural 
> Calendar rules we use.
> To avoid ambiguity,  I suggest we should only support those extract field 
> defined ANSI with their abbreviations. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-31469) Make extract interval field ANSI compliance

2020-04-17 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-31469:
---

Assignee: Kent Yao

> Make extract interval field ANSI compliance
> ---
>
> Key: SPARK-31469
> URL: https://issues.apache.org/jira/browse/SPARK-31469
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0, 3.1.0
>Reporter: Kent Yao
>Assignee: Kent Yao
>Priority: Major
>
> Currently, we can extract 
> millennium/century/decade/year/quarter/month/week/day/hour/minute/second(with 
> fractions)//millisecond/microseconds and epoch from interval values
> millennium/century/decade/year means how many the interval months part can be 
> converted to that unit-value.
> while month/day and so on, the integral remainder of the previous unit.
> while getting epoch we have treat month as 30 days which varies the natural 
> Calendar rules we use.
> To avoid ambiguity,  I suggest we should only support those extract field 
> defined ANSI with their abbreviations. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26902) Support java.time.Instant as an external type of TimestampType

2020-04-17 Thread Jorge Machado (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-26902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085759#comment-17085759
 ] 

Jorge Machado commented on SPARK-26902:
---

What about supporting the java.time.temporal.Temporal interface?

> Support java.time.Instant as an external type of TimestampType
> --
>
> Key: SPARK-26902
> URL: https://issues.apache.org/jira/browse/SPARK-26902
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Major
> Fix For: 3.0.0
>
>
> Currently, Spark supports the java.sql.Date and java.sql.Timestamp types as 
> external types for Catalyst's DateType and TimestampType. It accepts and 
> produces values of such types. Since Java 8, base classes for dates and 
> timestamps are java.time.Instant, java.time.LocalDate/LocalDateTime, and 
> java.time.ZonedDateTime. Need to add new converters from/to Instant.
> The Instant type holds epoch seconds (and nanoseconds), and directly reflects 
> to Catalyst's TimestampType.
> Main motivations for the changes:
> - Smoothly support Java 8 time API
> - Avoid inconsistency of calendars used inside Spark 3.0 (Proleptic Gregorian 
> calendar) and inside of java.sql.Timestamp (hybrid calendar - Julian + 
> Gregorian). 
> - Make conversion independent from current system timezone.
> In case of collecting values of Date/TimestampType, the following SQL config 
> can control types of returned values:
>  - spark.sql.catalyst.timestampType with supported values 
> "java.sql.Timestamp" (by default) and "java.time.Instant"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-31436) MinHash keyDistance optimization

2020-04-17 Thread Sean R. Owen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean R. Owen resolved SPARK-31436.
--
Fix Version/s: 3.1.0
   Resolution: Fixed

Issue resolved by pull request 28206
[https://github.com/apache/spark/pull/28206]

> MinHash keyDistance optimization
> 
>
> Key: SPARK-31436
> URL: https://issues.apache.org/jira/browse/SPARK-31436
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 3.1.0
>Reporter: zhengruifeng
>Assignee: zhengruifeng
>Priority: Minor
> Fix For: 3.1.0
>
>
> current implementation is based on set operation, it is inefficient



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-31436) MinHash keyDistance optimization

2020-04-17 Thread Sean R. Owen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean R. Owen reassigned SPARK-31436:


Assignee: zhengruifeng

> MinHash keyDistance optimization
> 
>
> Key: SPARK-31436
> URL: https://issues.apache.org/jira/browse/SPARK-31436
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 3.1.0
>Reporter: zhengruifeng
>Assignee: zhengruifeng
>Priority: Minor
>
> current implementation is based on set operation, it is inefficient



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-31471) Add a script to run multiple benchmarks

2020-04-17 Thread Maxim Gekk (Jira)
Maxim Gekk created SPARK-31471:
--

 Summary: Add a script to run multiple benchmarks
 Key: SPARK-31471
 URL: https://issues.apache.org/jira/browse/SPARK-31471
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 3.1.0
Reporter: Maxim Gekk


Add a python script to run multiple benchmarks. The script can be taken from 
[https://github.com/apache/spark/pull/27078]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31448) Difference in Storage Levels used in cache() and persist() for pyspark dataframes

2020-04-17 Thread Abhishek Dixit (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Dixit updated SPARK-31448:
---
Labels: pys  (was: )

> Difference in Storage Levels used in cache() and persist() for pyspark 
> dataframes
> -
>
> Key: SPARK-31448
> URL: https://issues.apache.org/jira/browse/SPARK-31448
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
>Reporter: Abhishek Dixit
>Priority: Major
>  Labels: pys
>
> There is a difference in default storage level *MEMORY_AND_DISK* in pyspark 
> and scala.
> *Scala*: StorageLevel(true, true, false, true)
> *Pyspark:* StorageLevel(True, True, False, False)
>  
> *Problem Description:* 
> Calling *df.cache()*  for pyspark dataframe directly invokes Scala method 
> cache() and Storage Level used is StorageLevel(true, true, false, true).
> But calling *df.persist()* for pyspark dataframe sets the 
> newStorageLevel=StorageLevel(true, true, false, false) inside pyspark and 
> then invokes Scala function persist(newStorageLevel).
> *Possible Fix:*
> Invoke pyspark function persist inside pyspark function cache instead of 
> calling the scala function directly.
> I can raise a PR for this fix if someone can confirm that this is a bug and 
> the possible fix is the correct approach.
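
For comparison, a small Scala sketch of the two levels in question (the constant 
comes from org.apache.spark.storage.StorageLevel; which level PySpark passes is 
taken from the report above):

{code:scala}
import org.apache.spark.storage.StorageLevel

// Level used on the Scala side by df.cache(): MEMORY_AND_DISK, deserialized.
val scalaLevel = StorageLevel.MEMORY_AND_DISK

// Level the report says PySpark's df.persist() builds: same flags, but serialized.
val pysparkLevel =
  StorageLevel(useDisk = true, useMemory = true, useOffHeap = false, deserialized = false)

println(scalaLevel == pysparkLevel)   // false: only the `deserialized` flag differs
{code}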



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31448) Difference in Storage Levels used in cache() and persist() for pyspark dataframes

2020-04-17 Thread Abhishek Dixit (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Dixit updated SPARK-31448:
---
Labels:   (was: pys)

> Difference in Storage Levels used in cache() and persist() for pyspark 
> dataframes
> -
>
> Key: SPARK-31448
> URL: https://issues.apache.org/jira/browse/SPARK-31448
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
>Reporter: Abhishek Dixit
>Priority: Major
>
> There is a difference in default storage level *MEMORY_AND_DISK* in pyspark 
> and scala.
> *Scala*: StorageLevel(true, true, false, true)
> *Pyspark:* StorageLevel(True, True, False, False)
>  
> *Problem Description:* 
> Calling *df.cache()*  for pyspark dataframe directly invokes Scala method 
> cache() and Storage Level used is StorageLevel(true, true, false, true).
> But calling *df.persist()* for pyspark dataframe sets the 
> newStorageLevel=StorageLevel(true, true, false, false) inside pyspark and 
> then invokes Scala function persist(newStorageLevel).
> *Possible Fix:*
> Invoke pyspark function persist inside pyspark function cache instead of 
> calling the scala function directly.
> I can raise a PR for this fix if someone can confirm that this is a bug and 
> the possible fix is the correct approach.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-31423) DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC

2020-04-17 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-31423.
-
Resolution: Not A Problem

> DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC
> --
>
> Key: SPARK-31423
> URL: https://issues.apache.org/jira/browse/SPARK-31423
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.1.0
>Reporter: Bruce Robbins
>Priority: Major
>
> There is a range of days (1582-10-05 to 1582-10-14) for which DATEs and 
> TIMESTAMPS are changed when stored in ORC. The value is off by 10 days.
> For example:
> {noformat}
> scala> val df = sql("select cast('1582-10-14' as DATE) dt")
> df: org.apache.spark.sql.DataFrame = [dt: date]
> scala> df.show // seems fine
> +--+
> |dt|
> +--+
> |1582-10-14|
> +--+
> scala> df.write.mode("overwrite").orc("/tmp/funny_orc_date")
> scala> spark.read.orc("/tmp/funny_orc_date").show // off by 10 days
> +--+
> |dt|
> +--+
> |1582-10-24|
> +--+
> scala>
> {noformat}
> ORC has the same issue with TIMESTAMPS:
> {noformat}
> scala> val df = sql("select cast('1582-10-14 00:00:00' as TIMESTAMP) ts")
> df: org.apache.spark.sql.DataFrame = [ts: timestamp]
> scala> df.show // seems fine
> +---+
> | ts|
> +---+
> |1582-10-14 00:00:00|
> +---+
> scala> df.write.mode("overwrite").orc("/tmp/funny_orc_timestamp")
> scala> spark.read.orc("/tmp/funny_orc_timestamp").show(truncate=false) // off 
> by 10 days
> +---+
> |ts |
> +---+
> |1582-10-24 00:00:00|
> +---+
> scala> 
> {noformat}
> However, when written to Parquet or Avro, DATES and TIMESTAMPs for this range 
> do not change.
> {noformat}
> scala> val df = sql("select cast('1582-10-14' as DATE) dt")
> df: org.apache.spark.sql.DataFrame = [dt: date]
> scala> df.write.mode("overwrite").parquet("/tmp/funny_parquet_date")
> scala> spark.read.parquet("/tmp/funny_parquet_date").show // reflects 
> original value
> +--+
> |dt|
> +--+
> |1582-10-14|
> +--+
> scala> val df = sql("select cast('1582-10-14' as DATE) dt")
> df: org.apache.spark.sql.DataFrame = [dt: date]
> scala> df.write.mode("overwrite").format("avro").save("/tmp/funny_avro_date")
> scala> spark.read.format("avro").load("/tmp/funny_avro_date").show // 
> reflects original value
> +--+
> |dt|
> +--+
> |1582-10-14|
> +--+
> scala> 
> {noformat}
> It's unclear to me whether ORC is behaving correctly or not, as this is how 
> Spark 2.4 works with DATEs and TIMESTAMPs in general (and also how Spark 3.x 
> works with DATEs and TIMESTAMPs in general when 
> {{spark.sql.legacy.timeParserPolicy}} is set to {{LEGACY}}). In Spark 2.4, 
> DATEs and TIMESTAMPs in this range don't exist:
> {noformat}
> scala> sql("select cast('1582-10-14' as DATE) dt").show // the same cast done 
> in Spark 2.4
> +--+
> |dt|
> +--+
> |1582-10-24|
> +--+
> scala> 
> {noformat}
> I assume the following snippet is relevant (from the Wikipedia entry on the 
> Gregorian calendar):
> {quote}To deal with the 10 days' difference (between calendar and 
> reality)[Note 2] that this drift had already reached, the date was advanced 
> so that 4 October 1582 was followed by 15 October 1582
> {quote}
> Spark 3.x should treat DATEs and TIMESTAMPS in this range consistently, and 
> probably based on spark.sql.legacy.timeParserPolicy (or some other config) 
> rather than file format.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-31423) DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC

2020-04-17 Thread Wenchen Fan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085585#comment-17085585
 ] 

Wenchen Fan commented on SPARK-31423:
-

I'm closing it as "not a bug". This happens because we "rebase" the datetime 
values to account for the calendar change. Some dates exist in one calendar but 
not in the other; what we can do is move to the next valid date. Eventually every 
place should use the standard calendar, so we won't have this problem.
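
A hedged illustration of the gap being rebased over, using plain JDK classes and 
no Spark at all: the hybrid Julian/Gregorian calendar behind java.sql.Date has no 
1582-10-05..1582-10-14, while the Proleptic Gregorian calendar behind java.time 
does.

{code:scala}
import java.sql.Date
import java.time.LocalDate

// Proleptic Gregorian (java.time): the date exists.
println(LocalDate.of(1582, 10, 14))   // 1582-10-14

// Hybrid Julian/Gregorian (java.sql.Date): the day is skipped, so the lenient
// calendar typically rolls the value forward by 10 days.
println(Date.valueOf("1582-10-14"))   // 1582-10-24
{code}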

> DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC
> --
>
> Key: SPARK-31423
> URL: https://issues.apache.org/jira/browse/SPARK-31423
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.1.0
>Reporter: Bruce Robbins
>Priority: Major
>
> There is a range of days (1582-10-05 to 1582-10-14) for which DATEs and 
> TIMESTAMPS are changed when stored in ORC. The value is off by 10 days.
> For example:
> {noformat}
> scala> val df = sql("select cast('1582-10-14' as DATE) dt")
> df: org.apache.spark.sql.DataFrame = [dt: date]
> scala> df.show // seems fine
> +--+
> |dt|
> +--+
> |1582-10-14|
> +--+
> scala> df.write.mode("overwrite").orc("/tmp/funny_orc_date")
> scala> spark.read.orc("/tmp/funny_orc_date").show // off by 10 days
> +--+
> |dt|
> +--+
> |1582-10-24|
> +--+
> scala>
> {noformat}
> ORC has the same issue with TIMESTAMPS:
> {noformat}
> scala> val df = sql("select cast('1582-10-14 00:00:00' as TIMESTAMP) ts")
> df: org.apache.spark.sql.DataFrame = [ts: timestamp]
> scala> df.show // seems fine
> +---+
> | ts|
> +---+
> |1582-10-14 00:00:00|
> +---+
> scala> df.write.mode("overwrite").orc("/tmp/funny_orc_timestamp")
> scala> spark.read.orc("/tmp/funny_orc_timestamp").show(truncate=false) // off 
> by 10 days
> +---+
> |ts |
> +---+
> |1582-10-24 00:00:00|
> +---+
> scala> 
> {noformat}
> However, when written to Parquet or Avro, DATES and TIMESTAMPs for this range 
> do not change.
> {noformat}
> scala> val df = sql("select cast('1582-10-14' as DATE) dt")
> df: org.apache.spark.sql.DataFrame = [dt: date]
> scala> df.write.mode("overwrite").parquet("/tmp/funny_parquet_date")
> scala> spark.read.parquet("/tmp/funny_parquet_date").show // reflects 
> original value
> +--+
> |dt|
> +--+
> |1582-10-14|
> +--+
> scala> val df = sql("select cast('1582-10-14' as DATE) dt")
> df: org.apache.spark.sql.DataFrame = [dt: date]
> scala> df.write.mode("overwrite").format("avro").save("/tmp/funny_avro_date")
> scala> spark.read.format("avro").load("/tmp/funny_avro_date").show // 
> reflects original value
> +--+
> |dt|
> +--+
> |1582-10-14|
> +--+
> scala> 
> {noformat}
> It's unclear to me whether ORC is behaving correctly or not, as this is how 
> Spark 2.4 works with DATEs and TIMESTAMPs in general (and also how Spark 3.x 
> works with DATEs and TIMESTAMPs in general when 
> {{spark.sql.legacy.timeParserPolicy}} is set to {{LEGACY}}). In Spark 2.4, 
> DATEs and TIMESTAMPs in this range don't exist:
> {noformat}
> scala> sql("select cast('1582-10-14' as DATE) dt").show // the same cast done 
> in Spark 2.4
> +--+
> |dt|
> +--+
> |1582-10-24|
> +--+
> scala> 
> {noformat}
> I assume the following snippet is relevant (from the Wikipedia entry on the 
> Gregorian calendar):
> {quote}To deal with the 10 days' difference (between calendar and 
> reality)[Note 2] that this drift had already reached, the date was advanced 
> so that 4 October 1582 was followed by 15 October 1582
> {quote}
> Spark 3.x should treat DATEs and TIMESTAMPS in this range consistently, and 
> probably based on spark.sql.legacy.timeParserPolicy (or some other config) 
> rather than file format.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-31470) Introduce SORTED BY clause in CREATE TABLE statement

2020-04-17 Thread Yuming Wang (Jira)
Yuming Wang created SPARK-31470:
---

 Summary: Introduce SORTED BY clause in CREATE TABLE statement
 Key: SPARK-31470
 URL: https://issues.apache.org/jira/browse/SPARK-31470
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Yuming Wang


We usually sort on frequently filtered columns when writing data to improve 
query performance, but this sort information is not recorded anywhere in the 
table metadata.
 

{code:sql}
CREATE TABLE t(day INT, hour INT, year INT, month INT)
USING parquet
PARTITIONED BY (year, month)
SORTED BY (day, hour);
{code}

 

Impala supports this clause: https://issues.apache.org/jira/browse/IMPALA-4166
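
Until such a clause exists, the sort typically has to be applied at write time 
and is not recorded anywhere; a hedged sketch of that status quo, assuming df 
holds the four columns from the example above:

{code:scala}
import org.apache.spark.sql.functions.col

// Status quo: sort while writing, but the table metadata keeps no trace of it.
df.repartition(col("year"), col("month"))
  .sortWithinPartitions("day", "hour")
  .write
  .partitionBy("year", "month")
  .format("parquet")
  .saveAsTable("t")
{code}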



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-31469) Make extract interval field ANSI compliance

2020-04-17 Thread Kent Yao (Jira)
Kent Yao created SPARK-31469:


 Summary: Make extract interval field ANSI compliance
 Key: SPARK-31469
 URL: https://issues.apache.org/jira/browse/SPARK-31469
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0, 3.1.0
Reporter: Kent Yao


Currently, we can extract 
millennium/century/decade/year/quarter/month/week/day/hour/minute/second (with 
fractions)/millisecond/microsecond and epoch from interval values.

millennium/century/decade/year return how many of that unit the months part of 
the interval can be converted to.

month/day and the smaller fields return the integral remainder left over from 
the previous (larger) unit.

When extracting epoch we have to treat a month as 30 days, which deviates from 
the natural calendar rules we use elsewhere.

To avoid ambiguity, I suggest we only support the extract fields defined by 
ANSI, together with their abbreviations. 
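
For concreteness, a few extract calls on interval values (master-branch 
behaviour at the time of writing is assumed; expected results are shown as 
comments):

{code:scala}
spark.sql("SELECT EXTRACT(YEAR  FROM INTERVAL 3 years 2 months)").show()   // 3
spark.sql("SELECT EXTRACT(MONTH FROM INTERVAL 3 years 2 months)").show()   // 2
spark.sql("SELECT EXTRACT(DAY   FROM INTERVAL 5 days 6 hours)").show()     // 5
spark.sql("SELECT EXTRACT(HOUR  FROM INTERVAL 5 days 6 hours)").show()     // 6
{code}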



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31467) Fix test issue with table named `test` in hive/SQLQuerySuite

2020-04-17 Thread feiwang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

feiwang updated SPARK-31467:

Description: 
If we add a unit test to hive/SQLQuerySuite that uses a table named `test`, we 
may hit the following exceptions.
{code:java}
 org.apache.spark.sql.AnalysisException: Inserting into an RDD-based table is 
not allowed.;;
[info] 'InsertIntoTable Project [_1#1403 AS key#1406, _2#1404 AS value#1407], 
Map(name -> Some(n1)), true, false
[info] +- Project [col1#3850]
[info]+- LocalRelation [col1#3850]
{code}


{code:java}
org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: Table or 
view 'test' already exists in database 'default';
[info]   at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateTable$1.apply$mcV$sp(HiveExternalCatalog.scala:226)
[info]   at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateTable$1.apply(HiveExternalCatalog.scala:216)
[info]   at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateTable$1.apply(HiveExternalCatalog.scala:216)
[info]   at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
[info]   at 
org.apache.spark.sql.hive.HiveExternalCatalog.doCreateTable(HiveExternalCatalog.scala:216)
{code}
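
A hedged sketch of the usual way to keep such tests isolated, assuming it sits 
inside hive/SQLQuerySuite where test, withTable, sql, checkAnswer and Row are 
already in scope (the test title and table name are made up):

{code:scala}
test("SPARK-31467: avoid the pre-existing `test` table name") {
  // withTable drops the table afterwards and avoids clashing with any table or
  // temp view named `test` that the test framework has already registered.
  withTable("spark_31467_tbl") {
    sql("CREATE TABLE spark_31467_tbl (key INT, value STRING) USING hive")
    sql("INSERT INTO spark_31467_tbl VALUES (1, 'a')")
    checkAnswer(sql("SELECT key, value FROM spark_31467_tbl"), Row(1, "a"))
  }
}
{code}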



> Fix test issue with table named `test` in hive/SQLQuerySuite
> 
>
> Key: SPARK-31467
> URL: https://issues.apache.org/jira/browse/SPARK-31467
> Project: Spark
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 3.1.0
>Reporter: feiwang
>Priority: Major
>
> If we add a unit test to hive/SQLQuerySuite that uses a table named `test`, we 
> may hit the following exceptions.
> {code:java}
>  org.apache.spark.sql.AnalysisException: Inserting into an RDD-based table is 
> not allowed.;;
> [info] 'InsertIntoTable Project [_1#1403 AS key#1406, _2#1404 AS value#1407], 
> Map(name -> Some(n1)), true, false
> [info] +- Project [col1#3850]
> [info]+- LocalRelation [col1#3850]
> {code}
> {code:java}
> org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: Table or 
> view 'test' already exists in database 'default';
> [info]   at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateTable$1.apply$mcV$sp(HiveExternalCatalog.scala:226)
> [info]   at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateTable$1.apply(HiveExternalCatalog.scala:216)
> [info]   at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateTable$1.apply(HiveExternalCatalog.scala:216)
> [info]   at 
> org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
> [info]   at 
> org.apache.spark.sql.hive.HiveExternalCatalog.doCreateTable(HiveExternalCatalog.scala:216)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-31468) Null types should be implicitly casted to Decimal types

2020-04-17 Thread Takeshi Yamamuro (Jira)
Takeshi Yamamuro created SPARK-31468:


 Summary: Null types should be implicitly casted to Decimal types
 Key: SPARK-31468
 URL: https://issues.apache.org/jira/browse/SPARK-31468
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Takeshi Yamamuro


The query below fails in master/branch-3.0 but passes in v2.4.5:
{code}
scala> Seq(BigDecimal(10)).toDF("v1").selectExpr("v1 = NULL").explain(true)
org.apache.spark.sql.AnalysisException: cannot resolve '(`v1` = NULL)' due to 
data type mismatch: differing types in '(`v1` = NULL)' (decimal(38,18) and 
null).; line 1 pos 0;
'Project [(v1#5 = null) AS (v1 = NULL)#7]
+- Project [value#2 AS v1#5]
   +- LocalRelation [value#2]
...
{code}
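
Until the implicit cast is in place, an explicit cast on the null literal 
sidesteps the mismatch; a small workaround sketch on the same toy data 
(spark-shell assumed):

{code:scala}
import spark.implicits._

// Cast NULL explicitly so both sides of the comparison are decimal(38,18).
Seq(BigDecimal(10)).toDF("v1")
  .selectExpr("v1 = CAST(NULL AS DECIMAL(38,18))")
  .explain(true)
{code}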



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-31467) Fix test issue with table named `test` in hive/SQLQuerySuite

2020-04-17 Thread feiwang (Jira)
feiwang created SPARK-31467:
---

 Summary: Fix test issue with table named `test` in 
hive/SQLQuerySuite
 Key: SPARK-31467
 URL: https://issues.apache.org/jira/browse/SPARK-31467
 Project: Spark
  Issue Type: Bug
  Components: Tests
Affects Versions: 3.1.0
Reporter: feiwang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-30913) Add version information to the configuration of Tests.scala

2020-04-17 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-30913:


Assignee: jiaan.geng

> Add version information to the configuration of Tests.scala
> ---
>
> Key: SPARK-30913
> URL: https://issues.apache.org/jira/browse/SPARK-30913
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: jiaan.geng
>Assignee: jiaan.geng
>Priority: Major
> Fix For: 3.0.0, 3.1.0
>
>
> core/src/main/scala/org/apache/spark/internal/config/Tests.scala



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org