Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15363#discussion_r85394505 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -83,10 +88,221 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + /** + * Helper function that computes the size of a table access after applying + * the predicates. Currently, the function returns the table size. + * When plan cardinality and predicate selectivity are implemented in Catalyst, + * the function will be refined based on these estimates. + */ + private def computeBaseTableSize( + input: LogicalPlan, + conditions: Seq[Expression]): Option[BigInt] = { + input match { + case BaseTableAccess(t, cond) if t.statistics.sizeInBytes >= 0 => + // To be replaced by the table cardinality, when available. + val tableSize = t.statistics.sizeInBytes + + // Collect predicate selectivity, when available. + // val predicates = conditions.filter(canEvaluate(_, p)) ++ cond + // Compute the output cardinality = tableSize * predicates' selectivity. + Option(tableSize) + + case _ => None + } + } + + /** + * Helper case class to hold (plan, size) pairs. + */ + private case class TableSize(plan: (LogicalPlan, InnerLike), size: Option[BigInt]) + + /** + * Checks if a star join is a selective join. A star join is assumed + * to be selective if (1) there are local predicates on the dimension + * tables and (2) the join predicates are equi joins. + */ + private def isSelectiveStarJoin( + factTable: LogicalPlan, + dimTables: Seq[LogicalPlan], + conditions: Seq[Expression]): Boolean = { + + val starJoinPlan = factTable +: dimTables + + // Checks if all star plans are base table access. + val allBaseTables = starJoinPlan.forall { + case BaseTableAccess(_, _) => true + case _ => false + } + + // Checks if any condition applies to the dimension tables. 
+ val localPredicates = dimTables.exists { plan => + // Exclude the IsNotNull predicates until predicate selectivity is available. + // In most cases, this predicate is artificially introduced by the Optimizer + // to enforce nullability constraints. + conditions.filterNot(_.isInstanceOf[IsNotNull]).exists(canEvaluate(_, plan)) + } + + // Checks if there are any predicates pushed down to the base table access. + // Similarly, exclude IsNotNull predicates. + val pushedDownPredicates = dimTables.exists { + case BaseTableAccess(_, p) if p.nonEmpty => !p.forall(_.isInstanceOf[IsNotNull]) + case _ => false + } + + val isSelectiveDimensions = allBaseTables && (localPredicates || pushedDownPredicates) + + // Checks if the join predicates are equi joins of the form fact.col = dim.col. + val isEquiJoin = dimTables.forall { plan => + val refs = factTable.outputSet ++ plan.outputSet + conditions.filterNot(canEvaluate(_, factTable)) + .filterNot(canEvaluate(_, plan)) + .filter(_.references.subsetOf(refs)) + .forall { + case EqualTo(_: Attribute, _: Attribute) => true + case EqualNullSafe(_: Attribute, _: Attribute) => true + case _ => false + } + } + + isSelectiveDimensions && isEquiJoin + } + + /** + * Finds an optimal join order for star schema queries i.e. + * + The largest fact table on the driving arm to avoid large tables on the inner of a join, + * and thus favor hash joins. + * + Apply the most selective dimensions early in the plan to reduce the data flow. + * + * In general, star-schema joins are detected using the following conditions: + * 1. Informational RI constraints (reliable detection) + * + Dimension contains a primary key that is being joined to the fact table. + * + Fact table contains foreign keys referencing multiple dimension tables. + * 2. Cardinality based heuristics + * + Usually, the table with the highest cardinality is the fact table. + * + Table being joined with the most number of tables is the fact table. 
+ * + * Given the current Spark optimizer limitations (e.g. limited statistics, no RI + * constraints information, etc.), the algorithm uses table size statistics and the + * type of predicates as a naive approach to infer table cardinality and join selectivity. + * When plan cardinality and predicate selectivity features are implemented in Catalyst, + * the star detection logic will be refined based on these estimates. + * + * The highlights of the algorithm are the following: + * + * Given a set of joined tables/plans, the algorithm finds the set of eligible fact tables. + * An eligible fact table is a base table access with valid statistics. A base table access + * represents Project or Filter operators above a LeafNode. Conservatively, the algorithm + * only considers base table access as part of a star join since they provide reliable + * statistics. + * + * If there are no eligible fact tables (e.g. no base table access i.e. complex sub-plans), + * the algorithm falls back to the positional join reordering, which is the default join + * ordering in Catalyst. Otherwise, the algorithm finds the largest fact table by sorting + * the tables based on their size. + * + * The algorithm next computes the set of dimension tables for the current fact table. + * Conservatively, only equality joins are considered between a fact and a dimension table. + * + * Given a star join, i.e. fact and dimension tables, the algorithm considers three cases: + * + * 1) The star join is an expanding join i.e. the fact table is joined using inequality + * predicates, or Cartesian product. In this case, the algorithm is looking for the next + * eligible star join. The rationale is that a selective star join, if found, may reduce + * the data stream early in the plan. An alternative, more conservative design is to simply + * fall back to the default join reordering if the star join with the largest fact table + * is not selective. + * + * 2) The star join is a selective join.
This case is detected by observing local predicates + * on the dimension tables. In a star schema relationship, the join between the fact and the + * dimension table is in general a FK-PK join. Heuristically, a selective dimension may reduce + * the result of a join. + * + * 3) The star join is not a selective join (i.e. doesn't reduce the number of rows on the + * driving arm). In this case, the algorithm conservatively falls back to the default join + * reordering. + */ + @tailrec + private def findEligibleStarJoinPlan( + joinedTables: Seq[(LogicalPlan, InnerLike)], + eligibleFactTables: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): Seq[(LogicalPlan, InnerLike)] = { + if (eligibleFactTables.isEmpty || joinedTables.size < 2) { + // There is no eligible star join. + Seq.empty[(LogicalPlan, InnerLike)] + } else { + // Compute the size of the eligible fact tables and sort them in descending order. + // An eligible fact table is a base table access with valid statistics. + val sortedFactTables = eligibleFactTables.map { plan => + TableSize(plan, computeBaseTableSize(plan._1, conditions)) + }.collect { + case t @ TableSize(_, Some(_)) => t + }.sortBy(_.size)(implicitly[Ordering[Option[BigInt]]].reverse) + + sortedFactTables match { + case Nil => + // There are no stats available for these plans. + // Return an empty plan list and fall back to the default join reordering. + Seq.empty[(LogicalPlan, InnerLike)] + + case table1 :: table2 :: _ if table1.size == table2.size => + // There are more tables with the same size. Conservatively, fall back to the + // default join reordering since we cannot make good plan choices. + Seq.empty[(LogicalPlan, InnerLike)] + + case TableSize(factPlan @ (factTable, _), _) :: _ => + // A fact table was found. Get the largest fact table and compute the corresponding + // dimension tables. Conservatively, a dimension table is assumed to be joined with + // the fact table using equality predicates. 
+ val eligibleDimPlans = joinedTables.filterNot(_._1 eq factTable).filter { dimPlan => + val dimTable = dimPlan._1 + val refs = factTable.outputSet ++ dimTable.outputSet + conditions.filter(p => p.isInstanceOf[EqualTo] || p.isInstanceOf[EqualNullSafe]) + .filterNot(canEvaluate(_, factTable)) + .filterNot(canEvaluate(_, dimTable)) + .exists(_.references.subsetOf(refs)) + } + + val dimTables = eligibleDimPlans.map { _._1 } + + if (eligibleDimPlans.isEmpty) { + // This is an expanding join since there are no equality joins + // between the current fact table and the joined tables. + // Look for the next eligible star join plan. + findEligibleStarJoinPlan( + joinedTables, + eligibleFactTables.filterNot(_._1 eq factTable), + conditions) + + } else if (isSelectiveStarJoin(factTable, dimTables, conditions)) { + // This is a selective star join and all dimensions are base table access. + // Compute the size of the dimensions and return the star join + // with the most selective dimensions joined lower in the plan. + val sortedDims = eligibleDimPlans.map { plan => --- End diff -- Another thing that could be useful is placing the selective broadcast join before the shuffle join.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org