[ https://issues.apache.org/jira/browse/SPARK-22042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169148#comment-16169148 ]
Tejas Patil commented on SPARK-22042:
-------------------------------------

At the core, the problem is that `ReorderJoinPredicates` expects the input tree to be resolved (esp. when it comes to ensuring partitioning requirements). Possible fixes:

- Option 1: Run `EnsureRequirements` before `ReorderJoinPredicates`. Since `EnsureRequirements` would already have inserted sort and shuffle nodes, `ReorderJoinPredicates` would have to remove them. This would definitely produce sub-optimal plans, as `EnsureRequirements` would already have made some decisions (e.g. how many shuffle partitions to generate) that `ReorderJoinPredicates` cannot easily correct at that point.
- Option 2: Run `EnsureRequirements` over the children of the input tree for `ReorderJoinPredicates`. This would be a simple change, but it would look odd given that, to my knowledge, we don't call rules from other rules.

I will create a PR for option #2 but will be happy to discuss if there are better solutions.

> ReorderJoinPredicates can break when child's partitioning is not decided
> ------------------------------------------------------------------------
>
>                 Key: SPARK-22042
>                 URL: https://issues.apache.org/jira/browse/SPARK-22042
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Tejas Patil
>            Priority: Minor
>
> When `ReorderJoinPredicates` tries to get the `outputPartitioning` of its
> children, the children may not be properly constructed, as the child subtree
> still has to go through other planner rules.
> In this particular case, the child is `SortMergeJoinExec`. Since the required
> `Exchange` operators are not in place (because `EnsureRequirements` runs
> _after_ `ReorderJoinPredicates`), the join's children would not have
> partitioning defined.
> This breaks while creating the `PartitioningCollection` here:
> https://github.com/apache/spark/blob/94439997d57875838a8283c543f9b44705d3a503/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L69
> Small repro:
> {noformat}
> context.sql("SET spark.sql.autoBroadcastJoinThreshold=0")
> val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
> df.write.format("parquet").saveAsTable("table1")
> df.write.format("parquet").saveAsTable("table2")
> df.write.format("parquet").bucketBy(8, "j", "k").saveAsTable("bucketed_table")
> sql("""
>   SELECT *
>   FROM (
>     SELECT a.i, a.j, a.k
>     FROM bucketed_table a
>     JOIN table1 b
>     ON a.i = b.i
>   ) c
>   JOIN table2
>   ON c.i = table2.i
> """).explain
> {noformat}
> This fails with:
> {noformat}
> java.lang.IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
>   at scala.Predef$.require(Predef.scala:224)
>   at org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.<init>(partitioning.scala:324)
>   at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
>   at org.apache.spark.sql.execution.ProjectExec.outputPartitioning(basicPhysicalOperators.scala:82)
>   at org.apache.spark.sql.execution.joins.ReorderJoinPredicates$$anonfun$apply$1.applyOrElse(ReorderJoinPredicates.scala:91)
>   at org.apache.spark.sql.execution.joins.ReorderJoinPredicates$$anonfun$apply$1.applyOrElse(ReorderJoinPredicates.scala:76)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>   at org.apache.spark.sql.execution.joins.ReorderJoinPredicates.apply(ReorderJoinPredicates.scala:76)
>   at org.apache.spark.sql.execution.joins.ReorderJoinPredicates.apply(ReorderJoinPredicates.scala:34)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:100)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:100)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:100)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:201)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:201)
>   at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:114)
>   at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:201)
>   at org.apache.spark.sql.execution.command.ExplainCommand.run(commands.scala:147)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:78)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:75)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:91)
>   at org.apache.spark.sql.Dataset.explain(Dataset.scala:464)
>   at org.apache.spark.sql.Dataset.explain(Dataset.scala:477)
>   ... 60 elided
> {noformat}
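
For readers who want to see why the `require` in the trace fires, here is a minimal standalone sketch of the check. It does not use Spark: the trait and case classes below are simplified stand-ins whose names mirror Spark's, and `PartitioningMismatchDemo` is a hypothetical name. The bucket count 8 comes from the repro; the partition count of 1 for the unshuffled side and the count of 200 after a shuffle are illustrative assumptions, not values taken from the failing plan.

```scala
import scala.util.Try

// Simplified stand-ins for Spark's physical partitioning classes.
// These are NOT Spark's real definitions; they only model numPartitions.
sealed trait Partitioning { def numPartitions: Int }

final case class UnknownPartitioning(numPartitions: Int) extends Partitioning

final case class HashPartitioning(columns: Seq[String], numPartitions: Int)
  extends Partitioning

final case class PartitioningCollection(partitionings: Seq[Partitioning])
  extends Partitioning {
  // Same requirement text as in the stack trace: every member must agree
  // on the number of partitions, otherwise construction fails.
  require(
    partitionings.map(_.numPartitions).distinct.length == 1,
    "PartitioningCollection requires all of its partitionings have the same numPartitions.")

  def numPartitions: Int = partitionings.head.numPartitions
}

object PartitioningMismatchDemo {
  // Left side: table bucketed into 8 buckets on (j, k), as in the repro.
  val bucketedSide: Partitioning = HashPartitioning(Seq("j", "k"), 8)

  // Right side: no Exchange inserted yet, so its partition count is whatever
  // the scan reports (1 here is an assumption made for illustration).
  val unshuffledSide: Partitioning = UnknownPartitioning(1)

  // Combining the two before any shuffle is planned violates the requirement.
  val beforeExchange: Try[PartitioningCollection] =
    Try(PartitioningCollection(Seq(bucketedSide, unshuffledSide)))

  // Once both sides are shuffled on the join key with the same partition
  // count (200 is an illustrative value), the collection can be built.
  val afterExchange: Try[PartitioningCollection] =
    Try(PartitioningCollection(Seq(
      HashPartitioning(Seq("i"), 200),
      HashPartitioning(Seq("i"), 200))))
}
```

In this model `PartitioningMismatchDemo.beforeExchange` is a `Failure` wrapping an `IllegalArgumentException`, while `afterExchange` succeeds. That matches the description above: asking a join for its output partitioning is only safe once its children agree on a partition count, which in Spark is the job of `EnsureRequirements`.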