[ https://issues.apache.org/jira/browse/SPARK-22042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16249005#comment-16249005 ]
Felix Cheung commented on SPARK-22042:
--------------------------------------

Since this is in 2.1.0 and 2.2.0, technically this isn't a regression. I will watch this fix, but it is currently not a blocker for 2.2.1.

> ReorderJoinPredicates can break when child's partitioning is not decided
> ------------------------------------------------------------------------
>
>                 Key: SPARK-22042
>                 URL: https://issues.apache.org/jira/browse/SPARK-22042
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Tejas Patil
>
> When `ReorderJoinPredicates` tries to get the `outputPartitioning` of its
> children, the children may not be properly constructed, as the child subtree
> still has to go through other planner rules.
> In this particular case, the child is `SortMergeJoinExec`. Since the required
> `Exchange` operators are not in place (because `EnsureRequirements` runs
> _after_ `ReorderJoinPredicates`), the join's children would not have
> partitioning defined. This breaks while creating the `PartitioningCollection`
> here:
> https://github.com/apache/spark/blob/94439997d57875838a8283c543f9b44705d3a503/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L69
> Small repro:
> {noformat}
> context.sql("SET spark.sql.autoBroadcastJoinThreshold=0")
> val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
> df.write.format("parquet").saveAsTable("table1")
> df.write.format("parquet").saveAsTable("table2")
> df.write.format("parquet").bucketBy(8, "j", "k").saveAsTable("bucketed_table")
> sql("""
>   SELECT *
>   FROM (
>     SELECT a.i, a.j, a.k
>     FROM bucketed_table a
>     JOIN table1 b
>     ON a.i = b.i
>   ) c
>   JOIN table2
>   ON c.i = table2.i
> """).explain
> {noformat}
> This fails with:
> {noformat}
> java.lang.IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
>   at scala.Predef$.require(Predef.scala:224)
>   at org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.<init>(partitioning.scala:324)
>   at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
>   at org.apache.spark.sql.execution.ProjectExec.outputPartitioning(basicPhysicalOperators.scala:82)
>   at org.apache.spark.sql.execution.joins.ReorderJoinPredicates$$anonfun$apply$1.applyOrElse(ReorderJoinPredicates.scala:91)
>   at org.apache.spark.sql.execution.joins.ReorderJoinPredicates$$anonfun$apply$1.applyOrElse(ReorderJoinPredicates.scala:76)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>   at org.apache.spark.sql.execution.joins.ReorderJoinPredicates.apply(ReorderJoinPredicates.scala:76)
>   at org.apache.spark.sql.execution.joins.ReorderJoinPredicates.apply(ReorderJoinPredicates.scala:34)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:100)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:100)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:100)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:201)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:201)
>   at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:114)
>   at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:201)
>   at org.apache.spark.sql.execution.command.ExplainCommand.run(commands.scala:147)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:78)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:75)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:91)
>   at org.apache.spark.sql.Dataset.explain(Dataset.scala:464)
>   at org.apache.spark.sql.Dataset.explain(Dataset.scala:477)
>   ... 60 elided
> {noformat}
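
The failure described in the issue can be illustrated without a Spark cluster. What follows is only a minimal sketch: the classes are simplified stand-ins written for this example, not Spark's real `Partitioning` hierarchy, and the partition counts other than the 8 buckets from the repro are assumptions. It models a join whose two children report different `numPartitions` because no `Exchange` has been inserted yet, which is the state `ReorderJoinPredicates` would observe when it runs before `EnsureRequirements`.

```scala
import scala.util.Try

object PartitioningSketch {
  // Hypothetical, simplified stand-ins for Spark's physical partitioning classes.
  sealed trait Partitioning { def numPartitions: Int }
  case class UnknownPartitioning(numPartitions: Int) extends Partitioning
  case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning

  // Mirrors the requirement quoted in the error message: every member
  // must agree on the number of partitions.
  case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning {
    require(
      partitionings.map(_.numPartitions).distinct.length == 1,
      "PartitioningCollection requires all of its partitionings have the same numPartitions.")
    def numPartitions: Int = partitionings.head.numPartitions
  }

  // Left child: the table bucketed into 8 buckets on (j, k), as in the repro.
  val bucketedSide: Partitioning = HashPartitioning(Seq("j", "k"), 8)
  // Right child: a plain table scan; the count of 2 is an assumed value.
  val plainSide: Partitioning = UnknownPartitioning(2)

  // Before any Exchange is inserted, the two children disagree, so the
  // constructor's require(...) throws IllegalArgumentException.
  val beforeExchange: Try[PartitioningCollection] =
    Try(PartitioningCollection(Seq(bucketedSide, plainSide)))

  // Once both sides are shuffled on the join key with one common partition
  // count (200 here is an assumed value), construction succeeds.
  val afterExchange: Try[PartitioningCollection] =
    Try(PartitioningCollection(Seq(
      HashPartitioning(Seq("i"), 200),
      HashPartitioning(Seq("i"), 200))))

  def main(args: Array[String]): Unit = {
    println(s"before exchange failed: ${beforeExchange.isFailure}")
    println(s"after exchange succeeded: ${afterExchange.isSuccess}")
  }
}
```

In this sketch the only difference between the failing and the succeeding case is whether the children already agree on a partition count, which is why the order of the two planner rules matters.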