Repository: spark
Updated Branches:
  refs/heads/master b70e483cb -> da3557429


[SPARK-22515][SQL] Estimate relation size based on numRows * rowSize

## What changes were proposed in this pull request?

Currently, relation size is computed as the sum of file sizes, which is
error-prone because storage formats like Parquet may have a much smaller file
size than the corresponding in-memory size. When we choose a broadcast join
based on file size, there is a risk of OOM. But if the number of rows is
available in the statistics, we can get a better estimate from
`numRows * rowSize`, which helps alleviate this problem.
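
To make the failure mode concrete, here is a small illustrative sketch (all
numbers are made up, and `avgRowWidthInBytes` is a hypothetical stand-in for
the per-row width derived from column statistics, not an actual Spark API):

```scala
// Illustrative numbers only; `avgRowWidthInBytes` stands in for the per-row
// width derived from column statistics, which is not shown here.
val onDiskBytes = BigInt(8L << 20)               // 8 MB of compressed Parquet
val numRows = BigInt(4L * 1000 * 1000)           // row count from ANALYZE TABLE
val avgRowWidthInBytes = 64L
val rowBasedBytes = numRows * avgRowWidthInBytes // ~256 MB estimated in memory

// With the default 10 MB spark.sql.autoBroadcastJoinThreshold, the file-size
// estimate would choose a broadcast join (OOM risk), while the row-based
// estimate correctly stays above the threshold.
val threshold = BigInt(10L << 20)
assert(onDiskBytes < threshold)
assert(rowBasedBytes > threshold)
```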

## How was this patch tested?

Added a new test case covering both a data source table and a Hive table.

Author: Zhenhua Wang <wzh_...@163.com>
Author: Zhenhua Wang <wangzhen...@huawei.com>

Closes #19743 from wzhfy/better_leaf_size.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da355742
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da355742
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da355742

Branch: refs/heads/master
Commit: da35574297d96eb750bdfee755a48defc36e284a
Parents: b70e483
Author: Zhenhua Wang <wzh_...@163.com>
Authored: Tue Nov 28 11:43:21 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Nov 28 11:43:21 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  | 14 +++++----
 .../SizeInBytesOnlyStatsPlanVisitor.scala       |  1 -
 .../sql/StatisticsCollectionTestBase.scala      |  2 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala | 30 +++++++++++++++++++-
 4 files changed, 38 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/da355742/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index b10ce05..95b6fbb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIden
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.StructType
@@ -367,13 +368,14 @@ case class CatalogStatistics(
    * on column names.
    */
   def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
-    if (cboEnabled) {
-      val attrStats = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
-      Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
-        attributeStats = AttributeMap(attrStats))
+    if (cboEnabled && rowCount.isDefined) {
+      val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
+      // Estimate size as number of rows * row size.
+      val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
+      Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
     } else {
-      // When CBO is disabled, we apply the size-only estimation strategy, so there's no need to
-      // propagate other statistics from catalog to the plan.
+      // When CBO is disabled or the table doesn't have other statistics, we apply the size-only
+      // estimation strategy and only propagate sizeInBytes in statistics.
       Statistics(sizeInBytes = sizeInBytes)
     }
   }
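
For intuition, the `EstimationUtils.getOutputSize` call above multiplies the
row count by a per-row width derived from column statistics (or default
data-type sizes), and, judging from the test change further down where an
empty table's expected sizeInBytes becomes 1 instead of 0, it never reports
zero bytes. A simplified sketch under those assumptions, with made-up names:

```scala
// Simplified sketch of the getOutputSize contract, not the real implementation.
// `estimatedRowWidth` stands in for the width Spark derives from column
// statistics or default data-type sizes.
def outputSizeSketch(rowCount: BigInt, estimatedRowWidth: Long): BigInt =
  // Never report 0 bytes: size-only estimation multiplies children's sizes
  // for some plans, so a zero here would collapse the whole estimate.
  if (rowCount > 0) rowCount * estimatedRowWidth else 1
```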

http://git-wip-us.apache.org/repos/asf/spark/blob/da355742/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
index d701a95..5e1c4e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
 
 import org.apache.spark.sql.catalyst.expressions.AttributeMap
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
-import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/da355742/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 0a0407d..65ccc19 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -224,7 +224,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
 
     // Check relation statistics
     withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
-      assert(relation.stats.sizeInBytes == 0)
+      assert(relation.stats.sizeInBytes == 1)
       assert(relation.stats.rowCount == Some(0))
       assert(relation.stats.attributeStats.size == 1)
       val (attribute, colStat) = relation.stats.attributeStats.head

http://git-wip-us.apache.org/repos/asf/spark/blob/da355742/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 7427948..0cdd930 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -41,7 +41,35 @@ import org.apache.spark.sql.types._
 
 
 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
-   test("Hive serde tables should fallback to HDFS for size estimation") {
+
+  test("size estimation for relations is based on row size * number of rows") {
+    val dsTbl = "rel_est_ds_table"
+    val hiveTbl = "rel_est_hive_table"
+    withTable(dsTbl, hiveTbl) {
+      spark.range(1000L).write.format("parquet").saveAsTable(dsTbl)
+      spark.range(1000L).write.format("hive").saveAsTable(hiveTbl)
+
+      Seq(dsTbl, hiveTbl).foreach { tbl =>
+        sql(s"ANALYZE TABLE $tbl COMPUTE STATISTICS")
+        val catalogStats = getCatalogStatistics(tbl)
+        withSQLConf(SQLConf.CBO_ENABLED.key -> "false") {
+          val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats
+          assert(relationStats.sizeInBytes == catalogStats.sizeInBytes)
+          assert(relationStats.rowCount.isEmpty)
+        }
+        spark.sessionState.catalog.refreshTable(TableIdentifier(tbl))
+        withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
+          val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats
+          // Due to compression in parquet files, in this test, file size is smaller than
+          // in-memory size.
+          assert(catalogStats.sizeInBytes < relationStats.sizeInBytes)
+          assert(catalogStats.rowCount == relationStats.rowCount)
+        }
+      }
+    }
+  }
+
+  test("Hive serde tables should fallback to HDFS for size estimation") {
     withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
       withTable("csv_table") {
         withTempDir { tempDir =>

