This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 074e1b39d27 [SPARK-41539][SQL] Remap stats and constraints against output in logical plan for LogicalRDD 074e1b39d27 is described below commit 074e1b39d279f12ff8d822a03741f33f159f5df8 Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Wed Dec 21 15:02:18 2022 +0900 [SPARK-41539][SQL] Remap stats and constraints against output in logical plan for LogicalRDD ### What changes were proposed in this pull request? This PR proposes to remap stats and constraints against the output in logical plan for LogicalRDD, like we remap stats and constraints against the "new" output when we call newInstance. ### Why are the changes needed? The output in logical plan and optimized plan can be "slightly" different (we observed the difference of exprId), and then the query can fail due to the invalid attribute reference(s) in stats and constraints for LogicalRDD. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Modified test cases. Closes #39082 from HeartSaVioR/SPARK-41539. 
Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../apache/spark/sql/execution/ExistingRDD.scala | 89 +++++++++++++++++----- .../org/apache/spark/sql/DataFrameSuite.scala | 29 ++++++- 2 files changed, 98 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 3acadee5fb4..3dcf0efaadd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Encoder, SparkSession} import org.apache.spark.sql.catalyst.InternalRow @@ -103,6 +104,8 @@ case class LogicalRDD( originConstraints: Option[ExpressionSet] = None) extends LeafNode with MultiInstanceRelation { + import LogicalRDD._ + override protected final def otherCopyArgs: Seq[AnyRef] = session :: originStats :: originConstraints :: Nil @@ -122,22 +125,8 @@ case class LogicalRDD( case e: Attribute => rewrite.getOrElse(e, e) }.asInstanceOf[SortOrder]) - val rewrittenStatistics = originStats.map { s => - Statistics( - s.sizeInBytes, - s.rowCount, - AttributeMap[ColumnStat](s.attributeStats.map { - case (attr, v) => (rewrite.getOrElse(attr, attr), v) - }), - s.isRuntime - ) - } - - val rewrittenConstraints = originConstraints.map { c => - c.map(_.transform { - case e: Attribute => rewrite.getOrElse(e, e) - }) - } + val rewrittenStatistics = originStats.map(rewriteStatistics(_, rewrite)) + val rewrittenConstraints = originConstraints.map(rewriteConstraints(_, rewrite)) LogicalRDD( output.map(rewrite), @@ -163,7 +152,7 @@ case class LogicalRDD( override lazy val constraints: ExpressionSet = originConstraints.getOrElse(ExpressionSet()) } -object 
LogicalRDD { +object LogicalRDD extends Logging { /** * Create a new LogicalRDD based on existing Dataset. Stats and constraints are inherited from * origin Dataset. @@ -183,16 +172,80 @@ object LogicalRDD { } } + val logicalPlan = originDataset.logicalPlan val optimizedPlan = originDataset.queryExecution.optimizedPlan val executedPlan = originDataset.queryExecution.executedPlan + val (stats, constraints) = rewriteStatsAndConstraints(logicalPlan, optimizedPlan) + LogicalRDD( originDataset.logicalPlan.output, rdd, firstLeafPartitioning(executedPlan.outputPartitioning), executedPlan.outputOrdering, isStreaming - )(originDataset.sparkSession, Some(optimizedPlan.stats), Some(optimizedPlan.constraints)) + )(originDataset.sparkSession, stats, constraints) + } + + private[sql] def buildOutputAssocForRewrite( + source: Seq[Attribute], + destination: Seq[Attribute]): Option[Map[Attribute, Attribute]] = { + // We check the name and type, allowing nullability, exprId, metadata, qualifier be different + // E.g. This could happen during optimization phase. 
+ val rewrite = source.zip(destination).flatMap { case (attr1, attr2) => + if (attr1.name == attr2.name && attr1.dataType == attr2.dataType) { + Some(attr1 -> attr2) + } else { + None + } + }.toMap + + if (rewrite.size == source.size) { + Some(rewrite) + } else { + None + } + } + + private[sql] def rewriteStatsAndConstraints( + logicalPlan: LogicalPlan, + optimizedPlan: LogicalPlan): (Option[Statistics], Option[ExpressionSet]) = { + val rewrite = buildOutputAssocForRewrite(optimizedPlan.output, logicalPlan.output) + + rewrite.map { rw => + val rewrittenStatistics = rewriteStatistics(optimizedPlan.stats, rw) + val rewrittenConstraints = rewriteConstraints(optimizedPlan.constraints, rw) + + (Some(rewrittenStatistics), Some(rewrittenConstraints)) + }.getOrElse { + // can't rewrite stats and constraints, give up + logWarning("The output columns are expected to the same (for name and type) for output " + + "between logical plan and optimized plan, but they aren't. output in logical plan: " + + s"${logicalPlan.output.map(_.simpleString(10))} / output in optimized plan: " + + s"${optimizedPlan.output.map(_.simpleString(10))}") + + (None, None) + } + } + + private[sql] def rewriteStatistics( + originStats: Statistics, + colRewrite: Map[Attribute, Attribute]): Statistics = { + Statistics( + originStats.sizeInBytes, + originStats.rowCount, + AttributeMap[ColumnStat](originStats.attributeStats.map { + case (attr, v) => (colRewrite.getOrElse(attr, attr), v) + }), + originStats.isRuntime) + } + + private[sql] def rewriteConstraints( + originConstraints: ExpressionSet, + colRewrite: Map[Attribute, Attribute]): ExpressionSet = { + originConstraints.map(_.transform { + case e: Attribute => colRewrite.getOrElse(e, e) + }) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 589ee1bea27..4269aaea0df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2121,12 +2121,25 @@ class DataFrameSuite extends QueryTest withSQLConf(SQLConf.CBO_ENABLED.key -> "true") { val df = Dataset.ofRows(spark, statsPlan) + // add some map-like operations which optimizer will optimize away, and make a divergence + // for output between logical plan and optimized plan + // logical plan + // Project [cb#6 AS cbool#12, cby#7 AS cbyte#13, ci#8 AS cint#14] + // +- Project [cbool#0 AS cb#6, cbyte#1 AS cby#7, cint#2 AS ci#8] + // +- OutputListAwareStatsTestPlan [cbool#0, cbyte#1, cint#2], 2, 16 + // optimized plan + // OutputListAwareStatsTestPlan [cbool#0, cbyte#1, cint#2], 2, 16 + .selectExpr("cbool AS cb", "cbyte AS cby", "cint AS ci") + .selectExpr("cb AS cbool", "cby AS cbyte", "ci AS cint") // We can't leverage LogicalRDD.fromDataset here, since it triggers physical planning and // there is no matching physical node for OutputListAwareStatsTestPlan. + val optimizedPlan = df.queryExecution.optimizedPlan + val rewrite = LogicalRDD.buildOutputAssocForRewrite(optimizedPlan.output, + df.logicalPlan.output) val logicalRDD = LogicalRDD( df.logicalPlan.output, spark.sparkContext.emptyRDD[InternalRow], isStreaming = true)( - spark, Some(df.queryExecution.optimizedPlan.stats), None) + spark, Some(LogicalRDD.rewriteStatistics(optimizedPlan.stats, rewrite.get)), None) val stats = logicalRDD.computeStats() val expectedStats = Statistics(sizeInBytes = expectedSize, rowCount = Some(2), @@ -2164,12 +2177,24 @@ class DataFrameSuite extends QueryTest val statsPlan = OutputListAwareConstraintsTestPlan(outputList = outputList) val df = Dataset.ofRows(spark, statsPlan) + // add some map-like operations which optimizer will optimize away, and make a divergence + // for output between logical plan and optimized plan + // logical plan + // Project [cb#6 AS cbool#12, cby#7 AS cbyte#13, ci#8 AS cint#14] + // +- Project [cbool#0 AS cb#6, cbyte#1 AS cby#7, cint#2 AS ci#8] + // +- 
OutputListAwareConstraintsTestPlan [cbool#0, cbyte#1, cint#2] + // optimized plan + // OutputListAwareConstraintsTestPlan [cbool#0, cbyte#1, cint#2] + .selectExpr("cbool AS cb", "cbyte AS cby", "cint AS ci") + .selectExpr("cb AS cbool", "cby AS cbyte", "ci AS cint") // We can't leverage LogicalRDD.fromDataset here, since it triggers physical planning and // there is no matching physical node for OutputListAwareConstraintsTestPlan. + val optimizedPlan = df.queryExecution.optimizedPlan + val rewrite = LogicalRDD.buildOutputAssocForRewrite(optimizedPlan.output, df.logicalPlan.output) val logicalRDD = LogicalRDD( df.logicalPlan.output, spark.sparkContext.emptyRDD[InternalRow], isStreaming = true)( - spark, None, Some(df.queryExecution.optimizedPlan.constraints)) + spark, None, Some(LogicalRDD.rewriteConstraints(optimizedPlan.constraints, rewrite.get))) val constraints = logicalRDD.constraints val expectedConstraints = buildExpectedConstraints(logicalRDD.output) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org