[jira] [Updated] (SPARK-33559) Column pruning with monotonically_increasing_id
[ https://issues.apache.org/jira/browse/SPARK-33559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaetan updated SPARK-33559: --- Affects Version/s: (was: 3.0.1) 2.4.0 > Column pruning with monotonically_increasing_id > --- > > Key: SPARK-33559 > URL: https://issues.apache.org/jira/browse/SPARK-33559 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Gaetan >Priority: Minor > > {code:java} > df = ss.read.parquet("/path/to/parquet/dataset") > df.select("partnerid").withColumn("index", > sf.monotonically_increasing_id()).explain(True){code} > {{We would expect to read only partnerid from the parquet dataset, but we > actually read the whole dataset:}} > {code:java} > ... == Physical Plan == Project [partnerid#6794, > monotonically_increasing_id() AS index#24939L] +- FileScan parquet > [impression_id#6550,arbitrage_id#6551,display_timestamp#6552L,requesttimestamputc#6553,affiliateid#6554,amp_adrequest_type#6555,app_id#6556,app_name#6557,appnexus_viewability#6558,apxpagevertical#6559,arbitrage_time#6560,banner_type#6561,display_type_int#6562,has_multiple_display_types#6563,bannerid#6564,bid_app_id_hash#6565,bid_url_domain_hash#6566,bidding_details#6567,bid_level_core#6568,bidrandomization_user_factor#6569,bidrandomization_user_mu#6570,bidrandomization_user_sigma#6571,big_lastrequesttimestampsession#6572,big_nbrequestaffiliatesession#6573,... > 566 more fields] ...{code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33559) Column pruning with monotonically_increasing_id
[ https://issues.apache.org/jira/browse/SPARK-33559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaetan updated SPARK-33559: --- Description: {code:java} df = ss.read.parquet("/path/to/parquet/dataset") df.select("partnerid").withColumn("index", sf.monotonically_increasing_id()).explain(True){code} {{We would expect to read only partnerid from the parquet dataset, but we actually read the whole dataset:}} {code:java} ... == Physical Plan == Project [partnerid#6794, monotonically_increasing_id() AS index#24939L] +- FileScan parquet [impression_id#6550,arbitrage_id#6551,display_timestamp#6552L,requesttimestamputc#6553,affiliateid#6554,amp_adrequest_type#6555,app_id#6556,app_name#6557,appnexus_viewability#6558,apxpagevertical#6559,arbitrage_time#6560,banner_type#6561,display_type_int#6562,has_multiple_display_types#6563,bannerid#6564,bid_app_id_hash#6565,bid_url_domain_hash#6566,bidding_details#6567,bid_level_core#6568,bidrandomization_user_factor#6569,bidrandomization_user_mu#6570,bidrandomization_user_sigma#6571,big_lastrequesttimestampsession#6572,big_nbrequestaffiliatesession#6573,... 566 more fields] ...{code} was: {code:java} df = ss.read.parquet("/path/to/parquet/dataset") df.select("partnerid").withColumn("index", sf.monotonically_increasing_id()).explain(True){code} {{We would expect to read only partnerid from the parquet dataset, but we actually read the whole dataset:}} {code:java} ... 
== Physical Plan == Project [partnerid#6794, monotonically_increasing_id() AS index#24939L] +- FileScan parquet [impression_id#6550,arbitrage_id#6551,display_timestamp#6552L,requesttimestamputc#6553,affiliateid#6554,amp_adrequest_type#6555,app_id#6556,app_name#6557,appnexus_viewability#6558,apxpagevertical#6559,arbitrage_time#6560,banner_type#6561,display_type_int#6562,has_multiple_display_types#6563,bannerid#6564,bid_app_id_hash#6565,bid_url_domain_hash#6566,bidding_details#6567,bid_level_core#6568,bidrandomization_user_factor#6569,bidrandomization_user_mu#6570,bidrandomization_user_sigma#6571,big_lastrequesttimestampsession#6572,big_nbrequestaffiliatesession#6573,... 566 more fields] ...{code} > Column pruning with monotonically_increasing_id > --- > > Key: SPARK-33559 > URL: https://issues.apache.org/jira/browse/SPARK-33559 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Gaetan >Priority: Minor > > {code:java} > df = ss.read.parquet("/path/to/parquet/dataset") > df.select("partnerid").withColumn("index", > sf.monotonically_increasing_id()).explain(True){code} > {{We would expect to read only partnerid from the parquet dataset, but we > actually read the whole dataset:}} > {code:java} > ... == Physical Plan == Project [partnerid#6794, > monotonically_increasing_id() AS index#24939L] +- FileScan parquet > [impression_id#6550,arbitrage_id#6551,display_timestamp#6552L,requesttimestamputc#6553,affiliateid#6554,amp_adrequest_type#6555,app_id#6556,app_name#6557,appnexus_viewability#6558,apxpagevertical#6559,arbitrage_time#6560,banner_type#6561,display_type_int#6562,has_multiple_display_types#6563,bannerid#6564,bid_app_id_hash#6565,bid_url_domain_hash#6566,bidding_details#6567,bid_level_core#6568,bidrandomization_user_factor#6569,bidrandomization_user_mu#6570,bidrandomization_user_sigma#6571,big_lastrequesttimestampsession#6572,big_nbrequestaffiliatesession#6573,... 
> 566 more fields] ...{code}
[jira] [Created] (SPARK-33559) Column pruning with monotonically_increasing_id
Gaetan created SPARK-33559: -- Summary: Column pruning with monotonically_increasing_id Key: SPARK-33559 URL: https://issues.apache.org/jira/browse/SPARK-33559 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.0.1 Reporter: Gaetan {code:java} df = ss.read.parquet("/path/to/parquet/dataset") df.select("partnerid").withColumn("index", sf.monotonically_increasing_id()).explain(True){code} {{We would expect to read only partnerid from the parquet dataset, but we actually read the whole dataset:}} {code:java} ... == Physical Plan == Project [partnerid#6794, monotonically_increasing_id() AS index#24939L] +- FileScan parquet [impression_id#6550,arbitrage_id#6551,display_timestamp#6552L,requesttimestamputc#6553,affiliateid#6554,amp_adrequest_type#6555,app_id#6556,app_name#6557,appnexus_viewability#6558,apxpagevertical#6559,arbitrage_time#6560,banner_type#6561,display_type_int#6562,has_multiple_display_types#6563,bannerid#6564,bid_app_id_hash#6565,bid_url_domain_hash#6566,bidding_details#6567,bid_level_core#6568,bidrandomization_user_factor#6569,bidrandomization_user_mu#6570,bidrandomization_user_sigma#6571,big_lastrequesttimestampsession#6572,big_nbrequestaffiliatesession#6573,... 566 more fields] ...{code}
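For context on why the wide FileScan above looks like a missed pruning opportunity: per the Spark documentation, monotonically_increasing_id() reads no input columns at all — it builds each id from the partition ID (upper 31 bits) and the record's position within the partition (lower 33 bits) — so the scan could in principle be pruned down to partnerid alone. A minimal pure-Python sketch of that documented id layout (the function name and driver loop are ours, purely illustrative, not Spark's implementation):

```python
def monotonically_increasing_id_sketch(partition_id: int, row_index: int) -> int:
    """Mimic the documented layout of Spark's monotonically_increasing_id():
    partition ID in the upper 31 bits, per-partition record number in the
    lower 33 bits. Illustrative only."""
    assert 0 <= partition_id < 2**31 and 0 <= row_index < 2**33
    return (partition_id << 33) | row_index

# Rows in partition 0 get consecutive ids starting at 0;
# partition 1 starts at 2**33 = 8589934592, so ids are unique but not consecutive.
ids = [monotonically_increasing_id_sketch(p, i) for p in range(2) for i in range(3)]
```

Because the id depends only on partition and position, the expression is partition-sensitive but column-independent, which is what makes the missing column pruning surprising.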
[jira] [Updated] (SPARK-33519) Batch UDF in scala
[ https://issues.apache.org/jira/browse/SPARK-33519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaetan updated SPARK-33519: --- Description: Hello, Contrary to Python, there is only one type of Scala UDF, which lets us define a Scala function that is applied to a set of Columns and called +for each row+. One advantage of Scala UDF over mapPartitions is that Catalyst can see the inputs, which are then used for column pruning, predicate pushdown and other optimization rules. But in some use cases, there can be a setup phase that we only want to execute once per worker right before processing inputs. For such use cases, Scala UDF is not well suited and mapPartitions is used instead like this: {code:java} ds.mapPartitions( it => { setup() process(it) } ){code} After having looked at the code, I figured that Python UDFs are implemented via query plans that retrieve an RDD via their children and that call mapPartitions of that RDD to work with batches of inputs. These query plans are generated by Catalyst by extracting Python UDFs (rule ExtractPythonUDFs). - *Implementation details*: 1. we could implement a new Expression ScalaBatchUDF and add a boolean isBatch to Expression that tells whether an Expression is batch or not. SparkPlan SelectExec, FilterExec (and probably more) would be modified to handle batch Expressions: * Generated code would call the batch Expression with a batch of inputs instead of a single input. * The doExecute() method would call the batch Expression with a batch of inputs instead of a single input. A SparkPlan could be composed of "single" Expressions and batch expressions. It is a first idea that would need to be refined. 2. Another solution could be to do as for Python UDFs: a batch UDF, implemented as an Expression, is extracted from the query plan it belongs to and transformed into a query plan ScalaBatchUDFExec (which becomes a child of the query plan that the batch UDF belongs to). What do you think? 
- Here is a very small description of *one of our use cases of Spark* that could greatly benefit from Scala batch UDFs: We are using Spark to distribute some computation run in C#. To do so, we call the method mapPartitions of the DataFrame that represents our data. Inside mapPartitions, we: * First connect to the C# process * Then iterate over the inputs by sending each input to the C# process and getting back the results. The use of mapPartitions was motivated by the setup (connection to the C# process) that happens for each partition. Now that we have a first working version, we would like to improve it by limiting the columns to read. We don't want to select the columns required by our computation right before the mapPartitions because it would result in filtering out columns that could be required by other transformations in the workflow. Instead, we would like to take advantage of Catalyst for column pruning, predicate pushdowns and other optimization rules. Using a Scala UDF to replace the mapPartitions would not be efficient because we would connect to the C# process for each row. An alternative would be a Scala "batch" UDF which would be applied to the columns needed for our computation, to take advantage of Catalyst and its optimizing rules, and whose input would be an iterator, like mapPartitions. was: Hello, Contrary to Python, there is only one type of Scala UDF, which lets us define a Scala function that is applied to a set of Columns and called +for each row+. One advantage of Scala UDF over mapPartitions is that Catalyst can see the inputs, which are then used for column pruning, predicate pushdown and other optimization rules. But in some use cases, there can be a setup phase that we only want to execute once per worker right before processing inputs. 
For such use cases, Scala UDF is not well suited and mapPartitions is used instead like this: {code:java} ds.mapPartitions( it => { setup() process(it) } ){code} After having looked at the code, I figured that Python UDFs are implemented via query plans that retrieve an RDD via their children and that call mapPartitions of that RDD to work with batches of inputs. These query plans are generated by Catalyst by extracting Python UDFs (rule ExtractPythonUDFs). *Implementation details*: we could implement a new Expression ScalaBatchUDF and add a boolean isBatch to Expression that tells whether an Expression is batch or not. SparkPlan SelectExec, FilterExec (and probably more) would be modified to handle batch Expressions: * Generated code would call the batch Expression with a batch of inputs instead of a single input. * The doExecute() method would call the batch Expression with a batch of inputs instead of a single input. A SparkPlan could be composed of "single" Expressions and batch
[jira] [Updated] (SPARK-33519) Batch UDF in scala
[ https://issues.apache.org/jira/browse/SPARK-33519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaetan updated SPARK-33519: --- Description: Hello, Contrary to Python, there is only one type of Scala UDF, which lets us define a Scala function that is applied to a set of Columns and called +for each row+. One advantage of Scala UDF over mapPartitions is that Catalyst can see the inputs, which are then used for column pruning, predicate pushdown and other optimization rules. But in some use cases, there can be a setup phase that we only want to execute once per worker right before processing inputs. For such use cases, Scala UDF is not well suited and mapPartitions is used instead like this: {code:java} ds.mapPartitions( it => { setup() process(it) } ){code} After having looked at the code, I figured that Python UDFs are implemented via query plans that retrieve an RDD via their children and that call mapPartitions of that RDD to work with batches of inputs. These query plans are generated by Catalyst by extracting Python UDFs (rule ExtractPythonUDFs). *Implementation details*: we could implement a new Expression ScalaBatchUDF and add a boolean isBatch to Expression that tells whether an Expression is batch or not. SparkPlan SelectExec, FilterExec (and probably more) would be modified to handle batch Expressions: * Generated code would call the batch Expression with a batch of inputs instead of a single input. * The doExecute() method would call the batch Expression with a batch of inputs instead of a single input. A SparkPlan could be composed of "single" Expressions and batch expressions. It is a first idea that would need to be refined. What do you think? Here is a very small description of *one of our use cases of Spark* that could greatly benefit from Scala batch UDFs: We are using Spark to distribute some computation run in C#. To do so, we call the method mapPartitions of the DataFrame that represents our data. 
Inside mapPartitions, we: * First connect to the C# process * Then iterate over the inputs by sending each input to the C# process and getting back the results. The use of mapPartitions was motivated by the setup (connection to the C# process) that happens for each partition. Now that we have a first working version, we would like to improve it by limiting the columns to read. We don't want to select the columns required by our computation right before the mapPartitions because it would result in filtering out columns that could be required by other transformations in the workflow. Instead, we would like to take advantage of Catalyst for column pruning, predicate pushdowns and other optimization rules. Using a Scala UDF to replace the mapPartitions would not be efficient because we would connect to the C# process for each row. An alternative would be a Scala "batch" UDF which would be applied to the columns needed for our computation, to take advantage of Catalyst and its optimizing rules, and whose input would be an iterator, like mapPartitions. was: Hello, Contrary to Python, there is only one type of Scala UDF, which lets us define a Scala function that is applied to a set of Columns and called +for each row+. One advantage of Scala UDF over mapPartitions is that Catalyst can see the inputs, which are then used for column pruning, predicate pushdown and other optimization rules. But in some use cases, there can be a setup phase that we only want to execute once per worker right before processing inputs. For such use cases, Scala UDF is not well suited and mapPartitions is used instead like this: {code:java} ds.mapPartitions( it => { setup() process(it) } ){code} After having looked at the code, I figured that Python UDFs are implemented via query plans that retrieve an RDD via their children and that call mapPartitions of that RDD to work with batches of inputs. 
These query plans are generated by Catalyst by extracting Python UDFs (rule ExtractPythonUDFs). As with Python UDFs, we could implement Scala batch UDFs with query plans to work with a batch of inputs instead of one input. What do you think? Here is a very small description of one of our use cases of Spark that could greatly benefit from Scala batch UDFs: We are using Spark to distribute some computation run in C#. To do so, we call the method mapPartitions of the DataFrame that represents our data. Inside mapPartitions, we: * First connect to the C# process * Then iterate over the inputs by sending each input to the C# process and getting back the results. The use of mapPartitions was motivated by the setup (connection to the C# process) that happens for each partition. Now that we have a first working version, we would like to improve it by limiting the columns to read. We don't want to select columns that are required
[jira] [Updated] (SPARK-33519) Batch UDF in scala
[ https://issues.apache.org/jira/browse/SPARK-33519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaetan updated SPARK-33519: --- Issue Type: New Feature (was: Wish) > Batch UDF in scala > -- > > Key: SPARK-33519 > URL: https://issues.apache.org/jira/browse/SPARK-33519 > Project: Spark > Issue Type: New Feature > Components: Optimizer, Spark Core >Affects Versions: 3.0.0, 3.0.1 >Reporter: Gaetan >Priority: Major > > Hello, > Contrary to Python, there is only one type of Scala UDF, which lets us define > a Scala function that is applied to a set of Columns and called +for each > row+. One advantage of Scala UDF over mapPartitions is that Catalyst can see > the inputs, which are then used for column pruning, predicate pushdown and > other optimization rules. But in some use cases, there can be a setup phase > that we only want to execute once per worker right before processing inputs. > For such use cases, Scala UDF is not well suited and mapPartitions is used > instead like this: > > {code:java} > ds.mapPartitions( > it => { > setup() > process(it) > } > ){code} > After having looked at the code, I figured that Python UDFs are implemented > via query plans that retrieve an RDD via their children and that call > mapPartitions of that RDD to work with batches of inputs. These query plans > are generated by Catalyst by extracting Python UDFs (rule ExtractPythonUDFs). > > As with Python UDFs, we could implement Scala batch UDFs with query plans to > work with a batch of inputs instead of one input. What do you think? > Here is a very small description of one of our use cases of Spark that could > greatly benefit from Scala batch UDFs: > We are using Spark to distribute some computation run in C#. To do so, we > call the method mapPartitions of the DataFrame that represents our data. 
> Inside mapPartitions, we: > * First connect to the C# process > * Then iterate over the inputs by sending each input to the C# process and > getting back the results. > The use of mapPartitions was motivated by the setup (connection to the C# > process) that happens for each partition. > Now that we have a first working version, we would like to improve it by > limiting the columns to read. We don't want to select the columns required > by our computation right before the mapPartitions because it would result in > filtering out columns that could be required by other transformations in the > workflow. Instead, we would like to take advantage of Catalyst for column > pruning, predicate pushdowns and other optimization rules. Using a Scala UDF > to replace the mapPartitions would not be efficient because we would connect > to the C# process for each row. An alternative would be a Scala "batch" UDF > which would be applied to the columns needed for our computation, to take > advantage of Catalyst and its optimizing rules, and whose input would be an > iterator, like mapPartitions.
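The setup-once-per-partition pattern that motivates the mapPartitions usage above can be sketched with plain Python iterators. This is a hedged illustration, not Spark code: the names (process_partition, the connection dict standing in for the C# process connection) are invented for the sketch.

```python
from typing import Iterator, List

def process_partition(rows: Iterator[int]) -> Iterator[int]:
    """What the body passed to ds.mapPartitions does: pay the setup cost
    once per partition, then stream every row through the processor."""
    connection = {"calls": 0}          # stand-in for connecting to the C# process
    def process(row: int) -> int:
        connection["calls"] += 1       # the one connection is reused for each row
        return row * 2
    return (process(row) for row in rows)

# Two partitions -> setup runs twice (once per partition), not once per row.
partitions: List[List[int]] = [[1, 2, 3], [4, 5]]
results = [list(process_partition(iter(p))) for p in partitions]
```

The key property is that `connection` is created once per partition while `process` runs once per row — exactly the cost profile a per-row Scala UDF cannot express.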
[jira] [Created] (SPARK-33519) Batch UDF in scala
Gaetan created SPARK-33519: -- Summary: Batch UDF in scala Key: SPARK-33519 URL: https://issues.apache.org/jira/browse/SPARK-33519 Project: Spark Issue Type: Wish Components: Optimizer, Spark Core Affects Versions: 3.0.1, 3.0.0 Reporter: Gaetan Hello, Contrary to Python, there is only one type of Scala UDF, which lets us define a Scala function that is applied to a set of Columns and called +for each row+. One advantage of Scala UDF over mapPartitions is that Catalyst can see the inputs, which are then used for column pruning, predicate pushdown and other optimization rules. But in some use cases, there can be a setup phase that we only want to execute once per worker right before processing inputs. For such use cases, Scala UDF is not well suited and mapPartitions is used instead like this: {code:java} ds.mapPartitions( it => { setup() process(it) } ){code} After having looked at the code, I figured that Python UDFs are implemented via query plans that retrieve an RDD via their children and that call mapPartitions of that RDD to work with batches of inputs. These query plans are generated by Catalyst by extracting Python UDFs (rule ExtractPythonUDFs). As with Python UDFs, we could implement Scala batch UDFs with query plans to work with a batch of inputs instead of one input. What do you think? Here is a very small description of one of our use cases of Spark that could greatly benefit from Scala batch UDFs: We are using Spark to distribute some computation run in C#. To do so, we call the method mapPartitions of the DataFrame that represents our data. Inside mapPartitions, we: * First connect to the C# process * Then iterate over the inputs by sending each input to the C# process and getting back the results. The use of mapPartitions was motivated by the setup (connection to the C# process) that happens for each partition. Now that we have a first working version, we would like to improve it by limiting the columns to read. 
We don't want to select the columns required by our computation right before the mapPartitions because it would result in filtering out columns that could be required by other transformations in the workflow. Instead, we would like to take advantage of Catalyst for column pruning, predicate pushdowns and other optimization rules. Using a Scala UDF to replace the mapPartitions would not be efficient because we would connect to the C# process for each row. An alternative would be a Scala "batch" UDF which would be applied to the columns needed for our computation, to take advantage of Catalyst and its optimizing rules, and whose input would be an iterator, like mapPartitions.
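The proposed "batch" UDF contract — a user function that receives a whole batch of rows at once, so setup cost is amortized across the batch — might look like the following plain-Python sketch. The names (apply_batch_udf, batched) and the batch size are hypothetical, not a Spark API.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List

def batched(rows: Iterable[int], size: int) -> Iterator[List[int]]:
    """Group an iterator of rows into fixed-size batches (last may be short)."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch

def apply_batch_udf(rows: Iterable[int],
                    udf: Callable[[List[int]], List[int]],
                    size: int = 2) -> List[int]:
    """Sketch of the proposed contract: the user function sees one batch of
    inputs at a time and returns one result per input; results are flattened
    back into a stream of rows."""
    out: List[int] = []
    for batch in batched(rows, size):
        out.extend(udf(batch))
    return out

# A trivial 'batch UDF' that doubles each row, invoked once per batch.
doubled = apply_batch_udf([1, 2, 3, 4, 5], lambda b: [x * 2 for x in b])
```

This mirrors how Python UDF execution hands mapPartitions-style batches to the worker, which is the behavior the ticket asks to expose to Scala UDFs.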
[jira] [Commented] (SPARK-14043) Remove restriction on maxDepth for decision trees
[ https://issues.apache.org/jira/browse/SPARK-14043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16726544#comment-16726544 ] Gaetan commented on SPARK-14043: In my opinion, it is a mistake not to invest in machine learning improvements. Random forest is a very efficient algorithm for statisticians, often more so than logistic or polynomial regressions. This depth limit becomes inconvenient beyond roughly 1 billion observations, forcing us to use another technology because the error rate is too high. > Remove restriction on maxDepth for decision trees > - > > Key: SPARK-14043 > URL: https://issues.apache.org/jira/browse/SPARK-14043 > Project: Spark > Issue Type: Improvement > Components: ML >Reporter: Joseph K. Bradley >Priority: Minor > > We currently restrict decision trees (DecisionTree, GBT, RandomForest) to be > of maxDepth <= 30. We should remove this restriction to support deep > (imbalanced) trees. > Trees store an index for each node, where each index corresponds to a unique > position in a binary tree. (I.e., the first index of row 0 is 1, the first > of row 1 is 2, the first of row 2 is 4, etc., IIRC) > With some careful thought, we could probably avoid using indices altogether. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org