Takeshi Yamamuro created SPARK-25196:
----------------------------------------

             Summary: Analyze column statistics in cached query
                 Key: SPARK-25196
                 URL: https://issues.apache.org/jira/browse/SPARK-25196
             Project: Spark
          Issue Type: New Feature
          Components: SQL
    Affects Versions: 2.3.1
            Reporter: Takeshi Yamamuro


In common use cases, users read catalog table data, join or aggregate it, and
then cache the result for later reuse. Since column statistics can only be
collected for catalog tables (via ANALYZE commands), the optimizer has to rely
on missing or inaccurate column statistics for cached data. So I think it would
be nice if Spark could analyze cached data and hold temporary column statistics
for InMemoryRelation.
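As a rough illustration of what analyzing a cached column would compute, here
is a plain-Scala sketch (no Spark dependency) that derives the basic ColumnStat
fields (distinct count, min, max, null count, average and max length) for a
Long column that has already been materialized. It is not Spark's
implementation: SimpleColumnStat and analyzeColumn are hypothetical names, the
distinct count here is exact whereas ANALYZE computes an approximate one with
HyperLogLog++, and an 8-byte fixed width is assumed for the length fields.

{code:scala}
// Hypothetical stand-in for the fields of Spark's ColumnStat (Long column only).
final case class SimpleColumnStat(
    distinctCount: Long,
    min: Option[Long],
    max: Option[Long],
    nullCount: Long,
    avgLen: Long,
    maxLen: Long)

// Exact statistics over an already-materialized column; None models SQL NULL.
def analyzeColumn(values: Seq[Option[Long]]): SimpleColumnStat = {
  val nonNull = values.flatten
  SimpleColumnStat(
    distinctCount = nonNull.distinct.size.toLong,
    min = nonNull.reduceOption(_ min _),
    max = nonNull.reduceOption(_ max _),
    nullCount = values.count(_.isEmpty).toLong,
    avgLen = 8L, // assumption: fixed-width 8-byte values
    maxLen = 8L)
}

// Same values as column c0 of table t: id % 33 for id in 0 until 1000.
val c0: Seq[Option[Long]] = (0L until 1000L).map(id => Option(id % 33))
println(analyzeColumn(c0)) // SimpleColumnStat(33,Some(0),Some(32),0,8,8)
{code}

Applied to the values of c0, it yields the same distinct count, min, max, null
count and lengths as the catalog statistics for c0 in the example below.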

For example, we could add a new API (e.g., analyzeColumnCacheQuery) to
CacheManager to do so. POC:
[https://github.com/apache/spark/compare/master...maropu:AnalyzeCacheQuery]
{code:java}
scala> sql("SET spark.sql.cbo.enabled=true")
scala> sql("SET spark.sql.statistics.histogram.enabled=true")
scala> spark.range(1000).selectExpr("id % 33 AS c0", "rand() AS c1", "0 AS c2").write.saveAsTable("t")
scala> sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c0, c1, c2")
scala> val cacheManager = spark.sharedState.cacheManager
scala> def printColumnStats(data: org.apache.spark.sql.DataFrame) = {
     |   data.queryExecution.optimizedPlan.stats.attributeStats.foreach {
     |     case (k, v) => println(s"[$k]: $v")
     |   }
     | }
scala> def df() = spark.table("t").groupBy("c0").agg(count("c1").as("v1"), sum("c2").as("v2"))

// Prints column statistics in catalog table `t`
scala> printColumnStats(spark.table("t"))
[c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@209c0be5)))
[c1#7074]: ColumnStat(Some(997),Some(5.958619423369615E-4),Some(0.9988009488973438),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@4ef69c53)))
[c2#7075]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(4),Some(4),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@7cbaf548)))

// Prints column statistics on query result `df`
scala> printColumnStats(df())
[c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@209c0be5)))

// Prints column statistics on cached data of `df`
scala> printColumnStats(df().cache)
<No Column Statistics>

// The new API proposed above
scala> cacheManager.analyzeColumnCacheQuery(df(), "v1" :: "v2" :: Nil)

// Then, print them again
scala> printColumnStats(df())
[v1#7101L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@e2ff893)))
[v2#7103L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@1498a4d)))

scala> cacheManager.analyzeColumnCacheQuery(df(), "c0" :: Nil)
scala> printColumnStats(df())
[v1#7101L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@e2ff893)))
[v2#7103L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@1498a4d)))
[c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@626bcfc8)))
{code}
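A note on the Histogram(height, bins) values above: Spark's column histograms
are equi-height, so height is the number of rows per bin, i.e. the row count
divided by spark.sql.statistics.histogram.numBins (254 by default). That is why
table t (1000 rows) shows 1000 / 254 = 3.937... while the cached aggregate
(33 groups) shows 33 / 254 = 0.1299.... The plain-Scala sketch below reproduces
those heights and builds a toy equi-height histogram by cutting sorted values
into equal-count slices; ToyBin, height and equiHeightBins are hypothetical
names, and Spark itself picks bin boundaries from approximate percentiles
rather than a full sort.

{code:scala}
// Hypothetical toy bin: value range and number of distinct values inside it.
final case class ToyBin(lo: Long, hi: Long, ndv: Int)

// In an equi-height histogram every bin holds the same number of rows,
// so the per-bin height is rowCount / numBins.
def height(rowCount: Long, numBins: Int): Double = rowCount.toDouble / numBins

// Toy exact construction: sort, then cut into numBins equal-count slices.
// Requires 1 <= numBins <= values.size so that no slice is empty.
def equiHeightBins(values: Seq[Long], numBins: Int): Seq[ToyBin] = {
  require(numBins >= 1 && numBins <= values.size, "need 1 <= numBins <= row count")
  val sorted = values.sorted
  val n = sorted.size
  (0 until numBins).map { i =>
    val slice = sorted.slice(i * n / numBins, (i + 1) * n / numBins)
    ToyBin(slice.head, slice.last, slice.distinct.size)
  }
}

val defaultNumBins = 254 // default of spark.sql.statistics.histogram.numBins
println(height(1000, defaultNumBins)) // table t: 1000 rows
println(height(33, defaultNumBins))   // cached aggregate: 33 groups
equiHeightBins((0L until 1000L).map(_ % 33), 4).foreach(println)
{code}

The exact sort is only for readability; on large cached data an approximate
percentile pass, as ANALYZE uses, avoids sorting every row.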


