[ https://issues.apache.org/jira/browse/ARROW-17325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andy Grove updated ARROW-17325:
-------------------------------
    Description:
In QueryStageExec.computeStats we copy partial statistics from materialized query stages by calling QueryStageExec#getRuntimeStatistics, which in turn calls ShuffleExchangeLike#runtimeStatistics or BroadcastExchangeLike#runtimeStatistics.

Only dataSize and numOutputRows are copied into the new Statistics object:

{code:scala}
def computeStats(): Option[Statistics] = if (isMaterialized) {
  val runtimeStats = getRuntimeStatistics
  val dataSize = runtimeStats.sizeInBytes.max(0)
  val numOutputRows = runtimeStats.rowCount.map(_.max(0))
  Some(Statistics(dataSize, numOutputRows, isRuntime = true))
} else {
  None
}
{code}

I would like to also copy over the column statistics stored in Statistics.attributeMap so that they can be fed back into the logical plan optimization phase.

This is a small change as shown below:

{code:scala}
def computeStats(): Option[Statistics] = if (isMaterialized) {
  val runtimeStats = getRuntimeStatistics
  val dataSize = runtimeStats.sizeInBytes.max(0)
  val numOutputRows = runtimeStats.rowCount.map(_.max(0))
  val attributeStats = runtimeStats.attributeStats
  Some(Statistics(dataSize, numOutputRows, attributeStats, isRuntime = true))
} else {
  None
}
{code}

The Spark implementations of ShuffleExchangeLike and BroadcastExchangeLike do not currently provide such column statistics, but other custom implementations can.

  was:
In QueryStageExec.computeStats we copy partial statistics from materialized query stages by calling QueryStageExec#getRuntimeStatistics, which in turn calls ShuffleExchangeLike#runtimeStatistics or BroadcastExchangeLike#runtimeStatistics.
Only dataSize and numOutputRows are copied into the new Statistics object:

{code:scala}
def computeStats(): Option[Statistics] = if (isMaterialized) {
  val runtimeStats = getRuntimeStatistics
  val dataSize = runtimeStats.sizeInBytes.max(0)
  val numOutputRows = runtimeStats.rowCount.map(_.max(0))
  Some(Statistics(dataSize, numOutputRows, isRuntime = true))
} else {
  None
}
{code}

I would like to also copy over the column statistics stored in Statistics.attributeMap so that they can be fed back into the logical plan optimization phase.

The Spark implementations of ShuffleExchangeLike and BroadcastExchangeLike do not currently provide such column statistics but other custom implementations can.


> AQE should use available column statistics from completed query stages
> ----------------------------------------------------------------------
>
>                 Key: ARROW-17325
>                 URL: https://issues.apache.org/jira/browse/ARROW-17325
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Andy Grove
>            Priority: Major
>
> In QueryStageExec.computeStats we copy partial statistics from materialized
> query stages by calling QueryStageExec#getRuntimeStatistics, which in turn
> calls ShuffleExchangeLike#runtimeStatistics or
> BroadcastExchangeLike#runtimeStatistics.
> Only dataSize and numOutputRows are copied into the new Statistics object:
> {code:scala}
> def computeStats(): Option[Statistics] = if (isMaterialized) {
>   val runtimeStats = getRuntimeStatistics
>   val dataSize = runtimeStats.sizeInBytes.max(0)
>   val numOutputRows = runtimeStats.rowCount.map(_.max(0))
>   Some(Statistics(dataSize, numOutputRows, isRuntime = true))
> } else {
>   None
> }
> {code}
> I would like to also copy over the column statistics stored in
> Statistics.attributeMap so that they can be fed back into the logical plan
> optimization phase.
> This is a small change as shown below:
> {code:scala}
> def computeStats(): Option[Statistics] = if (isMaterialized) {
>   val runtimeStats = getRuntimeStatistics
>   val dataSize = runtimeStats.sizeInBytes.max(0)
>   val numOutputRows = runtimeStats.rowCount.map(_.max(0))
>   val attributeStats = runtimeStats.attributeStats
>   Some(Statistics(dataSize, numOutputRows, attributeStats, isRuntime = true))
> } else {
>   None
> }
> {code}
> The Spark implementations of ShuffleExchangeLike and BroadcastExchangeLike do
> not currently provide such column statistics, but other custom
> implementations can.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)