Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15363#discussion_r105030111

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -20,19 +20,340 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.annotation.tailrec
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.planning.{BaseTableAccess, ExtractFiltersAndInnerJoins}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.CatalystConf
+
+/**
+ * Encapsulates star-schema join detection.
+ */
+case class DetectStarSchemaJoin(conf: CatalystConf) extends PredicateHelper {
+
+  /**
+   * A star schema consists of one or more fact tables referencing a number of dimension
+   * tables. In general, star-schema joins are detected using the following conditions:
+   *  1. Informational RI constraints (reliable detection)
+   *     + Dimension contains a primary key that is being joined to the fact table.
+   *     + Fact table contains foreign keys referencing multiple dimension tables.
+   *  2. Cardinality-based heuristics
+   *     + Usually, the table with the highest cardinality is the fact table.
+   *     + The table joined with the largest number of tables is the fact table.
+   *
+   * To detect star joins, the algorithm uses a combination of the above two conditions.
+   * The fact table is chosen based on the cardinality heuristics, and the dimension
+   * tables are chosen based on the RI constraints. A star join consists of the largest
+   * fact table joined with the dimension tables on their primary keys. To detect that a
+   * column is a primary key, the algorithm uses table and column statistics.
+   *
+   * Since Catalyst only supports left-deep tree plans, the algorithm currently returns only
+   * the star join with the largest fact table. Choosing the largest fact table for the
+   * driving arm, and thus avoiding large tables on the inner side of a join, is in general
+   * a good heuristic. This restriction can be lifted with support for bushy tree plans.
+   *
+   * The highlights of the algorithm are the following:
+   *
+   * Given a set of joined tables/plans, the algorithm first verifies if they are eligible
+   * for star join detection. An eligible plan is a base table access with valid statistics.
+   * A base table access is a Project or Filter operator above a LeafNode. Conservatively,
+   * the algorithm only considers base table accesses as part of a star join since they
+   * provide reliable statistics.
+   *
+   * If some of the plans are not base table accesses, or statistics are not available, the
+   * algorithm falls back to positional join reordering, since in the absence of statistics
+   * it cannot make good planning decisions. Otherwise, the algorithm finds the table with
+   * the largest cardinality (number of rows), which is assumed to be a fact table.
+   *
+   * Next, it computes the set of dimension tables for the current fact table. A dimension
+   * table is assumed to be in an RI relationship with the fact table. To infer column
+   * uniqueness, the algorithm compares the number of distinct values with the total number
+   * of rows in the table. If their relative difference is within certain limits
+   * (i.e. ndvMaxError * 2, adjusted based on TPC-DS data), the column is assumed to be unique.
+   *
+   * Given a star join, i.e. fact and dimension tables, the algorithm considers three cases:
+   *
+   * 1) The star join is an expanding join, i.e. the fact table is joined using inequality
+   * predicates or a Cartesian product. In this case, the algorithm conservatively falls back
+   * to the default join reordering since it cannot make good planning decisions in the
+   * absence of a cost model.
+   *
+   * 2) The star join is a selective join. This case is detected by observing local predicates
+   * on the dimension tables. In a star schema relationship, the join between the fact and a
+   * dimension table is a FK-PK join. Heuristically, a selective dimension may reduce the
+   * result of the join.
+   *
+   * 3) The star join is not a selective join (i.e. it doesn't reduce the number of rows). In
+   * this case, the algorithm conservatively falls back to the default join reordering.
+   *
+   * If an eligible star join was found in case 2) above, the algorithm reorders the tables
+   * based on the following heuristics:
+   * 1) Place the largest fact table on the driving arm to avoid large tables on the inner
+   * side of a join and thus favor hash joins.
+   * 2) Apply the most selective dimensions early in the plan to reduce data flow.
+   *
+   * Other assumptions made by the algorithm, mainly to prevent regressions in the absence
+   * of a cost model, are the following:
+   * 1) Only consider star joins with more than one dimension, which is the typical
+   * star join scenario.
+   * 2) If the largest tables have a comparable number of rows, fall back to the default
+   * join reordering. This prevents changing the position of the large tables in the join.
+   */
+  def findStarJoinPlan(
+      input: Seq[(LogicalPlan, InnerLike)],
+      conditions: Seq[Expression]): Seq[(LogicalPlan, InnerLike)] = {
+    assert(input.size >= 2)
+
+    val emptyStarJoinPlan = Seq.empty[(LogicalPlan, InnerLike)]
+
+    // Find if the input plans are eligible for star join detection.
+    // An eligible plan is a base table access with valid statistics.
+    val foundEligibleJoin = input.forall { case (plan, _) =>
--- End diff --

NIT: we can be shorter:
```scala
val foundEligibleJoin = input.forall {
  case (BaseTableAccess(t, _), _) => t.stats(conf).rowCount.isDefined
  case _ => false
}
```
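
For readers following the thread outside the Spark code base, here is a minimal, self-contained sketch of the two ideas discussed above: the extractor-based eligibility check hvanhovell suggests, and the column-uniqueness heuristic described in the Scaladoc. All types below (`Plan`, `Relation`, `TableStats`, the toy `BaseTableAccess` extractor, the 0.05 threshold) are invented stand-ins for illustration and are not the Catalyst classes or Spark's actual ndvMaxError configuration; only the shape of the code mirrors the diff.

```scala
// Minimal stand-ins for illustration only; these are NOT the Catalyst classes
// from the diff above.
case class TableStats(rowCount: Option[BigInt])

sealed trait Plan { def stats: TableStats }
case class Relation(name: String, stats: TableStats) extends Plan
case class Filter(condition: String, child: Plan) extends Plan {
  def stats: TableStats = child.stats
}
case class Project(child: Plan) extends Plan {
  def stats: TableStats = child.stats
}

sealed trait JoinType
case object Inner extends JoinType

// Toy extractor playing the role of BaseTableAccess: it succeeds only for
// Project/Filter chains ending in a leaf relation, returning the relation and
// the filter conditions collected along the way.
object BaseTableAccess {
  def unapply(plan: Plan): Option[(Relation, Seq[String])] = plan match {
    case r: Relation         => Some((r, Nil))
    case Filter(cond, child) => unapply(child).map { case (r, cs) => (r, cond +: cs) }
    case Project(child)      => unapply(child)
  }
}

object StarJoinSketch {
  // The shape of the reviewer's suggestion: match on the extractor directly in
  // forall, with a default case so any non-base-table plan fails the check.
  def allEligible(input: Seq[(Plan, JoinType)]): Boolean =
    input.forall {
      case (BaseTableAccess(t, _), _) => t.stats.rowCount.isDefined
      case _                          => false
    }

  // Sketch of the column-uniqueness heuristic from the Scaladoc: a column is
  // treated as (nearly) unique when its distinct-value count is within a small
  // relative error of the row count. The 0.05 threshold is an arbitrary
  // stand-in, not Spark's ndvMaxError setting.
  def looksUnique(rowCount: BigInt, distinctCount: BigInt, maxRelError: Double = 0.05): Boolean =
    rowCount > 0 &&
      (BigDecimal(rowCount - distinctCount).abs / BigDecimal(rowCount)).toDouble <= maxRelError

  def main(args: Array[String]): Unit = {
    val fact = Relation("fact", TableStats(Some(BigInt(1000000))))
    val dim  = Relation("dim", TableStats(None))

    println(allEligible(Seq((Project(fact), Inner), (fact, Inner))))        // true
    println(allEligible(Seq((fact, Inner), (Filter("d > 1", dim), Inner)))) // false
    println(looksUnique(rowCount = 1000, distinctCount = 990))              // true
    println(looksUnique(rowCount = 1000, distinctCount = 500))              // false
  }
}
```

The `case _ => false` fallback is what makes the shorter form equivalent to an explicit check: any plan that is not a base table access, or whose row count is missing, makes the whole input ineligible, which is exactly the condition under which the algorithm falls back to positional join reordering.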