[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198365370 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!sameJoinOrder(orderedJoins, originalPlan)) { + if (originalPlan.output != orderedJoins.output) { --- End diff -- I think opening another JIRA would be better, for people discovering this bug in earlier versions are more likely to find it if it is filed as an independent issue. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198347568 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,51 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case Filter(_, child) => extractLeftDeepInnerJoins(child) +case Project(_, child) => extractLeftDeepInnerJoins(child) +case _ => Seq(plan) + } + + private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!sameJoinOrder(orderedJoins, originalPlan)) { + if (originalPlan.output != orderedJoins.output) { +// Keep the same output attributes and the order +Project(originalPlan.output, orderedJoins) + } else { +orderedJoins + } +} else { + originalPlan +} + } + def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case ExtractFiltersAndInnerJoins(input, conditions) +case p @ ExtractFiltersAndInnerJoins(input, conditions) if input.size > 2 && conditions.nonEmpty => if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) { val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, conditions) if (starJoinPlan.nonEmpty) { val rest = input.filterNot(starJoinPlan.contains(_)) - createOrderedJoin(starJoinPlan ++ rest, conditions) + 
mayCreateOrderedJoin(p, starJoinPlan ++ rest, conditions) } else { - createOrderedJoin(input, conditions) + mayCreateOrderedJoin(p, input, conditions) } } else { -createOrderedJoin(input, conditions) +mayCreateOrderedJoin(p, input, conditions) } --- End diff -- I expanded the function logic at the end of `apply` because the logic is small enough: https://github.com/apache/spark/pull/20345/files#diff-17d31b198ff391188311550fcabd1198R119 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198345839 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!sameJoinOrder(orderedJoins, originalPlan)) { + if (originalPlan.output != orderedJoins.output) { --- End diff -- ah, yes, I got you. btw, we need to file a separate jira? Is it bad to add the bug in the description of this jira and this pr? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198341155 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,51 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case Filter(_, child) => extractLeftDeepInnerJoins(child) +case Project(_, child) => extractLeftDeepInnerJoins(child) +case _ => Seq(plan) + } + + private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!sameJoinOrder(orderedJoins, originalPlan)) { + if (originalPlan.output != orderedJoins.output) { +// Keep the same output attributes and the order +Project(originalPlan.output, orderedJoins) + } else { +orderedJoins + } +} else { + originalPlan +} + } + def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case ExtractFiltersAndInnerJoins(input, conditions) +case p @ ExtractFiltersAndInnerJoins(input, conditions) if input.size > 2 && conditions.nonEmpty => if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) { val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, conditions) if (starJoinPlan.nonEmpty) { val rest = input.filterNot(starJoinPlan.contains(_)) - createOrderedJoin(starJoinPlan ++ rest, conditions) + 
mayCreateOrderedJoin(p, starJoinPlan ++ rest, conditions) } else { - createOrderedJoin(input, conditions) + mayCreateOrderedJoin(p, input, conditions) } } else { -createOrderedJoin(input, conditions) +mayCreateOrderedJoin(p, input, conditions) } --- End diff -- ok, I'll brush up the code based on the suggestion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198209435 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!sameJoinOrder(orderedJoins, originalPlan)) { + if (originalPlan.output != orderedJoins.output) { --- End diff -- I mean the original join re-order before your fix did not apply a Project and that must be a bug, as is illustrated by your change to JoinReorderSuite. I think we should file it and mark it fixed by this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198208026 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,51 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case Filter(_, child) => extractLeftDeepInnerJoins(child) +case Project(_, child) => extractLeftDeepInnerJoins(child) +case _ => Seq(plan) + } + + private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!sameJoinOrder(orderedJoins, originalPlan)) { + if (originalPlan.output != orderedJoins.output) { +// Keep the same output attributes and the order +Project(originalPlan.output, orderedJoins) + } else { +orderedJoins + } +} else { + originalPlan +} + } + def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case ExtractFiltersAndInnerJoins(input, conditions) +case p @ ExtractFiltersAndInnerJoins(input, conditions) if input.size > 2 && conditions.nonEmpty => if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) { val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, conditions) if (starJoinPlan.nonEmpty) { val rest = input.filterNot(starJoinPlan.contains(_)) - createOrderedJoin(starJoinPlan ++ rest, conditions) + 
mayCreateOrderedJoin(p, starJoinPlan ++ rest, conditions) } else { - createOrderedJoin(input, conditions) + mayCreateOrderedJoin(p, input, conditions) } } else { -createOrderedJoin(input, conditions) +mayCreateOrderedJoin(p, input, conditions) } --- End diff -- How about make it like: ``` val joinReorderedPlan = if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) { val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, conditions) if (starJoinPlan.nonEmpty) { val rest = input.filterNot(starJoinPlan.contains(_)) createOrderedJoin(starJoinPlan ++ rest, conditions) } else { createOrderedJoin(input, conditions) } } else { createOrderedJoin(input, conditions) } projectIfNecessary(joinReorderedPlan, p) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198206001 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( --- End diff -- Sorry, I didn't make myself very clear. I was suggesting something like "projectIfNecessary(joinReorderedPlan: LogicalPlan, originalPlan: LogicalPlan)" Please also refer to the comment below. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198059157 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -141,14 +141,16 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { } /** - * A pattern that collects the filter and inner joins. + * A pattern that collects the filter and inner joins and skip projections with attributes only. * * Filter *| *inner Join * /\> (Seq(plan0, plan1, plan2), conditions) * Filter plan2 *| + * Project --- End diff -- I fixed the bug you pointed out and added the two tests for that. Could you check again? Thanks! https://github.com/apache/spark/pull/20345/files#diff-6c81baa433aea4741c60c00734f31406R191 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198052416 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -141,14 +141,16 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { } /** - * A pattern that collects the filter and inner joins. + * A pattern that collects the filter and inner joins and skip projections with attributes only. * * Filter *| *inner Join * /\> (Seq(plan0, plan1, plan2), conditions) * Filter plan2 *| + * Project --- End diff -- yea, good catch. Thanks! I'll fix soon. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198043685 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!sameJoinOrder(orderedJoins, originalPlan)) { + if (originalPlan.output != orderedJoins.output) { --- End diff -- What's the bug you pointed out here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198043470 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -81,14 +92,14 @@ class JoinOptimizationSuite extends PlanTest { testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr), Some((Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr -testExtractCheckCross(x.join(y, Cross), Some((Seq((x, Cross), (y, Cross)), Seq( -testExtractCheckCross(x.join(y, Cross).join(z, Cross), +testExtractInnerJoins(x.join(y, Cross), Some((Seq((x, Cross), (y, Cross)), Seq( +testExtractInnerJoins(x.join(y, Cross).join(z, Cross), Some((Seq((x, Cross), (y, Cross), (z, Cross)), Seq( -testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Cross), +testExtractInnerJoins(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Cross), Some((Seq((x, Cross), (y, Cross), (z, Cross)), Seq("x.b".attr === "y.d".attr -testExtractCheckCross(x.join(y, Inner, Some("x.b".attr === "y.d".attr)).join(z, Cross), +testExtractInnerJoins(x.join(y, Inner, Some("x.b".attr === "y.d".attr)).join(z, Cross), Some((Seq((x, Inner), (y, Inner), (z, Cross)), Seq("x.b".attr === "y.d".attr -testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Inner), +testExtractInnerJoins(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Inner), Some((Seq((x, Cross), (y, Cross), (z, Inner)), Seq("x.b".attr === "y.d".attr } --- End diff -- ok, I'll try to add --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198043374 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( --- End diff -- Since the main purpose of this function is to order joins, I set the name. IMHO the projection is not a main task here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198029508 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( --- End diff -- Call this "projectIfNecessary"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198031184 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!sameJoinOrder(orderedJoins, originalPlan)) { + if (originalPlan.output != orderedJoins.output) { --- End diff -- I think this also fixes an existing bug for join reordering, right? Shall we add it to the issue description? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198029399 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -141,14 +141,16 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { } /** - * A pattern that collects the filter and inner joins. + * A pattern that collects the filter and inner joins and skip projections with attributes only. * * Filter *| *inner Join * /\> (Seq(plan0, plan1, plan2), conditions) * Filter plan2 *| + * Project --- End diff -- I don't think this case would be covered, i.e., Project-over-Filter or Filter-over-Project. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r198030263 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -81,14 +92,14 @@ class JoinOptimizationSuite extends PlanTest { testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr), Some((Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr -testExtractCheckCross(x.join(y, Cross), Some((Seq((x, Cross), (y, Cross)), Seq( -testExtractCheckCross(x.join(y, Cross).join(z, Cross), +testExtractInnerJoins(x.join(y, Cross), Some((Seq((x, Cross), (y, Cross)), Seq( +testExtractInnerJoins(x.join(y, Cross).join(z, Cross), Some((Seq((x, Cross), (y, Cross), (z, Cross)), Seq( -testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Cross), +testExtractInnerJoins(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Cross), Some((Seq((x, Cross), (y, Cross), (z, Cross)), Seq("x.b".attr === "y.d".attr -testExtractCheckCross(x.join(y, Inner, Some("x.b".attr === "y.d".attr)).join(z, Cross), +testExtractInnerJoins(x.join(y, Inner, Some("x.b".attr === "y.d".attr)).join(z, Cross), Some((Seq((x, Inner), (y, Inner), (z, Cross)), Seq("x.b".attr === "y.d".attr -testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Inner), +testExtractInnerJoins(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Inner), Some((Seq((x, Cross), (y, Cross), (z, Inner)), Seq("x.b".attr === "y.d".attr } --- End diff -- It would be worth adding tests with "where" and "select" together if the scenario I mentioned above can be implemented. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180616339 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -145,4 +161,55 @@ class JoinOptimizationSuite extends PlanTest { } assert(broadcastChildren.size == 1) } + + test("SPARK-23172 skip projections when flattening joins") { +val x = testRelation.subquery('x) +val y = testRelation1.subquery('y) +val z = testRelation.subquery('z) +val joined = x.join(z, Inner, Some($"x.b" === $"z.b")).select($"x.a", $"z.a", $"z.c") + .join(y, Inner, Some($"y.d" === $"z.a")).analyze +val expectedTables = joined.collectLeaves().map { case p => (p, Inner) } +val expectedConditions = joined.collect { case Join(_, _, _, Some(conditions)) => conditions } +testExtractInnerJoins(joined, Some((expectedTables, expectedConditions))) + } + + test("SPARK-23172 reorder joins with projections") { --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180616142 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -116,7 +127,12 @@ class JoinOptimizationSuite extends PlanTest { ) queryAnswers foreach { queryAnswerPair => - val optimized = Optimize.execute(queryAnswerPair._1.analyze) + val optimized = Optimize.execute(queryAnswerPair._1.analyze) match { +// `ReorderJoin` may add `Project` to keep the same order of output attributes. +// So, we drop a top `Project` for tests. +case project: Project => project.child --- End diff -- yea, great suggestion and I think so. I'll try to fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180615479 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -172,17 +174,20 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper { case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) => val (plans, conditions) = flattenJoin(j) (plans, conditions ++ splitConjunctivePredicates(filterCondition)) - +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) +// Keep flattening joins when the project has attributes only +if p.projectList.forall(_.isInstanceOf[Attribute]) => + flattenJoin(j) case _ => (Seq((plan, parentJoinType)), Seq.empty) } - def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] - = plan match { -case f @ Filter(filterCondition, j @ Join(_, _, joinType: InnerLike, _)) => - Some(flattenJoin(f)) -case j @ Join(_, _, joinType, _) => - Some(flattenJoin(j)) -case _ => None + def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] = { +val (plans, conditions) = flattenJoin(plan) +if (plans.size > 1) { --- End diff -- aha, sounds good to me. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180615326 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -59,12 +75,7 @@ class JoinOptimizationSuite extends PlanTest { (noCartesian, seq_pair._2) } } - testExtractCheckCross(plan, expectedNoCross) -} - -def testExtractCheckCross -(plan: LogicalPlan, expected: Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]) { - assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected) --- End diff -- If we have multiple conditions, we need to compare them as `Set`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180613972 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180345994 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { --- End diff -- how about `sameJoinOrder`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180346211 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,49 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private def extractLeftDeepInnerJoins(plan: LogicalPlan): Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!checkSameJoinOrder(orderedJoins, originalPlan)) { --- End diff -- ah, right, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180327289 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -172,17 +174,20 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper { case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) => val (plans, conditions) = flattenJoin(j) (plans, conditions ++ splitConjunctivePredicates(filterCondition)) - +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) +// Keep flattening joins when the project has attributes only +if p.projectList.forall(_.isInstanceOf[Attribute]) => + flattenJoin(j) case _ => (Seq((plan, parentJoinType)), Seq.empty) } - def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] - = plan match { -case f @ Filter(filterCondition, j @ Join(_, _, joinType: InnerLike, _)) => - Some(flattenJoin(f)) -case j @ Join(_, _, joinType, _) => - Some(flattenJoin(j)) -case _ => None + def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] = { +val (plans, conditions) = flattenJoin(plan) +if (plans.size > 1) { --- End diff -- how about `plans.size > 2 && conditions.nonEmpty`? then we can remove the `if` condition [here](https://github.com/apache/spark/pull/20345/files#diff-17d31b198ff391188311550fcabd1198R120) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180344569 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -59,12 +75,7 @@ class JoinOptimizationSuite extends PlanTest { (noCartesian, seq_pair._2) } } - testExtractCheckCross(plan, expectedNoCross) -} - -def testExtractCheckCross -(plan: LogicalPlan, expected: Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]) { - assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected) --- End diff -- Since you rewrote this function, is the previous comparison logic wrong? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180328615 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -145,4 +161,55 @@ class JoinOptimizationSuite extends PlanTest { } assert(broadcastChildren.size == 1) } + + test("SPARK-23172 skip projections when flattening joins") { +val x = testRelation.subquery('x) +val y = testRelation1.subquery('y) +val z = testRelation.subquery('z) +val joined = x.join(z, Inner, Some($"x.b" === $"z.b")).select($"x.a", $"z.a", $"z.c") + .join(y, Inner, Some($"y.d" === $"z.a")).analyze +val expectedTables = joined.collectLeaves().map { case p => (p, Inner) } +val expectedConditions = joined.collect { case Join(_, _, _, Some(conditions)) => conditions } +testExtractInnerJoins(joined, Some((expectedTables, expectedConditions))) + } + + test("SPARK-23172 reorder joins with projections") { --- End diff -- The case can also happen without star schema enabled, right? Is it possible to use a simpler case like the one in pr description? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180340667 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -46,6 +48,20 @@ class JoinOptimizationSuite extends PlanTest { val testRelation = LocalRelation('a.int, 'b.int, 'c.int) val testRelation1 = LocalRelation('d.int) + def testExtractInnerJoins( --- End diff -- private --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180344118 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -116,7 +127,12 @@ class JoinOptimizationSuite extends PlanTest { ) queryAnswers foreach { queryAnswerPair => - val optimized = Optimize.execute(queryAnswerPair._1.analyze) + val optimized = Optimize.execute(queryAnswerPair._1.analyze) match { +// `ReorderJoin` may add `Project` to keep the same order of output attributes. +// So, we drop a top `Project` for tests. +case project: Project => project.child --- End diff -- I'm a little hesitant to change this, because if we really forget to add a Project node after join reordering, the test can pass, but that's wrong. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r176027921 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -145,4 +161,55 @@ class JoinOptimizationSuite extends PlanTest { } assert(broadcastChildren.size == 1) } + + test("SPARK-23172 skip projections when flattening joins") { +val x = testRelation.subquery('x) +val y = testRelation1.subquery('y) +val z = testRelation.subquery('z) +val joined = x.join(z, Inner, Some($"x.b" === $"z.b")).select($"x.a", $"z.a", $"z.c") + .join(y, Inner, Some($"y.d" === $"z.a")).analyze +val expectedTables = joined.collectLeaves().map { case p => (p, Inner) } +val expectedConditions = joined.collect { case Join(_, _, _, Some(conditions)) => conditions } +testExtractInnerJoins(joined, Some((expectedTables, expectedConditions))) + } + + test("SPARK-23172 reorder joins with projections") { --- End diff -- @wzhfy Added this test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175971417 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,49 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private def extractLeftDeepInnerJoins(plan: LogicalPlan): Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!checkSameJoinOrder(orderedJoins, originalPlan)) { --- End diff -- If we don't have this check, `operatorOptimizationRuleSet` reaches `fixedPoint` because `ReorderJoin` is re-applied in the same join trees every time the optimization rule batch invoked. This case does not happen in the master because reordered joins have `Project` in internal nodes (`Project` added by following optimization rules, e.g., `ColumnPruning`) and this plan structure guards this case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175971428 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -145,4 +159,15 @@ class JoinOptimizationSuite extends PlanTest { } assert(broadcastChildren.size == 1) } + + test("SPARK-23172 skip projections when flattening joins") { --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175971439 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -141,14 +141,16 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { } /** - * A pattern that collects the filter and inner joins. + * A pattern that collects the filter and inner joins (and skip projections in plan sub-trees). --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175725372 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,49 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private def extractLeftDeepInnerJoins(plan: LogicalPlan): Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!checkSameJoinOrder(orderedJoins, originalPlan)) { --- End diff -- Is this check necessary? I think check `originalPlan.output != orderedJoins.output` is enough, and faster. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175696570 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -141,14 +141,16 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { } /** - * A pattern that collects the filter and inner joins. + * A pattern that collects the filter and inner joins (and skip projections in plan sub-trees). --- End diff -- skip projections with attributes only --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175727668 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -145,4 +159,15 @@ class JoinOptimizationSuite extends PlanTest { } assert(broadcastChildren.size == 1) } + + test("SPARK-23172 skip projections when flattening joins") { --- End diff -- Could you add a test case which would fail before the fix? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175696187 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -172,17 +174,23 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper { case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) => val (plans, conditions) = flattenJoin(j) (plans, conditions ++ splitConjunctivePredicates(filterCondition)) - +case p @ Project(_, j @ Join(left, right, _: InnerLike, joinCondition)) => + // Keep flattening joins when projects having attributes only + if (p.outputSet.subsetOf(j.outputSet)) { --- End diff -- If we want to make sure the project has attributes only, should it be `p.projectList.forall(_.isInstanceOf[Attribute])`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175696302 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -172,17 +174,23 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper { case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) => val (plans, conditions) = flattenJoin(j) (plans, conditions ++ splitConjunctivePredicates(filterCondition)) - +case p @ Project(_, j @ Join(left, right, _: InnerLike, joinCondition)) => + // Keep flattening joins when projects having attributes only --- End diff -- nit: when projects having attributes only => when the project has attributes only --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org