This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new ae8b35a [SPARK-33018][SQL] Fix estimate statistics issue if child has 0 bytes ae8b35a is described below commit ae8b35a0d24f8c83bbbb597d668875793a8dbca6 Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Tue Sep 29 16:46:04 2020 +0000 [SPARK-33018][SQL] Fix estimate statistics issue if child has 0 bytes ### What changes were proposed in this pull request? This PR fixes the statistics estimation issue that occurs when a child has 0 bytes. ### Why are the changes needed? The `sizeInBytes` can be `0` when AQE and CBO are enabled (`spark.sql.adaptive.enabled`=true, `spark.sql.cbo.enabled`=true and `spark.sql.cbo.planStats.enabled`=true). Because the default estimate is the product of the children's sizes, a single 0-byte child makes the whole plan's estimate 0. This can generate an incorrect BroadcastJoin, resulting in a driver OOM. For example: ![SPARK-33018](https://user-images.githubusercontent.com/5399861/94457606-647e3d00-01e7-11eb-85ee-812ae6efe7bb.jpg) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual test. Closes #29894 from wangyum/SPARK-33018. 
Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 711d8dd28afd9af92b025f9908534e5f1d575042) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../SizeInBytesOnlyStatsPlanVisitor.scala | 3 ++- .../statsEstimation/JoinEstimationSuite.scala | 22 ++++++++++++++++++++++ .../statsEstimation/StatsEstimationTestBase.scala | 9 ++++++--- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala index da36db7..a586988 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala @@ -53,7 +53,8 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] { */ override def default(p: LogicalPlan): Statistics = p match { case p: LeafNode => p.computeStats() - case _: LogicalPlan => Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).product) + case _: LogicalPlan => + Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).filter(_ > 0L).product) } override def visitAggregate(p: Aggregate): Statistics = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala index 6c5a2b2..cdfc863 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala @@ -551,4 +551,26 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase { attributeStats = AttributeMap(Nil)) assert(join.stats == expectedStats) } + + test("SPARK-33018 Fix estimate statistics issue if child has 0 bytes") { + case class MyStatsTestPlan( + outputList: Seq[Attribute], + sizeInBytes: BigInt) extends LeafNode { + override def output: Seq[Attribute] = outputList + override def computeStats(): Statistics = Statistics(sizeInBytes = sizeInBytes) + } + + val left = MyStatsTestPlan( + outputList = Seq("key-1-2", "key-2-4").map(nameToAttr), + sizeInBytes = BigInt(100)) + + val right = MyStatsTestPlan( + outputList = Seq("key-1-2", "key-2-3").map(nameToAttr), + sizeInBytes = BigInt(0)) + + val join = Join(left, right, LeftOuter, + Some(EqualTo(nameToAttr("key-2-4"), nameToAttr("key-2-3"))), JoinHint.NONE) + + assert(join.stats == Statistics(sizeInBytes = 100)) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala index 9dceca5..0a27e31 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala @@ -26,17 +26,20 @@ import org.apache.spark.sql.types.{IntegerType, StringType} trait StatsEstimationTestBase extends SparkFunSuite { - var originalValue: Boolean = false + var originalCBOValue: Boolean = false + var originalPlanStatsValue: Boolean = false override def beforeAll(): Unit = { super.beforeAll() // Enable stats estimation based on CBO. 
- originalValue = SQLConf.get.getConf(SQLConf.CBO_ENABLED) + originalCBOValue = SQLConf.get.getConf(SQLConf.CBO_ENABLED) + originalPlanStatsValue = SQLConf.get.getConf(SQLConf.PLAN_STATS_ENABLED) SQLConf.get.setConf(SQLConf.CBO_ENABLED, true) } override def afterAll(): Unit = { - SQLConf.get.setConf(SQLConf.CBO_ENABLED, originalValue) + SQLConf.get.setConf(SQLConf.CBO_ENABLED, originalCBOValue) + SQLConf.get.setConf(SQLConf.PLAN_STATS_ENABLED, originalPlanStatsValue) super.afterAll() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org