[ https://issues.apache.org/jira/browse/SPARK-19091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen updated SPARK-19091:
-------------------------------
    Description: 
The Catalyst optimizer uses LogicalRDD to represent scans from existing RDDs. 
In general, it's hard to produce size estimates for arbitrary RDDs, which is 
why the current implementation simply estimates these relations' sizes using 
the default size (see the TODO for Datasets created via 
spark.createDataFrame()).
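
As a hedged illustration of the status quo (assuming spark-core and spark-sql 
are on the classpath; the plan-statistics accessor has been renamed across 
Spark versions, so the last line is a sketch): a two-row DataFrame built from 
an RDD is planned as a LogicalRDD, and its size estimate is the catch-all 
spark.sql.defaultSizeInBytes value rather than anything derived from the two 
rows actually present.

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("default-size-demo").getOrCreate()

// createDataFrame over an RDD (not a local Seq) is what produces a LogicalRDD node.
val tiny = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Tuple1(1L), Tuple1(2L)))
).toDF("id")

// Reports the configured default size, not something proportional to two rows.
// (In the 2.1-era API this is `optimizedPlan.statistics`; later versions renamed it.)
println(tiny.queryExecution.optimizedPlan.statistics.sizeInBytes)
{code}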

In the special case where data has been parallelized with {{sc.parallelize()}}, 
we'll wind up with a ParallelCollectionRDD whose number of elements is known. 
When we construct a LogicalRDD plan node in {{SparkSession.createDataFrame()}}, 
we'll probably be using an RDD which is a one-to-one transformation of a 
parallel collection RDD (e.g. mapping an encoder to convert case classes to 
internal rows). Thus we can have LogicalRDD's statistics method pattern-match 
on cases where a MappedPartitionsRDD is stacked immediately on top of a 
ParallelCollectionRDD, walk up the RDD parent chain to determine the number of 
elements, and combine that count with the schema and a conservative "fudge 
factor" to produce an over-estimate of the LogicalRDD's size which will be 
more accurate than the default size.
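
For concreteness, here is a minimal sketch of the proposed estimate, not an 
actual patch. Because MappedPartitionsRDD and ParallelCollectionRDD are 
private[spark], this standalone version matches on class names and takes the 
element count through a caller-supplied hook, which is a hypothetical stand-in 
for reading the parallelized collection's size directly; the fudge factor of 8 
is likewise just a placeholder.

{code:scala}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType

object LogicalRDDSizeEstimate {
  // Conservative multiplier so the result errs on the side of over-estimating.
  private val FudgeFactor = BigInt(8)

  def estimatedSizeInBytes(
      rdd: RDD[_],
      schema: StructType,
      knownElementCount: RDD[_] => Option[Long]): Option[BigInt] = {
    // A one-to-one transformation (e.g. the encoder's mapPartitions) preserves
    // its single parent's row count, so peel it off and look at the parent.
    val base =
      if (rdd.getClass.getSimpleName == "MappedPartitionsRDD") {
        rdd.dependencies.headOption.map(_.rdd).getOrElse(rdd)
      } else {
        rdd
      }

    if (base.getClass.getSimpleName == "ParallelCollectionRDD") {
      // rows * per-row size from the schema * fudge factor => over-estimate
      knownElementCount(base).map(n => BigInt(n) * schema.defaultSize * FudgeFactor)
    } else {
      None
    }
  }
}
{code}

Inside the Spark code base the match could be on the actual private[spark] 
classes and the element count could be read straight from the parallelized 
data, so neither the string comparison nor the callback would be needed.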

I believe that this will help us to avoid falling into pathologically bad plans 
when joining tiny parallelize()d data sets against huge tables; I have a 
production use case of my own which would have benefited directly from such an 
optimization.
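
To make the failure mode concrete, here is the kind of query this targets 
(table path and column name are made up): a handful of parallelize()d keys 
joined against a large table. With the default size estimate the tiny side 
looks enormous, so the planner won't consider broadcasting it under 
spark.sql.autoBroadcastJoinThreshold; a row-count-based over-estimate small 
enough to clear that threshold would let it do so.

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("tiny-join-demo").getOrCreate()

// Tiny, locally known data: backed by a ParallelCollectionRDD.
val tinyKeys = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Tuple1(1L), Tuple1(2L)))
).toDF("id")

// Stand-in for a huge table (hypothetical path).
val hugeTable = spark.read.parquet("/warehouse/huge_table")

// Without a realistic size for tinyKeys this tends to plan as a shuffle join.
hugeTable.join(tinyKeys, "id").explain()
{code}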

> Implement more accurate statistics for LogicalRDD when child is a mapped 
> ParallelCollectionRDD 
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19091
>                 URL: https://issues.apache.org/jira/browse/SPARK-19091
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Josh Rosen
>



