[jira] [Updated] (SPARK-33559) Column pruning with monotonically_increasing_id

2020-11-25 Thread Gaetan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaetan updated SPARK-33559:
---
Affects Version/s: (was: 3.0.1)
   2.4.0

> Column pruning with monotonically_increasing_id
> ---
>
> Key: SPARK-33559
> URL: https://issues.apache.org/jira/browse/SPARK-33559
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Gaetan
>Priority: Minor
>
> {code:java}
> df = ss.read.parquet("/path/to/parquet/dataset") 
> df.select("partnerid").withColumn("index", 
> sf.monotonically_increasing_id()).explain(True){code}
> We would expect only partnerid to be read from the parquet dataset, but the 
> whole dataset is actually read:
> {code:java}
> ... == Physical Plan == Project [partnerid#6794, 
> monotonically_increasing_id() AS index#24939L] +- FileScan parquet 
> [impression_id#6550,arbitrage_id#6551,display_timestamp#6552L,requesttimestamputc#6553,affiliateid#6554,amp_adrequest_type#6555,app_id#6556,app_name#6557,appnexus_viewability#6558,apxpagevertical#6559,arbitrage_time#6560,banner_type#6561,display_type_int#6562,has_multiple_display_types#6563,bannerid#6564,bid_app_id_hash#6565,bid_url_domain_hash#6566,bidding_details#6567,bid_level_core#6568,bidrandomization_user_factor#6569,bidrandomization_user_mu#6570,bidrandomization_user_sigma#6571,big_lastrequesttimestampsession#6572,big_nbrequestaffiliatesession#6573,...
>  566 more fields] ...{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33559) Column pruning with monotonically_increasing_id

2020-11-25 Thread Gaetan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaetan updated SPARK-33559:
---
Description: 
{code:java}
df = ss.read.parquet("/path/to/parquet/dataset") 
df.select("partnerid").withColumn("index", 
sf.monotonically_increasing_id()).explain(True){code}

We would expect only partnerid to be read from the parquet dataset, but the 
whole dataset is actually read:


{code:java}
... == Physical Plan == Project [partnerid#6794, monotonically_increasing_id() 
AS index#24939L] +- FileScan parquet 
[impression_id#6550,arbitrage_id#6551,display_timestamp#6552L,requesttimestamputc#6553,affiliateid#6554,amp_adrequest_type#6555,app_id#6556,app_name#6557,appnexus_viewability#6558,apxpagevertical#6559,arbitrage_time#6560,banner_type#6561,display_type_int#6562,has_multiple_display_types#6563,bannerid#6564,bid_app_id_hash#6565,bid_url_domain_hash#6566,bidding_details#6567,bid_level_core#6568,bidrandomization_user_factor#6569,bidrandomization_user_mu#6570,bidrandomization_user_sigma#6571,big_lastrequesttimestampsession#6572,big_nbrequestaffiliatesession#6573,...
 566 more fields] ...{code}
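For comparison, here is a minimal sketch (in Scala, reusing the partnerid column from the example above) that prints the plan of the same projection with and without monotonically_increasing_id; without it, the FileScan's ReadSchema should contain only partnerid:
{code:java}
// Minimal check, not a definitive reproduction: compare the physical plans of
// the same single-column projection with and without monotonically_increasing_id.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

object PruningCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("pruning-check").getOrCreate()
    val df = spark.read.parquet("/path/to/parquet/dataset")

    // Expected: FileScan whose ReadSchema contains only `partnerid`.
    df.select("partnerid").explain(true)

    // Behaviour reported in this ticket: the same scan now lists every column.
    df.select("partnerid")
      .withColumn("index", monotonically_increasing_id())
      .explain(true)
  }
}
{code}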

  was:
{code:java}
df = ss.read.parquet("/path/to/parquet/dataset") 
df.select("partnerid").withColumn("index", 
sf.monotonically_increasing_id()).explain(True){code}
We would expect only partnerid to be read from the parquet dataset, but the 
whole dataset is actually read:
{code:java}
... == Physical Plan == Project [partnerid#6794, monotonically_increasing_id() 
AS index#24939L] +- FileScan parquet 
[impression_id#6550,arbitrage_id#6551,display_timestamp#6552L,requesttimestamputc#6553,affiliateid#6554,amp_adrequest_type#6555,app_id#6556,app_name#6557,appnexus_viewability#6558,apxpagevertical#6559,arbitrage_time#6560,banner_type#6561,display_type_int#6562,has_multiple_display_types#6563,bannerid#6564,bid_app_id_hash#6565,bid_url_domain_hash#6566,bidding_details#6567,bid_level_core#6568,bidrandomization_user_factor#6569,bidrandomization_user_mu#6570,bidrandomization_user_sigma#6571,big_lastrequesttimestampsession#6572,big_nbrequestaffiliatesession#6573,...
 566 more fields] ...{code}


> Column pruning with monotonically_increasing_id
> ---
>
> Key: SPARK-33559
> URL: https://issues.apache.org/jira/browse/SPARK-33559
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Gaetan
>Priority: Minor
>
> {code:java}
> df = ss.read.parquet("/path/to/parquet/dataset") 
> df.select("partnerid").withColumn("index", 
> sf.monotonically_increasing_id()).explain(True){code}
> We would expect only partnerid to be read from the parquet dataset, but the 
> whole dataset is actually read:
> {code:java}
> ... == Physical Plan == Project [partnerid#6794, 
> monotonically_increasing_id() AS index#24939L] +- FileScan parquet 
> [impression_id#6550,arbitrage_id#6551,display_timestamp#6552L,requesttimestamputc#6553,affiliateid#6554,amp_adrequest_type#6555,app_id#6556,app_name#6557,appnexus_viewability#6558,apxpagevertical#6559,arbitrage_time#6560,banner_type#6561,display_type_int#6562,has_multiple_display_types#6563,bannerid#6564,bid_app_id_hash#6565,bid_url_domain_hash#6566,bidding_details#6567,bid_level_core#6568,bidrandomization_user_factor#6569,bidrandomization_user_mu#6570,bidrandomization_user_sigma#6571,big_lastrequesttimestampsession#6572,big_nbrequestaffiliatesession#6573,...
>  566 more fields] ...{code}






[jira] [Created] (SPARK-33559) Column pruning with monotonically_increasing_id

2020-11-25 Thread Gaetan (Jira)
Gaetan created SPARK-33559:
--

 Summary: Column pruning with monotonically_increasing_id
 Key: SPARK-33559
 URL: https://issues.apache.org/jira/browse/SPARK-33559
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.1
Reporter: Gaetan


{code:java}
df = ss.read.parquet("/path/to/parquet/dataset") 
df.select("partnerid").withColumn("index", 
sf.monotonically_increasing_id()).explain(True){code}
We would expect only partnerid to be read from the parquet dataset, but the 
whole dataset is actually read:
{code:java}
... == Physical Plan == Project [partnerid#6794, monotonically_increasing_id() 
AS index#24939L] +- FileScan parquet 
[impression_id#6550,arbitrage_id#6551,display_timestamp#6552L,requesttimestamputc#6553,affiliateid#6554,amp_adrequest_type#6555,app_id#6556,app_name#6557,appnexus_viewability#6558,apxpagevertical#6559,arbitrage_time#6560,banner_type#6561,display_type_int#6562,has_multiple_display_types#6563,bannerid#6564,bid_app_id_hash#6565,bid_url_domain_hash#6566,bidding_details#6567,bid_level_core#6568,bidrandomization_user_factor#6569,bidrandomization_user_mu#6570,bidrandomization_user_sigma#6571,big_lastrequesttimestampsession#6572,big_nbrequestaffiliatesession#6573,...
 566 more fields] ...{code}






[jira] [Updated] (SPARK-33519) Batch UDF in scala

2020-11-23 Thread Gaetan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaetan updated SPARK-33519:
---
Description: 
Hello,

Unlike Python, there is only one type of Scala UDF: it lets us define a Scala 
function that is applied to a set of Columns and is called +for each row+. One 
advantage of a Scala UDF over mapPartitions is that Catalyst can see its inputs, 
which are then used for column pruning, predicate pushdown and other 
optimization rules. But in some use cases there is a setup phase that we want to 
execute only once per worker, right before processing the inputs. For such use 
cases a Scala UDF is not well suited, and mapPartitions is used instead, like 
this:

 
{code:java}
ds.mapPartitions(
  it => {
setup()
process(it)
  }
){code}
After looking at the code, I found that Python UDFs are implemented via query 
plans that retrieve an RDD from their children and call mapPartitions on that 
RDD to work with batches of inputs. These query plans are generated by Catalyst 
by extracting the Python UDFs (rule ExtractPythonUDFs).
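As a purely illustrative sketch in plain Scala (no Catalyst types; the names are invented here), the batching idea amounts to grouping the partition iterator into chunks and evaluating the function once per chunk instead of once per row:
{code:java}
// Illustration only: process a per-partition iterator in batches rather than
// row by row, which is the shape of what the extracted Python UDF plans do.
object BatchIllustration {
  def evalInBatches[I, O](rows: Iterator[I], batchSize: Int)(evalBatch: Seq[I] => Seq[O]): Iterator[O] =
    rows.grouped(batchSize).flatMap(evalBatch)

  def main(args: Array[String]): Unit = {
    // One "expensive setup" per batch of 1000 rows instead of one per row.
    val out = evalInBatches(Iterator.range(0, 10000), 1000) { batch =>
      // setup() would go here, once per batch
      batch.map(_ * 2)
    }
    println(out.size) // 10000
  }
}
{code}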

-

*Implementation details*:

1. We could implement a new Expression, ScalaBatchUDF, and add a boolean isBatch 
to Expression that tells whether an Expression is a batch expression or not. The 
SparkPlans SelectExec, FilterExec (and probably more) would be modified to 
handle batch Expressions:
 * Generated code would call a batch Expression with a batch of inputs instead 
of a single input.
 * The doExecute() method would call a batch Expression with a batch of inputs 
instead of a single input.

A SparkPlan could be composed of both "single" Expressions and batch 
Expressions. This is a first idea that would need to be refined.
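A very rough sketch of what such a contract could look like, using plain Scala types only (none of these names exist in Spark; they are invented for illustration):
{code:java}
// Hypothetical sketch, not Catalyst code: a batch-capable expression contract.
trait BatchEvaluable[I, O] {
  // Flag the planner could check to route the expression to batch execution.
  def isBatch: Boolean = true
  // Called once per batch of inputs (e.g. per partition) instead of once per row.
  def evalBatch(inputs: Iterator[I]): Iterator[O]
}

// Example batch "UDF": pay the setup cost once per batch, then stream the rows.
object ScoreBatchUdf extends BatchEvaluable[String, Double] {
  final class ExternalResource {            // stand-in for a real connection
    def score(s: String): Double = s.length.toDouble
  }
  override def evalBatch(inputs: Iterator[String]): Iterator[Double] = {
    val resource = new ExternalResource()   // setup once per batch
    inputs.map(resource.score)              // per-row work reuses the resource
  }
}
{code}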

2. Another solution would be to do as for Python UDFs: a batch UDF, implemented 
as an Expression, is extracted from the query plan it belongs to and transformed 
into a query plan ScalaBatchUDFExec (which becomes a child of the query plan 
that the batch UDF belonged to).

What do you think?

-

Here is a short description of *one of our Spark use cases* that could greatly 
benefit from Scala batch UDFs:

We are using Spark to distribute some computation run in C#. To do so, we call 
the mapPartitions method of the DataFrame that represents our data. Inside 
mapPartitions, we:
 * First connect to the C# process
 * Then iterate over the inputs, sending each one to the C# process and getting 
back the results.

The use of mapPartitions was motivated by the setup (connection to the C# 
process) that happens for each partition.
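For reference, a minimal self-contained sketch of that current approach (the partnerid column name and the stub standing in for the C# bridge are assumptions made for illustration):
{code:java}
// Sketch of the current mapPartitions-based version: one connection per
// partition, then per-row round trips. Catalyst cannot see which fields the
// closure reads, so the scan is not pruned.
import org.apache.spark.sql.SparkSession

// Stand-in for the connection to the external C# process (illustration only).
final class CSharpProcess extends Serializable {
  def compute(key: String): Double = key.length.toDouble
}

object MapPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sketch").getOrCreate()
    import spark.implicits._

    val df = spark.read.parquet("/path/to/parquet/dataset")

    val result = df.mapPartitions { rows =>
      val proc = new CSharpProcess()        // setup once per partition
      rows.map { r =>
        val id = r.getAs[String]("partnerid")
        (id, proc.compute(id))              // one round trip per row
      }
    }
    result.show()
  }
}
{code}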

Now that we have a first working version, we would like to improve it by 
limiting the columns that are read. We don't want to select the columns required 
by our computation right before the mapPartitions call, because that would 
filter out columns that could be required by other transformations in the 
workflow. Instead, we would like to take advantage of Catalyst for column 
pruning, predicate pushdown and other optimization rules. Using a Scala UDF to 
replace the mapPartitions would not be efficient, because we would connect to 
the C# process for each row. An alternative would be a Scala "batch" UDF, 
applied to the columns needed for our computation so as to take advantage of 
Catalyst and its optimization rules, and whose input would be an iterator, like 
mapPartitions. 

  was:
Hello,

Unlike Python, there is only one type of Scala UDF: it lets us define a Scala 
function that is applied to a set of Columns and is called +for each row+. One 
advantage of a Scala UDF over mapPartitions is that Catalyst can see its inputs, 
which are then used for column pruning, predicate pushdown and other 
optimization rules. But in some use cases there is a setup phase that we want to 
execute only once per worker, right before processing the inputs. For such use 
cases a Scala UDF is not well suited, and mapPartitions is used instead, like 
this:

 
{code:java}
ds.mapPartitions(
  it => {
setup()
process(it)
  }
){code}
After looking at the code, I found that Python UDFs are implemented via query 
plans that retrieve an RDD from their children and call mapPartitions on that 
RDD to work with batches of inputs. These query plans are generated by Catalyst 
by extracting the Python UDFs (rule ExtractPythonUDFs).

*Implementation details*: we could implement a new Expression, ScalaBatchUDF, 
and add a boolean isBatch to Expression that tells whether an Expression is a 
batch expression or not. The SparkPlans SelectExec, FilterExec (and probably 
more) would be modified to handle batch Expressions:
 * Generated code would call a batch Expression with a batch of inputs instead 
of a single input.
 * The doExecute() method would call a batch Expression with a batch of inputs 
instead of a single input.

A SparkPlan could be composed of both "single" Expressions and batch 
Expressions.

[jira] [Updated] (SPARK-33519) Batch UDF in scala

2020-11-23 Thread Gaetan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaetan updated SPARK-33519:
---
Description: 
Hello,

Unlike Python, there is only one type of Scala UDF: it lets us define a Scala 
function that is applied to a set of Columns and is called +for each row+. One 
advantage of a Scala UDF over mapPartitions is that Catalyst can see its inputs, 
which are then used for column pruning, predicate pushdown and other 
optimization rules. But in some use cases there is a setup phase that we want to 
execute only once per worker, right before processing the inputs. For such use 
cases a Scala UDF is not well suited, and mapPartitions is used instead, like 
this:

 
{code:java}
ds.mapPartitions(
  it => {
setup()
process(it)
  }
){code}
After looking at the code, I found that Python UDFs are implemented via query 
plans that retrieve an RDD from their children and call mapPartitions on that 
RDD to work with batches of inputs. These query plans are generated by Catalyst 
by extracting the Python UDFs (rule ExtractPythonUDFs).

*Implementation details*: we could implement a new Expression, ScalaBatchUDF, 
and add a boolean isBatch to Expression that tells whether an Expression is a 
batch expression or not. The SparkPlans SelectExec, FilterExec (and probably 
more) would be modified to handle batch Expressions:
 * Generated code would call a batch Expression with a batch of inputs instead 
of a single input.
 * The doExecute() method would call a batch Expression with a batch of inputs 
instead of a single input.

A SparkPlan could be composed of both "single" Expressions and batch 
Expressions. This is a first idea that would need to be refined. What do you 
think?

Here is a short description of *one of our Spark use cases* that could greatly 
benefit from Scala batch UDFs:

We are using Spark to distribute some computation run in C#. To do so, we call 
the mapPartitions method of the DataFrame that represents our data. Inside 
mapPartitions, we:
 * First connect to the C# process
 * Then iterate over the inputs, sending each one to the C# process and getting 
back the results.

The use of mapPartitions was motivated by the setup (connection to the C# 
process) that happens for each partition.

Now that we have a first working version, we would like to improve it by 
limiting the columns that are read. We don't want to select the columns required 
by our computation right before the mapPartitions call, because that would 
filter out columns that could be required by other transformations in the 
workflow. Instead, we would like to take advantage of Catalyst for column 
pruning, predicate pushdown and other optimization rules. Using a Scala UDF to 
replace the mapPartitions would not be efficient, because we would connect to 
the C# process for each row. An alternative would be a Scala "batch" UDF, 
applied to the columns needed for our computation so as to take advantage of 
Catalyst and its optimization rules, and whose input would be an iterator, like 
mapPartitions. 

  was:
Hello,

Unlike Python, there is only one type of Scala UDF: it lets us define a Scala 
function that is applied to a set of Columns and is called +for each row+. One 
advantage of a Scala UDF over mapPartitions is that Catalyst can see its inputs, 
which are then used for column pruning, predicate pushdown and other 
optimization rules. But in some use cases there is a setup phase that we want to 
execute only once per worker, right before processing the inputs. For such use 
cases a Scala UDF is not well suited, and mapPartitions is used instead, like 
this:

 
{code:java}
ds.mapPartitions(
  it => {
setup()
process(it)
  }
){code}
After looking at the code, I found that Python UDFs are implemented via query 
plans that retrieve an RDD from their children and call mapPartitions on that 
RDD to work with batches of inputs. These query plans are generated by Catalyst 
by extracting the Python UDFs (rule ExtractPythonUDFs).

 

Like for Python UDFs, we could implement Scala batch UDFs with query plans that 
work with a batch of inputs instead of one input. What do you think?

Here is a short description of one of our Spark use cases that could greatly 
benefit from Scala batch UDFs:

We are using Spark to distribute some computation run in C#. To do so, we call 
the mapPartitions method of the DataFrame that represents our data. Inside 
mapPartitions, we:
 * First connect to the C# process
 * Then iterate over the inputs, sending each one to the C# process and getting 
back the results.

The use of mapPartitions was motivated by the setup (connection to the C# 
process) that happens for each partition.

Now that we have a first working version, we would like to improve it by 
limiting the columns to read. We don't want to select columns that are required 

[jira] [Updated] (SPARK-33519) Batch UDF in scala

2020-11-23 Thread Gaetan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaetan updated SPARK-33519:
---
Issue Type: New Feature  (was: Wish)

> Batch UDF in scala
> --
>
> Key: SPARK-33519
> URL: https://issues.apache.org/jira/browse/SPARK-33519
> Project: Spark
>  Issue Type: New Feature
>  Components: Optimizer, Spark Core
>Affects Versions: 3.0.0, 3.0.1
>Reporter: Gaetan
>Priority: Major
>
> Hello,
> Unlike Python, there is only one type of Scala UDF: it lets us define a Scala 
> function that is applied to a set of Columns and is called +for each row+. One 
> advantage of a Scala UDF over mapPartitions is that Catalyst can see its 
> inputs, which are then used for column pruning, predicate pushdown and other 
> optimization rules. But in some use cases there is a setup phase that we want 
> to execute only once per worker, right before processing the inputs. For such 
> use cases a Scala UDF is not well suited, and mapPartitions is used instead, 
> like this:
>  
> {code:java}
> ds.mapPartitions(
>   it => {
> setup()
> process(it)
>   }
> ){code}
> After looking at the code, I found that Python UDFs are implemented via query 
> plans that retrieve an RDD from their children and call mapPartitions on that 
> RDD to work with batches of inputs. These query plans are generated by 
> Catalyst by extracting the Python UDFs (rule ExtractPythonUDFs).
>  
> Like for Python UDFs, we could implement Scala batch UDFs with query plans 
> that work with a batch of inputs instead of one input. What do you think?
> Here is a short description of one of our Spark use cases that could greatly 
> benefit from Scala batch UDFs:
> We are using Spark to distribute some computation run in C#. To do so, we 
> call the mapPartitions method of the DataFrame that represents our data. 
> Inside mapPartitions, we:
>  * First connect to the C# process
>  * Then iterate over the inputs, sending each one to the C# process and 
> getting back the results.
> The use of mapPartitions was motivated by the setup (connection to the C# 
> process) that happens for each partition.
> Now that we have a first working version, we would like to improve it by 
> limiting the columns that are read. We don't want to select the columns 
> required by our computation right before the mapPartitions call, because that 
> would filter out columns that could be required by other transformations in 
> the workflow. Instead, we would like to take advantage of Catalyst for column 
> pruning, predicate pushdown and other optimization rules. Using a Scala UDF 
> to replace the mapPartitions would not be efficient, because we would connect 
> to the C# process for each row. An alternative would be a Scala "batch" UDF, 
> applied to the columns needed for our computation so as to take advantage of 
> Catalyst and its optimization rules, and whose input would be an iterator, 
> like mapPartitions. 






[jira] [Created] (SPARK-33519) Batch UDF in scala

2020-11-23 Thread Gaetan (Jira)
Gaetan created SPARK-33519:
--

 Summary: Batch UDF in scala
 Key: SPARK-33519
 URL: https://issues.apache.org/jira/browse/SPARK-33519
 Project: Spark
  Issue Type: Wish
  Components: Optimizer, Spark Core
Affects Versions: 3.0.1, 3.0.0
Reporter: Gaetan


Hello,

Unlike Python, there is only one type of Scala UDF: it lets us define a Scala 
function that is applied to a set of Columns and is called +for each row+. One 
advantage of a Scala UDF over mapPartitions is that Catalyst can see its inputs, 
which are then used for column pruning, predicate pushdown and other 
optimization rules. But in some use cases there is a setup phase that we want to 
execute only once per worker, right before processing the inputs. For such use 
cases a Scala UDF is not well suited, and mapPartitions is used instead, like 
this:

 
{code:java}
ds.mapPartitions(
  it => {
setup()
process(it)
  }
){code}
After looking at the code, I found that Python UDFs are implemented via query 
plans that retrieve an RDD from their children and call mapPartitions on that 
RDD to work with batches of inputs. These query plans are generated by Catalyst 
by extracting the Python UDFs (rule ExtractPythonUDFs).

 

Like for Python UDFs, we could implement Scala batch UDFs with query plans that 
work with a batch of inputs instead of one input. What do you think?

Here is a short description of one of our Spark use cases that could greatly 
benefit from Scala batch UDFs:

We are using Spark to distribute some computation run in C#. To do so, we call 
the mapPartitions method of the DataFrame that represents our data. Inside 
mapPartitions, we:
 * First connect to the C# process
 * Then iterate over the inputs, sending each one to the C# process and getting 
back the results.

The use of mapPartitions was motivated by the setup (connection to the C# 
process) that happens for each partition.

Now that we have a first working version, we would like to improve it by 
limiting the columns that are read. We don't want to select the columns required 
by our computation right before the mapPartitions call, because that would 
filter out columns that could be required by other transformations in the 
workflow. Instead, we would like to take advantage of Catalyst for column 
pruning, predicate pushdown and other optimization rules. Using a Scala UDF to 
replace the mapPartitions would not be efficient, because we would connect to 
the C# process for each row. An alternative would be a Scala "batch" UDF, 
applied to the columns needed for our computation so as to take advantage of 
Catalyst and its optimization rules, and whose input would be an iterator, like 
mapPartitions. 






[jira] [Commented] (SPARK-14043) Remove restriction on maxDepth for decision trees

2018-12-21 Thread Gaetan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-14043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16726544#comment-16726544
 ] 

Gaetan commented on SPARK-14043:


In my opinion it is a mistake not to invest in machine learning improvements. 
Random forest is a very efficient algorithm for statisticians, often more so 
than logistic or polynomial regressions. The depth limit becomes a problem at 
around 1 billion observations, so we are forced to use another technology 
because the error rate is too high.

> Remove restriction on maxDepth for decision trees
> -
>
> Key: SPARK-14043
> URL: https://issues.apache.org/jira/browse/SPARK-14043
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Reporter: Joseph K. Bradley
>Priority: Minor
>
> We currently restrict decision trees (DecisionTree, GBT, RandomForest) to be 
> of maxDepth <= 30.  We should remove this restriction to support deep 
> (imbalanced) trees.
> Trees store an index for each node, where each index corresponds to a unique 
> position in a binary tree.  (I.e., the first index of row 0 is 1, the first 
> of row 1 is 2, the first of row 2 is 4, etc., IIRC)
> With some careful thought, we could probably avoid using indices altogether.
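A small illustration of the arithmetic behind that indexing scheme (assuming the node indices are stored as 32-bit Ints, which would be consistent with the maxDepth <= 30 limit):
{code:java}
// The first index of row (depth) d is 2^d: 1, 2, 4, 8, ... so a signed 32-bit
// index overflows at depth 31, which matches the maxDepth <= 30 restriction.
object NodeIndexDepth {
  def firstIndexOfRow(depth: Int): Long = 1L << depth

  def main(args: Array[String]): Unit = {
    println(firstIndexOfRow(0))   // 1
    println(firstIndexOfRow(30))  // 1073741824 -> still fits in an Int
    println(firstIndexOfRow(31))  // 2147483648 -> exceeds Int.MaxValue (2147483647)
  }
}
{code}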


