[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106775334

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -19,31 +19,33 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.collection.mutable
-import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Expression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike}
 import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, Join, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
 /**
  * Cost-based join reorder.
  * We may have several join reorder algorithms in the future. This class is the entry of these
  * algorithms, and chooses which one to use.
  */
-case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper {
+case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = {
     if (!conf.cboEnabled || !conf.joinReorderEnabled) {
       plan
     } else {
-      val result = plan transform {
-        case p @ Project(projectList, j @ Join(_, _, _: InnerLike, _)) =>
-          reorder(p, p.outputSet)
-        case j @ Join(_, _, _: InnerLike, _) =>
+      val result = plan transformDown {
+        // Start reordering with a joinable item, which is an InnerLike join with conditions.
+        case j @ Join(_, _, _: InnerLike, Some(cond)) =>
           reorder(j, j.outputSet)
+        case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond)))
+            if projectList.forall(_.isInstanceOf[Attribute]) =>
+          reorder(p, p.outputSet)
       }
       // After reordering is finished, convert OrderedJoin back to Join
-      result transform {
+      result transformDown {
--- End diff --

Is this change needed? Must the order be top-down?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
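For the ordering question: to my knowledge, Catalyst's `TreeNode.transform` simply delegates to `transformDown` (pre-order), so the rename makes the traversal explicit rather than changing it. The toy sketch below uses a hypothetical `Plan`/`Rel`/`J` tree, not Catalyst's `LogicalPlan`, and records the order in which a rule sees nodes under a top-down versus a bottom-up traversal.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy plan tree (not Catalyst's LogicalPlan).
sealed trait Plan {
  def children: Seq[Plan]
  def withChildren(newChildren: Seq[Plan]): Plan
}
case class Rel(name: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}
case class J(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
  def withChildren(newChildren: Seq[Plan]): Plan = J(newChildren(0), newChildren(1))
}

object Traversal {
  // Pre-order: rewrite the node first, then recurse into the children of the result.
  def transformDown(p: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val afterRule = rule.applyOrElse(p, identity[Plan])
    afterRule.withChildren(afterRule.children.map(c => transformDown(c)(rule)))
  }

  // Post-order: rewrite the children first, then the node itself.
  def transformUp(p: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val withNewChildren = p.withChildren(p.children.map(c => transformUp(c)(rule)))
    rule.applyOrElse(withNewChildren, identity[Plan])
  }

  // Assumption mirrored from Catalyst: `transform` is an alias for the top-down traversal.
  def transform(p: Plan)(rule: PartialFunction[Plan, Plan]): Plan = transformDown(p)(rule)

  def label(p: Plan): String = p match {
    case Rel(n)  => n
    case J(l, r) => s"J(${label(l)},${label(r)})"
  }

  // Labels of the nodes in the order the rule is applied to them.
  def visitOrder(p: Plan, topDown: Boolean): List[String] = {
    val seen = ListBuffer.empty[String]
    val record: PartialFunction[Plan, Plan] = { case n => seen += label(n); n }
    if (topDown) transformDown(p)(record) else transformUp(p)(record)
    seen.toList
  }
}
```

With a top-down traversal the outermost join is seen first, which is what lets a reorder rule collect all nested inner joins from the top; a bottom-up traversal would reach the innermost join first.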
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106775016

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -19,31 +19,33 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.collection.mutable
-import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Expression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike}
 import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, Join, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
 /**
  * Cost-based join reorder.
  * We may have several join reorder algorithms in the future. This class is the entry of these
  * algorithms, and chooses which one to use.
  */
-case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper {
+case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = {
     if (!conf.cboEnabled || !conf.joinReorderEnabled) {
       plan
     } else {
-      val result = plan transform {
-        case p @ Project(projectList, j @ Join(_, _, _: InnerLike, _)) =>
-          reorder(p, p.outputSet)
-        case j @ Join(_, _, _: InnerLike, _) =>
+      val result = plan transformDown {
+        // Start reordering with a joinable item, which is an InnerLike join with conditions.
+        case j @ Join(_, _, _: InnerLike, Some(cond)) =>
--- End diff --

I have a very general question about how to exclude Cartesian products. See my old PR: #16762

Having a join condition does not mean the join is not a Cartesian product. :) For example, an inner join whose conditions contain only non-equality predicates, such as tab1.a < tab2.b, is still a Cartesian product.
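To make the distinction concrete, the sketch below treats an inner join as a Cartesian product when its condition is absent or contains no equality predicate whose two operands come from opposite sides. It uses a hypothetical toy expression model (`Col`, `EqualTo`, `LessThan`, `And`), not Catalyst's classes, and is one plausible reading of the comment rather than Spark's actual check.

```scala
// Hypothetical toy expression model (not Catalyst's Expression hierarchy).
sealed trait Expr
case class Col(table: String, name: String) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case class LessThan(left: Expr, right: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr

object CartesianCheck {
  // Split a conjunction into its individual predicates.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Tables referenced by an expression.
  def tables(e: Expr): Set[String] = e match {
    case Col(t, _)      => Set(t)
    case EqualTo(l, r)  => tables(l) ++ tables(r)
    case LessThan(l, r) => tables(l) ++ tables(r)
    case And(l, r)      => tables(l) ++ tables(r)
  }

  // An equality is an equi-join key if one operand comes only from the left side
  // and the other only from the right side (in either order).
  def isEquiJoinKey(p: Expr, left: Set[String], right: Set[String]): Boolean = p match {
    case EqualTo(l, r) =>
      val (lt, rt) = (tables(l), tables(r))
      lt.nonEmpty && rt.nonEmpty &&
        ((lt.subsetOf(left) && rt.subsetOf(right)) || (lt.subsetOf(right) && rt.subsetOf(left)))
    case _ => false
  }

  // Cartesian under this sketch's assumption: no condition, or no equi-join key in it.
  def isCartesian(cond: Option[Expr], left: Set[String], right: Set[String]): Boolean =
    !cond.exists(c => splitConjuncts(c).exists(isEquiJoinKey(_, left, right)))
}
```

Under this model `tab1.a < tab2.b` alone is still classified as Cartesian, while adding any cross-side equality to the conjunction makes it an equi-join.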
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/17286
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106603543

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala ---
@@ -187,6 +220,8 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
       case (j1: Join, j2: Join) =>
         (sameJoinPlan(j1.left, j2.left) && sameJoinPlan(j1.right, j2.right)) ||
           (sameJoinPlan(j1.left, j2.right) && sameJoinPlan(j1.right, j2.left))
+      case _ if plan1.children.nonEmpty && plan2.children.nonEmpty =>
--- End diff --

If both plans are Projects, we should recurse into their children. We may also add tests with other operators in the future, so I didn't match only Project here.
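The generic branch described above can be sketched on a hypothetical toy operator tree (`Scan`, `Join`, `Project`, not Catalyst's classes): joins are compared modulo swapping sides, other non-leaf operators of the same kind recurse pairwise into their children, and leaves fall back to equality. As an assumption of this sketch, operator-specific fields such as the projection list are ignored in the generic branch.

```scala
// Hypothetical toy operators (not Catalyst's LogicalPlan).
sealed trait Node { def children: Seq[Node] }
case class Scan(name: String) extends Node { def children: Seq[Node] = Nil }
case class Join(left: Node, right: Node) extends Node { def children: Seq[Node] = Seq(left, right) }
case class Project(columns: Seq[String], child: Node) extends Node { def children: Seq[Node] = Seq(child) }

object PlanCompare {
  def sameJoinPlan(plan1: Node, plan2: Node): Boolean = (plan1, plan2) match {
    // Joins are equal if their sides match, in either order.
    case (j1: Join, j2: Join) =>
      (sameJoinPlan(j1.left, j2.left) && sameJoinPlan(j1.right, j2.right)) ||
        (sameJoinPlan(j1.left, j2.right) && sameJoinPlan(j1.right, j2.left))
    // Other non-leaf operators: same operator kind and arity, then recurse pairwise.
    // Operator-specific fields (e.g. Project.columns) are ignored here.
    case _ if plan1.children.nonEmpty && plan2.children.nonEmpty =>
      plan1.getClass == plan2.getClass &&
        plan1.children.size == plan2.children.size &&
        plan1.children.zip(plan2.children).forall { case (c1, c2) => sameJoinPlan(c1, c2) }
    // Leaves: plain equality.
    case _ => plan1 == plan2
  }
}
```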
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106602975

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -710,6 +710,14 @@ object SQLConf {
       .intConf
       .createWithDefault(12)
+  val JOIN_REORDER_CARD_WEIGHT =
+    buildConf("spark.sql.cbo.joinReorder.card.weight")
+      .doc("The weight of cardinality (number of rows) for plan cost comparison in join reorder: " +
+        "rows * weight + size * (1 - weight).")
+      .doubleConf
+      .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
+      .createWithDefault(0.7)
--- End diff --

`0.7` is just an empirical value, so I think it's better to make it tunable. Maybe mark it internal?
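The config's doc string defines the comparison metric as `rows * weight + size * (1 - weight)`. A minimal sketch of that formula follows, using a hypothetical `Cost` case class and a `require` standing in for the `checkValue` bound; it shows how the weight can change which of two plans wins.

```scala
// Hypothetical cost model mirroring the doc-string formula:
//   rows * weight + size * (1 - weight)
case class Cost(rows: BigInt, size: BigInt)

object WeightedCost {
  // Stand-in for checkValue: the weight must lie in [0, 1].
  def validate(weight: Double): Double = {
    require(weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
    weight
  }

  // Exact decimal arithmetic so large BigInt costs do not lose precision.
  def combined(cost: Cost, weight: Double): BigDecimal = {
    val w = BigDecimal(validate(weight))
    BigDecimal(cost.rows) * w + BigDecimal(cost.size) * (BigDecimal(1) - w)
  }

  def betterThan(a: Cost, b: Cost, weight: Double): Boolean =
    combined(a, weight) < combined(b, weight)
}
```

For example, `Cost(100, 1000)` scores 370 at the default weight of 0.7 and loses to `Cost(200, 500)` (290), but wins at weight 1.0, where only row counts matter.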
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106583389

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala ---
@@ -187,6 +220,8 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
       case (j1: Join, j2: Join) =>
         (sameJoinPlan(j1.left, j2.left) && sameJoinPlan(j1.right, j2.right)) ||
           (sameJoinPlan(j1.left, j2.right) && sameJoinPlan(j1.right, j2.left))
+      case _ if plan1.children.nonEmpty && plan2.children.nonEmpty =>
--- End diff --

When will we hit this branch?
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106583140

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -710,6 +710,14 @@ object SQLConf {
       .intConf
       .createWithDefault(12)
+  val JOIN_REORDER_CARD_WEIGHT =
+    buildConf("spark.sql.cbo.joinReorder.card.weight")
+      .doc("The weight of cardinality (number of rows) for plan cost comparison in join reorder: " +
+        "rows * weight + size * (1 - weight).")
+      .doubleConf
+      .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
+      .createWithDefault(0.7)
--- End diff --

Is it useful to expose this config? I think most users will just disable join reordering if they run into problems.
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106583013

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -203,64 +205,46 @@ object JoinReorderDP extends PredicateHelper {
   private def buildJoin(
       oneJoinPlan: JoinPlan,
       otherJoinPlan: JoinPlan,
-      conf: CatalystConf,
+      conf: SQLConf,
       conditions: Set[Expression],
-      topOutput: AttributeSet): JoinPlan = {
+      topOutput: AttributeSet): Option[JoinPlan] = {
     val onePlan = oneJoinPlan.plan
     val otherPlan = otherJoinPlan.plan
-    // Now both onePlan and otherPlan become intermediate joins, so the cost of the
-    // new join should also include their own cardinalities and sizes.
-    val newCost = if (isCartesianProduct(onePlan) || isCartesianProduct(otherPlan)) {
-      // We consider cartesian product very expensive, thus set a very large cost for it.
-      // This enables to plan all the cartesian products at the end, because having a cartesian
-      // product as an intermediate join will significantly increase a plan's cost, making it
-      // impossible to be selected as the best plan for the items, unless there's no other choice.
-      Cost(
-        rows = BigInt(Long.MaxValue) * BigInt(Long.MaxValue),
-        size = BigInt(Long.MaxValue) * BigInt(Long.MaxValue))
-    } else {
-      val onePlanStats = onePlan.stats(conf)
-      val otherPlanStats = otherPlan.stats(conf)
-      Cost(
-        rows = oneJoinPlan.cost.rows + onePlanStats.rowCount.get +
-          otherJoinPlan.cost.rows + otherPlanStats.rowCount.get,
-        size = oneJoinPlan.cost.size + onePlanStats.sizeInBytes +
-          otherJoinPlan.cost.size + otherPlanStats.sizeInBytes)
-    }
-
-    // Put the deeper side on the left, tend to build a left-deep tree.
-    val (left, right) = if (oneJoinPlan.itemIds.size >= otherJoinPlan.itemIds.size) {
-      (onePlan, otherPlan)
-    } else {
-      (otherPlan, onePlan)
-    }
     val joinConds = conditions
       .filterNot(l => canEvaluate(l, onePlan))
       .filterNot(r => canEvaluate(r, otherPlan))
       .filter(e => e.references.subsetOf(onePlan.outputSet ++ otherPlan.outputSet))
-    // We use inner join whether join condition is empty or not. Since cross join is
-    // equivalent to inner join without condition.
-    val newJoin = Join(left, right, Inner, joinConds.reduceOption(And))
-    val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds
-    val remainingConds = conditions -- collectedJoinConds
-    val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput
-    val neededFromNewJoin = newJoin.outputSet.filter(neededAttr.contains)
-    val newPlan =
-      if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
-        Project(neededFromNewJoin.toSeq, newJoin)
+    if (joinConds.isEmpty) {
+      // Cartesian product is very expensive, so we exclude them from candidate plans.
+      // This also significantly reduces the search space.
--- End diff --

Great! Now we can safely apply this optimization :)
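The new `buildJoin` contract (return `None` when no condition links the two sides) can be sketched on simplified, hypothetical types: items are integer ids, each condition is modelled only by the item ids it references, and "can be evaluated on a plan" is approximated as "references only that plan's items". This is an illustration of the idea, not the PR's implementation.

```scala
// Hypothetical simplified types: a condition is just the set of item ids it references.
case class Cond(refs: Set[Int])
case class JoinPlan(itemIds: Set[Int], joinConds: Set[Cond])

object BuildJoinSketch {
  // None means no condition connects the two sides, i.e. the join would be a
  // Cartesian product, so the candidate is dropped from the search space.
  def buildJoin(one: JoinPlan, other: JoinPlan, conditions: Set[Cond]): Option[JoinPlan] = {
    val both = one.itemIds ++ other.itemIds
    val joinConds = conditions
      .filterNot(c => c.refs.subsetOf(one.itemIds))   // evaluable on one side alone
      .filterNot(c => c.refs.subsetOf(other.itemIds)) // evaluable on the other side alone
      .filter(c => c.refs.subsetOf(both))             // evaluable on the joined output
    if (joinConds.isEmpty) None
    else Some(JoinPlan(both, joinConds ++ one.joinConds ++ other.joinConds))
  }
}
```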
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106582845

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -272,26 +256,39 @@ object JoinReorderDP extends PredicateHelper {
    * @param itemIds Set of item ids participating in this partial plan.
    * @param plan The plan tree with the lowest cost for these items found so far.
    * @param joinConds Join conditions included in the plan.
-   * @param cost The cost of this plan is the sum of costs of all intermediate joins.
+   * @param planCost The cost of this plan tree is the sum of costs of all intermediate joins.
--- End diff --

I think `cost` is good enough. Why rename it?
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106582668

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -185,11 +184,14 @@ object JoinReorderDP extends PredicateHelper {
         // Should not join two overlapping item sets.
         if (oneSidePlan.itemIds.intersect(otherSidePlan.itemIds).isEmpty) {
           val joinPlan = buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput)
-          // Check if it's the first plan for the item set, or it's a better plan than
-          // the existing one due to lower cost.
-          val existingPlan = nextLevel.get(joinPlan.itemIds)
-          if (existingPlan.isEmpty || joinPlan.cost.lessThan(existingPlan.get.cost)) {
-            nextLevel.update(joinPlan.itemIds, joinPlan)
+          if (joinPlan.isDefined) {
--- End diff --

When will this condition be false?
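Given that `buildJoin` now returns an `Option`, the `isDefined` check is presumably false exactly when `buildJoin` rejected the pair because no join condition connects the two sides. A hypothetical sketch of the memo update, with a simplified `Candidate` type in place of `JoinPlan`, folds the check into `Option.foreach` and keeps the "first or cheaper" rule from the removed lines:

```scala
import scala.collection.mutable

// Hypothetical simplified memo entry: the item ids covered plus a scalar cost.
case class Candidate(itemIds: Set[Int], cost: BigInt)

object MemoSketch {
  // Record the candidate only if buildJoin produced one (Some) and it is either the
  // first plan for its item set or cheaper than the plan already recorded.
  def offer(nextLevel: mutable.Map[Set[Int], Candidate], joinPlan: Option[Candidate]): Unit =
    joinPlan.foreach { plan =>
      val existing = nextLevel.get(plan.itemIds)
      if (existing.isEmpty || plan.cost < existing.get.cost) {
        nextLevel.update(plan.itemIds, plan)
      }
    }
}
```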
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106582563

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -128,38 +131,34 @@ case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] wi
 object JoinReorderDP extends PredicateHelper {
   def search(
-      conf: CatalystConf,
+      conf: SQLConf,
       items: Seq[LogicalPlan],
       conditions: Set[Expression],
-      topOutput: AttributeSet): Option[LogicalPlan] = {
+      topOutput: AttributeSet): LogicalPlan = {
     // Level i maintains all found plans for i + 1 items.
     // Create the initial plans: each plan is a single item with zero cost.
-    val itemIndex = items.zipWithIndex
+    val itemIndex = items.zipWithIndex.map(_.swap).toMap
--- End diff --

Looks like an unnecessary change now.
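The two versions of `itemIndex` differ only in shape: `zipWithIndex` yields `(item, id)` pairs, while `.map(_.swap).toMap` yields an `id -> item` map that supports lookup by id. A small sketch with hypothetical string stand-ins for the `LogicalPlan` items:

```scala
object ItemIndexSketch {
  // Hypothetical stand-ins for the LogicalPlan items.
  val items: Seq[String] = Seq("t1", "t2", "t3")

  // Before: a sequence of (item, id) pairs.
  val pairs: Seq[(String, Int)] = items.zipWithIndex

  // After: a map from id to item, which allows looking an item up by its id.
  val byId: Map[Int, String] = items.zipWithIndex.map(_.swap).toMap
}
```

If nothing later looks items up by id, building the initial level from either form gives the same result, which is consistent with the change being unnecessary.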
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106357158

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -696,6 +696,13 @@ object SQLConf {
       .intConf
       .createWithDefault(12)
+  val JOIN_REORDER_CARD_WEIGHT =
+    buildConf("spark.sql.cbo.joinReorder.card.weight")
+      .doc("The weight of cardinality (number of rows) for plan cost comparison in join reorder: " +
+        "rows * weight + size * (1 - weight).")
+      .doubleConf
+      .createWithDefault(0.7)
--- End diff --

Yes, a check should be added, thanks!
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106355108

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -128,38 +128,43 @@ case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] wi
 object JoinReorderDP extends PredicateHelper {
   def search(
-      conf: CatalystConf,
+      conf: SQLConf,
       items: Seq[LogicalPlan],
       conditions: Set[Expression],
       topOutput: AttributeSet): Option[LogicalPlan] = {
     // Level i maintains all found plans for i + 1 items.
     // Create the initial plans: each plan is a single item with zero cost.
-    val itemIndex = items.zipWithIndex
+    val itemIndex = items.zipWithIndex.map(_.swap).toMap
     val foundPlans = mutable.Buffer[JoinPlanMap](itemIndex.map {
-      case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
-    }.toMap)
+      case (id, item) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
+    })
-    for (lev <- 1 until items.length) {
+    // Build plans for next levels until the last level has only one plan. This plan contains
+    // all items that can be joined, so there's no need to continue.
+    while (foundPlans.size < items.length && foundPlans.last.size > 1) {
       // Build plans for the next level.
       foundPlans += searchLevel(foundPlans, conf, conditions, topOutput)
     }
-    val plansLastLevel = foundPlans(items.length - 1)
-    if (plansLastLevel.isEmpty) {
-      // Failed to find a plan, fall back to the original plan
-      None
-    } else {
-      // There must be only one plan at the last level, which contains all items.
-      assert(plansLastLevel.size == 1 && plansLastLevel.head._1.size == items.length)
-      Some(plansLastLevel.head._2.plan)
+    // Find the best plan
+    assert(foundPlans.last.size <= 1)
--- End diff --

We still cannot handle the case where several disconnected groups of items exist in the memo. Join relations may exist within these groups. For example:
```
level 3: {ABCD}
level 2: {EFG}, {HIJ}, {ABC}...
level 1: {KL}, {MN}, {AB}, {BC}...
level 0: ...
```
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106341181

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -696,6 +696,13 @@ object SQLConf {
       .intConf
       .createWithDefault(12)
+  val JOIN_REORDER_CARD_WEIGHT =
+    buildConf("spark.sql.cbo.joinReorder.card.weight")
+      .doc("The weight of cardinality (number of rows) for plan cost comparison in join reorder: " +
+        "rows * weight + size * (1 - weight).")
+      .doubleConf
+      .createWithDefault(0.7)
--- End diff --

What are the bounds of this value? Should we add a `check`?
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106338345

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -128,38 +128,43 @@ case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] wi
 object JoinReorderDP extends PredicateHelper {
   def search(
-      conf: CatalystConf,
+      conf: SQLConf,
       items: Seq[LogicalPlan],
       conditions: Set[Expression],
       topOutput: AttributeSet): Option[LogicalPlan] = {
     // Level i maintains all found plans for i + 1 items.
     // Create the initial plans: each plan is a single item with zero cost.
-    val itemIndex = items.zipWithIndex
+    val itemIndex = items.zipWithIndex.map(_.swap).toMap
     val foundPlans = mutable.Buffer[JoinPlanMap](itemIndex.map {
-      case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
-    }.toMap)
+      case (id, item) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
+    })
-    for (lev <- 1 until items.length) {
+    // Build plans for next levels until the last level has only one plan. This plan contains
+    // all items that can be joined, so there's no need to continue.
+    while (foundPlans.size < items.length && foundPlans.last.size > 1) {
       // Build plans for the next level.
       foundPlans += searchLevel(foundPlans, conf, conditions, topOutput)
     }
-    val plansLastLevel = foundPlans(items.length - 1)
-    if (plansLastLevel.isEmpty) {
-      // Failed to find a plan, fall back to the original plan
-      None
-    } else {
-      // There must be only one plan at the last level, which contains all items.
-      assert(plansLastLevel.size == 1 && plansLastLevel.head._1.size == items.length)
-      Some(plansLastLevel.head._2.plan)
+    // Find the best plan
+    assert(foundPlans.last.size <= 1)
--- End diff --

How about:
```
while (foundPlans.size < items.length && foundPlans.last.size > 0)
```
When the while loop ends, either we have reached level n or the current level has 0 entries. Then we pick the last level that has non-zero entries, pick the best entry from that level, and construct the final join plan.
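The proposed loop can be sketched on a hypothetical toy model: items are ids `0 until n`, each join condition is an undirected edge between two ids, costs are omitted, and a level simply holds the item sets for which some plan was found. Running it on two disconnected groups also illustrates the concern raised in wzhfy's reply: the last non-empty level may hold several plans, none of which covers every item.

```scala
import scala.collection.mutable.ArrayBuffer

object LevelSearchSketch {
  // Each level holds the item sets for which a (cost-free, hypothetical) plan exists.
  type Level = Set[Set[Int]]

  private def connected(a: Set[Int], b: Set[Int], edges: Set[(Int, Int)]): Boolean =
    edges.exists { case (x, y) => (a(x) && b(y)) || (a(y) && b(x)) }

  // Level k holds sets of k + 1 items, built from disjoint pairs drawn from levels
  // i and k - 1 - i. Pairs with no connecting edge (Cartesian products) are skipped.
  private def searchLevel(found: Seq[Level], edges: Set[(Int, Int)]): Level = {
    val k = found.size
    (for {
      i <- 0 until k
      one <- found(i)
      other <- found(k - 1 - i)
      if (one intersect other).isEmpty && connected(one, other, edges)
    } yield one ++ other).toSet
  }

  def search(n: Int, edges: Set[(Int, Int)]): Seq[Level] = {
    val found = ArrayBuffer[Level]((0 until n).map(id => Set(id)).toSet)
    // Stop once the level with all n items is built, or as soon as a level comes back empty.
    while (found.size < n && found.last.nonEmpty) {
      found += searchLevel(found.toList, edges)
    }
    found.toList
  }

  def lastNonEmptyLevel(levels: Seq[Level]): Level = levels.filter(_.nonEmpty).last
}
```

With edges `(0,1)` and `(2,3)`, level 2 comes back empty and the last non-empty level is `{{0,1}, {2,3}}`; with a chain `0-1-2-3`, the search reaches a single plan covering all four items.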
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106179885

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -128,38 +128,43 @@ case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] wi
 object JoinReorderDP extends PredicateHelper {
   def search(
-      conf: CatalystConf,
+      conf: SQLConf,
       items: Seq[LogicalPlan],
       conditions: Set[Expression],
       topOutput: AttributeSet): Option[LogicalPlan] = {
     // Level i maintains all found plans for i + 1 items.
     // Create the initial plans: each plan is a single item with zero cost.
-    val itemIndex = items.zipWithIndex
+    val itemIndex = items.zipWithIndex.map(_.swap).toMap
     val foundPlans = mutable.Buffer[JoinPlanMap](itemIndex.map {
-      case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
-    }.toMap)
+      case (id, item) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
+    })
-    for (lev <- 1 until items.length) {
+    // Build plans for next levels until the last level has only one plan. This plan contains
+    // all items that can be joined, so there's no need to continue.
+    while (foundPlans.size < items.length && foundPlans.last.size > 1) {
       // Build plans for the next level.
       foundPlans += searchLevel(foundPlans, conf, conditions, topOutput)
     }
-    val plansLastLevel = foundPlans(items.length - 1)
-    if (plansLastLevel.isEmpty) {
-      // Failed to find a plan, fall back to the original plan
-      None
-    } else {
-      // There must be only one plan at the last level, which contains all items.
-      assert(plansLastLevel.size == 1 && plansLastLevel.head._1.size == items.length)
-      Some(plansLastLevel.head._2.plan)
+    // Find the best plan
+    assert(foundPlans.last.size <= 1)
--- End diff --

It seems that excluding CP (Cartesian products) brings some complexity here.
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106179640

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -128,38 +128,43 @@ case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] wi
 object JoinReorderDP extends PredicateHelper {
   def search(
-      conf: CatalystConf,
+      conf: SQLConf,
       items: Seq[LogicalPlan],
       conditions: Set[Expression],
       topOutput: AttributeSet): Option[LogicalPlan] = {
     // Level i maintains all found plans for i + 1 items.
     // Create the initial plans: each plan is a single item with zero cost.
-    val itemIndex = items.zipWithIndex
+    val itemIndex = items.zipWithIndex.map(_.swap).toMap
     val foundPlans = mutable.Buffer[JoinPlanMap](itemIndex.map {
-      case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
-    }.toMap)
+      case (id, item) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
+    })
-    for (lev <- 1 until items.length) {
+    // Build plans for next levels until the last level has only one plan. This plan contains
+    // all items that can be joined, so there's no need to continue.
+    while (foundPlans.size < items.length && foundPlans.last.size > 1) {
       // Build plans for the next level.
       foundPlans += searchLevel(foundPlans, conf, conditions, topOutput)
     }
-    val plansLastLevel = foundPlans(items.length - 1)
-    if (plansLastLevel.isEmpty) {
-      // Failed to find a plan, fall back to the original plan
-      None
-    } else {
-      // There must be only one plan at the last level, which contains all items.
-      assert(plansLastLevel.size == 1 && plansLastLevel.head._1.size == items.length)
-      Some(plansLastLevel.head._2.plan)
+    // Find the best plan
+    assert(foundPlans.last.size <= 1)
--- End diff --

Hmm, you have a good point here. We may have several disconnected item sets, e.g. {AB} and {CD}, or a more complex case: {ABCD}, {EFG}, {LM}... These cases need to be dealt with.
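One possible way to detect such disconnected item sets up front (an assumption of this sketch, not something the thread says the PR does) is to group items into connected components of the join graph, where each join condition is an edge between the items it references. A minimal union-find sketch over hypothetical integer item ids:

```scala
// Hypothetical helper: groups item ids 0 until n into connected components of the
// join graph, where each edge is a join condition linking two items.
object JoinGraphGroups {
  def components(n: Int, edges: Seq[(Int, Int)]): Set[Set[Int]] = {
    val parent = Array.tabulate(n)(i => i)

    // Find the root of x's component, compressing the path along the way.
    def find(x: Int): Int = {
      if (parent(x) != x) parent(x) = find(parent(x))
      parent(x)
    }

    // Union the components of the two endpoints of every edge.
    edges.foreach { case (a, b) =>
      val (ra, rb) = (find(a), find(b))
      if (ra != rb) parent(ra) = rb
    }

    (0 until n).groupBy(find).values.map(_.toSet).toSet
  }
}
```

Each component could then be reordered on its own and the per-component results combined at the end, for example with Cartesian products; this thread does not say whether the PR eventually took that approach.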
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106178276

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -19,31 +19,31 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.collection.mutable
-import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Expression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike}
 import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, Join, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
 /**
  * Cost-based join reorder.
  * We may have several join reorder algorithms in the future. This class is the entry of these
  * algorithms, and chooses which one to use.
  */
-case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper {
+case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = {
     if (!conf.cboEnabled || !conf.joinReorderEnabled) {
       plan
     } else {
-      val result = plan transform {
+      val result = plan transformDown {
         case p @ Project(projectList, j @ Join(_, _, _: InnerLike, _)) =>
--- End diff --

Yeah, I think so.
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106094256
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -128,38 +128,43 @@ case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] wi
 object JoinReorderDP extends PredicateHelper {
   def search(
-      conf: CatalystConf,
+      conf: SQLConf,
       items: Seq[LogicalPlan],
       conditions: Set[Expression],
       topOutput: AttributeSet): Option[LogicalPlan] = {
     // Level i maintains all found plans for i + 1 items.
     // Create the initial plans: each plan is a single item with zero cost.
-    val itemIndex = items.zipWithIndex
+    val itemIndex = items.zipWithIndex.map(_.swap).toMap
     val foundPlans = mutable.Buffer[JoinPlanMap](itemIndex.map {
-      case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
-    }.toMap)
+      case (id, item) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
+    })
-    for (lev <- 1 until items.length) {
+    // Build plans for next levels until the last level has only one plan. This plan contains
+    // all items that can be joined, so there's no need to continue.
+    while (foundPlans.size < items.length && foundPlans.last.size > 1) {
       // Build plans for the next level.
       foundPlans += searchLevel(foundPlans, conf, conditions, topOutput)
     }
-    val plansLastLevel = foundPlans(items.length - 1)
-    if (plansLastLevel.isEmpty) {
-      // Failed to find a plan, fall back to the original plan
-      None
-    } else {
-      // There must be only one plan at the last level, which contains all items.
-      assert(plansLastLevel.size == 1 && plansLastLevel.head._1.size == items.length)
-      Some(plansLastLevel.head._2.plan)
+    // Find the best plan
+    assert(foundPlans.last.size <= 1)
--- End diff --

Can you answer this question: https://github.com/apache/spark/pull/17240#discussion_r105822819
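The new loop stops as soon as every item is covered or the last level holds at most one plan, and that is what the `assert(foundPlans.last.size <= 1)` relies on. The sketch below models only that enumeration, with no costs and no plan selection, so it is not the PR's `searchLevel`. Assumptions: items are the indices `0 until n`, join conditions are hypothetical undirected edges between two items, and two disjoint item sets are combined only when some edge links them, so no cartesian products are built. `LevelDp`, `linked`, `levels`, and `finalItems` are illustrative names.

```scala
// Cost-free sketch of level-by-level join enumeration; all names are hypothetical.
object LevelDp {
  type ItemSet = Set[Int]

  // Two item sets are joinable only if some condition edge links them (no cartesian products).
  def linked(a: ItemSet, b: ItemSet, edges: Set[(Int, Int)]): Boolean =
    edges.exists { case (x, y) => (a(x) && b(y)) || (a(y) && b(x)) }

  // Level k holds every connected set of k + 1 items that was found.
  def levels(n: Int, edges: Set[(Int, Int)]): Vector[Set[ItemSet]] = {
    var found: Vector[Set[ItemSet]] = Vector((0 until n).map(i => Set(i)).toSet)
    // Same stopping rule as the PR's while loop.
    while (found.size < n && found.last.size > 1) {
      val k = found.size // the new level holds sets of k + 1 items
      val next = for {
        i <- 0 until k // left side comes from level i, so it has i + 1 items
        left <- found(i)
        right <- found(k - 1 - i) // right side has k - i items, k + 1 in total
        if left.intersect(right).isEmpty && linked(left, right, edges)
      } yield left ++ right
      found = found :+ next.toSet
    }
    found
  }

  // The last level holds at most one set; None means the final level came out empty.
  def finalItems(n: Int, edges: Set[(Int, Int)]): Option[ItemSet] = {
    val last = levels(n, edges).last
    assert(last.size <= 1)
    last.headOption
  }
}
```

Under this simple edge model, stopping early is safe: a connected set of k + 2 items always contains two different connected subsets of k + 1 items (drop either of two leaves of a spanning tree), so once a level has at most one set, every later level is empty. For a chain `0-1-2-3` the sketch reaches `Set(0, 1, 2, 3)`; for two disconnected pairs it stops at an empty level and returns `None`.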
[GitHub] spark pull request #17286: [SPARK-19915][SQL] Exclude cartesian product cand...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17286#discussion_r106094103
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---
@@ -19,31 +19,31 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.collection.mutable
-import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Expression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike}
 import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, Join, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
 /**
  * Cost-based join reorder.
  * We may have several join reorder algorithms in the future. This class is the entry of these
  * algorithms, and chooses which one to use.
  */
-case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper {
+case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = {
     if (!conf.cboEnabled || !conf.joinReorderEnabled) {
       plan
     } else {
-      val result = plan transform {
+      val result = plan transformDown {
         case p @ Project(projectList, j @ Join(_, _, _: InnerLike, _)) =>
--- End diff --

Shall we also check whether `projectList` contains only attributes here?
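The check being proposed is that the `Project` only forwards columns and computes nothing; a later revision of this diff guards the `Project` case with `projectList.forall(_.isInstanceOf[Attribute])`. Presumably this keeps the `Project`'s `outputSet`, which is passed to `reorder` as the required top output, limited to columns the joined items already produce. A minimal sketch of that predicate over a hypothetical expression ADT (`AttributeRef`, `Add`, `Alias`, and `ProjectGuard` are stand-ins, not Catalyst's `Expression` classes):

```scala
// Hypothetical stand-ins for Catalyst expressions; not the real Expression hierarchy.
sealed trait Expr
final case class AttributeRef(name: String) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
final case class Alias(child: Expr, name: String) extends Expr

object ProjectGuard {
  // True when every projected entry is a bare column reference, i.e. the Project computes nothing.
  // Same shape as the guard `projectList.forall(_.isInstanceOf[Attribute])`.
  def onlyAttributes(projectList: Seq[Expr]): Boolean =
    projectList.forall(_.isInstanceOf[AttributeRef])

  // Column names a pure projection forwards; None if some entry is a computed or aliased expression.
  def forwardedColumns(projectList: Seq[Expr]): Option[Set[String]] =
    if (onlyAttributes(projectList)) Some(projectList.collect { case AttributeRef(n) => n }.toSet)
    else None
}
```

Note that an `Alias` fails the check even when it only renames a column, because it is not itself an attribute reference.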