Repository: spark Updated Branches: refs/heads/master b70e483cb -> da3557429
[SPARK-22515][SQL] Estimation relation size based on numRows * rowSize ## What changes were proposed in this pull request? Currently, relation size is computed as the sum of file sizes, which is error-prone because storage formats like Parquet may have a much smaller file size compared to the in-memory size. When we choose a broadcast join based on file size, there's a risk of OOM. But if the number of rows is available in statistics, we can get a better estimate from `numRows * rowSize`, which helps to alleviate this problem. ## How was this patch tested? Added a new test case covering a data source table and a Hive table. Author: Zhenhua Wang <wzh_...@163.com> Author: Zhenhua Wang <wangzhen...@huawei.com> Closes #19743 from wzhfy/better_leaf_size. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da355742 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da355742 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da355742 Branch: refs/heads/master Commit: da35574297d96eb750bdfee755a48defc36e284a Parents: b70e483 Author: Zhenhua Wang <wzh_...@163.com> Authored: Tue Nov 28 11:43:21 2017 -0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Tue Nov 28 11:43:21 2017 -0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/catalog/interface.scala | 14 +++++---- .../SizeInBytesOnlyStatsPlanVisitor.scala | 1 - .../sql/StatisticsCollectionTestBase.scala | 2 +- .../apache/spark/sql/hive/StatisticsSuite.scala | 30 +++++++++++++++++++- 4 files changed, 38 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/da355742/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index b10ce05..95b6fbb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIden import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.types.StructType @@ -367,13 +368,14 @@ case class CatalogStatistics( * on column names. */ def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = { - if (cboEnabled) { - val attrStats = planOutput.flatMap(a => colStats.get(a.name).map(a -> _)) - Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount, - attributeStats = AttributeMap(attrStats)) + if (cboEnabled && rowCount.isDefined) { + val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _))) + // Estimate size as number of rows * row size. + val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats) + Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats) } else { - // When CBO is disabled, we apply the size-only estimation strategy, so there's no need to - // propagate other statistics from catalog to the plan. + // When CBO is disabled or the table doesn't have other statistics, we apply the size-only + // estimation strategy and only propagate sizeInBytes in statistics. 
Statistics(sizeInBytes = sizeInBytes) } } http://git-wip-us.apache.org/repos/asf/spark/blob/da355742/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala index d701a95..5e1c4e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation import org.apache.spark.sql.catalyst.expressions.AttributeMap import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} -import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ /** http://git-wip-us.apache.org/repos/asf/spark/blob/da355742/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala index 0a0407d..65ccc19 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala @@ -224,7 +224,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils // Check relation statistics withSQLConf(SQLConf.CBO_ENABLED.key -> "true") { - assert(relation.stats.sizeInBytes == 0) + 
assert(relation.stats.sizeInBytes == 1) assert(relation.stats.rowCount == Some(0)) assert(relation.stats.attributeStats.size == 1) val (attribute, colStat) = relation.stats.attributeStats.head http://git-wip-us.apache.org/repos/asf/spark/blob/da355742/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 7427948..0cdd930 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -41,7 +41,35 @@ import org.apache.spark.sql.types._ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton { - test("Hive serde tables should fallback to HDFS for size estimation") { + + test("size estimation for relations is based on row size * number of rows") { + val dsTbl = "rel_est_ds_table" + val hiveTbl = "rel_est_hive_table" + withTable(dsTbl, hiveTbl) { + spark.range(1000L).write.format("parquet").saveAsTable(dsTbl) + spark.range(1000L).write.format("hive").saveAsTable(hiveTbl) + + Seq(dsTbl, hiveTbl).foreach { tbl => + sql(s"ANALYZE TABLE $tbl COMPUTE STATISTICS") + val catalogStats = getCatalogStatistics(tbl) + withSQLConf(SQLConf.CBO_ENABLED.key -> "false") { + val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats + assert(relationStats.sizeInBytes == catalogStats.sizeInBytes) + assert(relationStats.rowCount.isEmpty) + } + spark.sessionState.catalog.refreshTable(TableIdentifier(tbl)) + withSQLConf(SQLConf.CBO_ENABLED.key -> "true") { + val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats + // Due to compression in parquet files, in this test, file size is smaller than + // in-memory size. 
+ assert(catalogStats.sizeInBytes < relationStats.sizeInBytes) + assert(catalogStats.rowCount == relationStats.rowCount) + } + } + } + } + + test("Hive serde tables should fallback to HDFS for size estimation") { withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") { withTable("csv_table") { withTempDir { tempDir => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org