This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new ff395a3 Revert "[SPARK-29721][SQL] Prune unnecessary nested fields from Generate without Project" ff395a3 is described below commit ff395a39a5b10a7e71ef61813084bd3cf120280c Author: Liang-Chi Hsieh <vii...@gmail.com> AuthorDate: Sun Feb 9 19:45:16 2020 -0800 Revert "[SPARK-29721][SQL] Prune unnecessary nested fields from Generate without Project" This reverts commit a0e63b61e7c5d55ae2a9213b95ab1e87ac7c203c. ### What changes were proposed in this pull request? This reverts the patch at #26978 based on gatorsmile's suggestion. ### Why are the changes needed? Original patch #26978 has not considered a corner case. We may need to put more time on ensuring we can cover all cases. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit test. Closes #27504 from viirya/revert-SPARK-29721. Authored-by: Liang-Chi Hsieh <vii...@gmail.com> Signed-off-by: Xiao Li <gatorsm...@gmail.com> --- .../catalyst/optimizer/NestedColumnAliasing.scala | 47 ---------------------- .../spark/sql/catalyst/optimizer/Optimizer.scala | 43 +++++++++++--------- .../execution/datasources/SchemaPruningSuite.scala | 32 --------------- 3 files changed, 25 insertions(+), 97 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index ea85014..43a6006 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -155,53 +155,6 @@ object NestedColumnAliasing { case MapType(keyType, valueType, _) => totalFieldNum(keyType) + totalFieldNum(valueType) case _ => 1 // UDT and others } -} - -/** - * This prunes unnessary nested columns from `Generate` and optional `Project` on top - * of it. 
- */ -object GeneratorNestedColumnAliasing { - def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match { - // Either `nestedPruningOnExpressions` or `nestedSchemaPruningEnabled` is enabled, we - // need to prune nested columns through Project and under Generate. The difference is - // when `nestedSchemaPruningEnabled` is on, nested columns will be pruned further at - // file format readers if it is supported. - case Project(projectList, g: Generate) if (SQLConf.get.nestedPruningOnExpressions || - SQLConf.get.nestedSchemaPruningEnabled) && canPruneGenerator(g.generator) => - // On top on `Generate`, a `Project` that might have nested column accessors. - // We try to get alias maps for both project list and generator's children expressions. - NestedColumnAliasing.getAliasSubMap(projectList ++ g.generator.children).map { - case (nestedFieldToAlias, attrToAliases) => - val newChild = pruneGenerate(g, nestedFieldToAlias, attrToAliases) - Project(NestedColumnAliasing.getNewProjectList(projectList, nestedFieldToAlias), newChild) - } - - case g: Generate if SQLConf.get.nestedSchemaPruningEnabled && - canPruneGenerator(g.generator) => - NestedColumnAliasing.getAliasSubMap(g.generator.children).map { - case (nestedFieldToAlias, attrToAliases) => - pruneGenerate(g, nestedFieldToAlias, attrToAliases) - } - - case _ => - None - } - - private def pruneGenerate( - g: Generate, - nestedFieldToAlias: Map[ExtractValue, Alias], - attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = { - val newGenerator = g.generator.transform { - case f: ExtractValue if nestedFieldToAlias.contains(f) => - nestedFieldToAlias(f).toAttribute - }.asInstanceOf[Generator] - - // Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`. - val newGenerate = g.copy(generator = newGenerator) - - NestedColumnAliasing.replaceChildrenWithAliases(newGenerate, attrToAliases) - } /** * This is a while-list for pruning nested fields at `Generator`. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index c90117b..08acac1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -601,24 +601,31 @@ object ColumnPruning extends Rule[LogicalPlan] { s.copy(child = prunedChild(child, s.references)) // prune unrequired references - case p @ Project(_, g: Generate) => - val currP = if (p.references != g.outputSet) { - val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references - val newChild = prunedChild(g.child, requiredAttrs) - val unrequired = g.generator.references -- p.references - val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1)) - .map(_._2) - p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices)) - } else { - p - } - // If we can prune nested column on Project + Generate, do it now. - // Otherwise by transforming down to Generate, it could be pruned individually, - // and causes nested column on top Project unable to resolve. - GeneratorNestedColumnAliasing.unapply(currP).getOrElse(currP) - - // prune unrequired nested fields from `Generate`. 
- case GeneratorNestedColumnAliasing(p) => p + case p @ Project(_, g: Generate) if p.references != g.outputSet => + val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references + val newChild = prunedChild(g.child, requiredAttrs) + val unrequired = g.generator.references -- p.references + val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1)) + .map(_._2) + p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices)) + + // prune unrequired nested fields + case p @ Project(projectList, g: Generate) if SQLConf.get.nestedPruningOnExpressions && + NestedColumnAliasing.canPruneGenerator(g.generator) => + NestedColumnAliasing.getAliasSubMap(projectList ++ g.generator.children).map { + case (nestedFieldToAlias, attrToAliases) => + val newGenerator = g.generator.transform { + case f: ExtractValue if nestedFieldToAlias.contains(f) => + nestedFieldToAlias(f).toAttribute + }.asInstanceOf[Generator] + + // Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`. + val newGenerate = g.copy(generator = newGenerator) + + val newChild = NestedColumnAliasing.replaceChildrenWithAliases(newGenerate, attrToAliases) + + Project(NestedColumnAliasing.getNewProjectList(projectList, nestedFieldToAlias), newChild) + }.getOrElse(p) // Eliminate unneeded attributes from right side of a Left Existence Join. 
case j @ Join(_, right, LeftExistence(_), _, _) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala index 5977e86..a3d4905 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala @@ -301,38 +301,6 @@ abstract class SchemaPruningSuite checkAnswer(query, Row("Y.", 1) :: Row("X.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil) } - testSchemaPruning("select explode of nested field of array of struct") { - // Config combinations - val configs = Seq((true, true), (true, false), (false, true), (false, false)) - - configs.foreach { case (nestedPruning, nestedPruningOnExpr) => - withSQLConf( - SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> nestedPruning.toString, - SQLConf.NESTED_PRUNING_ON_EXPRESSIONS.key -> nestedPruningOnExpr.toString) { - val query1 = spark.table("contacts") - .select(explode(col("friends.first"))) - if (nestedPruning) { - // If `NESTED_SCHEMA_PRUNING_ENABLED` is enabled, - // even disabling `NESTED_PRUNING_ON_EXPRESSIONS`, - // nested schema is still pruned at scan node. 
- checkScan(query1, "struct<friends:array<struct<first:string>>>") - } else { - checkScan(query1, "struct<friends:array<struct<first:string,middle:string,last:string>>>") - } - checkAnswer(query1, Row("Susan") :: Nil) - - val query2 = spark.table("contacts") - .select(explode(col("friends.first")), col("friends.middle")) - if (nestedPruning) { - checkScan(query2, "struct<friends:array<struct<first:string,middle:string>>>") - } else { - checkScan(query2, "struct<friends:array<struct<first:string,middle:string,last:string>>>") - } - checkAnswer(query2, Row("Susan", Array("Z.")) :: Nil) - } - } - } - protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = { test(s"Spark vectorized reader - without partition data column - $testName") { withSQLConf(vectorizedReaderEnabledKey -> "true") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org