[ 
https://issues.apache.org/jira/browse/ARROW-17325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Grove updated ARROW-17325:
-------------------------------
    Description: 
In QueryStageExec.computeStats we copy partial statistics from materialized query 
stages by calling QueryStageExec#getRuntimeStatistics, which in turn calls 
ShuffleExchangeLike#runtimeStatistics or 
BroadcastExchangeLike#runtimeStatistics.

Only dataSize and numOutputRows are copied into the new Statistics object:

 {code:scala}
  def computeStats(): Option[Statistics] = if (isMaterialized) {
    val runtimeStats = getRuntimeStatistics
    val dataSize = runtimeStats.sizeInBytes.max(0)
    val numOutputRows = runtimeStats.rowCount.map(_.max(0))
    Some(Statistics(dataSize, numOutputRows, isRuntime = true))
  } else {
    None
  }
{code}

I would also like to copy over the column statistics stored in 
Statistics.attributeStats so that they can be fed back into the logical plan 
optimization phase. This is a small change as shown below:

{code:scala}
  def computeStats(): Option[Statistics] = if (isMaterialized) {
    val runtimeStats = getRuntimeStatistics
    val dataSize = runtimeStats.sizeInBytes.max(0)
    val numOutputRows = runtimeStats.rowCount.map(_.max(0))
    val attributeStats = runtimeStats.attributeStats
    Some(Statistics(dataSize, numOutputRows, attributeStats, isRuntime = true))
  } else {
    None
  }
{code}

The Spark implementations of ShuffleExchangeLike and BroadcastExchangeLike do 
not currently provide such column statistics, but other custom implementations 
can.
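
To illustrate how a custom implementation could supply column statistics that 
then survive computeStats, here is a minimal, self-contained sketch. It does 
not use the real Spark classes: ColumnStat, Statistics, ExchangeLike, 
QueryStage and ColumnStatsDemo are simplified, hypothetical stand-ins (for 
example, a plain Map keyed by column name is assumed in place of Spark's 
attribute map), so treat it as a rough model of the idea rather than the 
actual patch.

```scala
// Simplified stand-ins for Spark's catalyst classes (assumptions, not the real API).
final case class ColumnStat(
    distinctCount: Option[BigInt] = None,
    nullCount: Option[BigInt] = None)

final case class Statistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
    attributeStats: Map[String, ColumnStat] = Map.empty,
    isRuntime: Boolean = false)

// Hypothetical stand-in for ShuffleExchangeLike / BroadcastExchangeLike.
trait ExchangeLike {
  def runtimeStatistics: Statistics
}

// Hypothetical stand-in for QueryStageExec.
final class QueryStage(exchange: ExchangeLike, isMaterialized: Boolean) {
  def computeStats(): Option[Statistics] =
    if (isMaterialized) {
      val runtimeStats = exchange.runtimeStatistics
      val dataSize = runtimeStats.sizeInBytes.max(0)
      val numOutputRows = runtimeStats.rowCount.map(_.max(0))
      // The proposed change: carry the column statistics over as well.
      val attributeStats = runtimeStats.attributeStats
      Some(Statistics(dataSize, numOutputRows, attributeStats, isRuntime = true))
    } else {
      None
    }
}

object ColumnStatsDemo {
  // A custom exchange that reports per-column statistics at runtime.
  val customExchange: ExchangeLike = new ExchangeLike {
    def runtimeStatistics: Statistics = Statistics(
      sizeInBytes = BigInt(1024),
      rowCount = Some(BigInt(100)),
      attributeStats = Map(
        "id" -> ColumnStat(
          distinctCount = Some(BigInt(100)),
          nullCount = Some(BigInt(0)))))
  }

  def main(args: Array[String]): Unit = {
    val stats = new QueryStage(customExchange, isMaterialized = true).computeStats()
    // The column statistics for "id" are now part of the stage statistics.
    println(stats.map(_.attributeStats))
  }
}
```

An exchange that reports no column statistics would simply return the empty 
default map, so under these assumptions the behaviour for the existing Spark 
implementations would be unchanged.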

  was:
In QueryStageExec.computeStats we copy partial statistics from materialized query 
stages by calling QueryStageExec#getRuntimeStatistics, which in turn calls 
ShuffleExchangeLike#runtimeStatistics or 
BroadcastExchangeLike#runtimeStatistics.

Only dataSize and numOutputRows are copied into the new Statistics object:

 {code:scala}
  def computeStats(): Option[Statistics] = if (isMaterialized) {
    val runtimeStats = getRuntimeStatistics
    val dataSize = runtimeStats.sizeInBytes.max(0)
    val numOutputRows = runtimeStats.rowCount.map(_.max(0))
    Some(Statistics(dataSize, numOutputRows, isRuntime = true))
  } else {
    None
  }
{code}

I would also like to copy over the column statistics stored in 
Statistics.attributeStats so that they can be fed back into the logical plan 
optimization phase.

The Spark implementations of ShuffleExchangeLike and BroadcastExchangeLike do 
not currently provide such column statistics but other custom implementations 
can.


> AQE should use available column statistics from completed query stages
> ----------------------------------------------------------------------
>
>                 Key: ARROW-17325
>                 URL: https://issues.apache.org/jira/browse/ARROW-17325
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Andy Grove
>            Priority: Major
>
> In QueryStageExec.computeStats we copy partial statistics from materialized 
> query stages by calling QueryStageExec#getRuntimeStatistics, which in turn 
> calls ShuffleExchangeLike#runtimeStatistics or 
> BroadcastExchangeLike#runtimeStatistics.
> Only dataSize and numOutputRows are copied into the new Statistics object:
>  {code:scala}
>   def computeStats(): Option[Statistics] = if (isMaterialized) {
>     val runtimeStats = getRuntimeStatistics
>     val dataSize = runtimeStats.sizeInBytes.max(0)
>     val numOutputRows = runtimeStats.rowCount.map(_.max(0))
>     Some(Statistics(dataSize, numOutputRows, isRuntime = true))
>   } else {
>     None
>   }
> {code}
> I would also like to copy over the column statistics stored in 
> Statistics.attributeStats so that they can be fed back into the logical plan 
> optimization phase. This is a small change as shown below:
> {code:scala}
>   def computeStats(): Option[Statistics] = if (isMaterialized) {
>     val runtimeStats = getRuntimeStatistics
>     val dataSize = runtimeStats.sizeInBytes.max(0)
>     val numOutputRows = runtimeStats.rowCount.map(_.max(0))
>     val attributeStats = runtimeStats.attributeStats
>     Some(Statistics(dataSize, numOutputRows, attributeStats, isRuntime = true))
>   } else {
>     None
>   }
> {code}
> The Spark implementations of ShuffleExchangeLike and BroadcastExchangeLike do 
> not currently provide such column statistics, but other custom 
> implementations can.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
