Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15363#discussion_r85394505
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
    @@ -83,10 +88,221 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
         }
       }
     
    +  /**
    +   * Helper function that computes the size of a table access after applying
    +   * the predicates. Currently, the function returns the table size.
    +   * When plan cardinality and predicate selectivity are implemented in Catalyst,
    +   * the function will be refined based on these estimates.
    +   */
    +  private def computeBaseTableSize(
    +      input: LogicalPlan,
    +      conditions: Seq[Expression]): Option[BigInt] = {
    +    input match {
    +      case BaseTableAccess(t, cond) if t.statistics.sizeInBytes >= 0 =>
    +        // To be replaced by the table cardinality, when available.
    +        val tableSize = t.statistics.sizeInBytes
    +
    +        // Collect predicate selectivity, when available.
    +        // val predicates = conditions.filter(canEvaluate(_, p)) ++ cond
    +        // Compute the output cardinality = tableSize * predicates' selectivity.
    +        Option(tableSize)
    +
    +      case _ => None
    +    }
    +  }
    +
    +  /**
    +   * Helper case class to hold (plan, size) pairs.
    +   */
    +  private case class TableSize(plan: (LogicalPlan, InnerLike), size: Option[BigInt])
    +
    +  /**
    +   * Checks if a star join is a selective join. A star join is assumed
    +   * to be selective if (1) there are local predicates on the dimension
    +   * tables and (2) the join predicates are equi joins.
    +   */
    +  private def isSelectiveStarJoin(
    +      factTable: LogicalPlan,
    +      dimTables: Seq[LogicalPlan],
    +      conditions: Seq[Expression]): Boolean = {
    +
    +    val starJoinPlan = factTable +: dimTables
    +
    +    // Checks if all star plans are base table access.
    +    val allBaseTables = starJoinPlan.forall {
    +      case BaseTableAccess(_, _) => true
    +      case _ => false
    +    }
    +
    +    // Checks if any condition applies to the dimension tables.
    +    val localPredicates = dimTables.exists { plan =>
    +      // Exclude the IsNotNull predicates until predicate selectivity is available.
    +      // In most cases, this predicate is artificially introduced by the Optimizer
    +      // to enforce nullability constraints.
    +      conditions.filterNot(_.isInstanceOf[IsNotNull]).exists(canEvaluate(_, plan))
    +    }
    +
    +    // Checks if there are any predicates pushed down to the base table access.
    +    // Similarly, exclude IsNotNull predicates.
    +    val pushedDownPredicates = dimTables.exists {
    +      case BaseTableAccess(_, p) if p.nonEmpty => !p.forall(_.isInstanceOf[IsNotNull])
    +      case _ => false
    +    }
    +
    +    val isSelectiveDimensions = allBaseTables && (localPredicates || pushedDownPredicates)
    +
    +    // Checks if the join predicates are equi joins of the form fact.col = dim.col.
    +    val isEquiJoin = dimTables.forall { plan =>
    +      val refs = factTable.outputSet ++ plan.outputSet
    +      conditions.filterNot(canEvaluate(_, factTable))
    +        .filterNot(canEvaluate(_, plan))
    +        .filter(_.references.subsetOf(refs))
    +        .forall {
    +          case EqualTo(_: Attribute, _: Attribute) => true
    +          case EqualNullSafe(_: Attribute, _: Attribute) => true
    +          case _ => false
    +        }
    +    }
    +
    +    isSelectiveDimensions && isEquiJoin
    +  }
    +
    +  /**
    +   * Finds an optimal join order for star schema queries i.e.
    +   * + The largest fact table on the driving arm to avoid large tables on the inner of a join,
    +   *   and thus favor hash joins.
    +   * + Apply the most selective dimensions early in the plan to reduce the data flow.
    +   *
    +   * In general, star-schema joins are detected using the following conditions:
    +   *  1. Informational RI constraints (reliable detection)
    +   *    + Dimension contains a primary key that is being joined to the fact table.
    +   *    + Fact table contains foreign keys referencing multiple dimension tables.
    +   *  2. Cardinality based heuristics
    +   *    + Usually, the table with the highest cardinality is the fact table.
    +   *    + The table joined with the largest number of tables is the fact table.
    +   *
    +   * Given the current Spark optimizer limitations (e.g. limited statistics, no RI
    +   * constraints information, etc.), the algorithm uses table size statistics and the
    +   * type of predicates as a naive approach to infer table cardinality and join selectivity.
    +   * When plan cardinality and predicate selectivity features are implemented in Catalyst,
    +   * the star detection logic will be refined based on these estimates.
    +   *
    +   * The highlights of the algorithm are the following:
    +   *
    +   * Given a set of joined tables/plans, the algorithm finds the set of eligible fact tables.
    +   * An eligible fact table is a base table access with valid statistics. A base table access
    +   * represents Project or Filter operators above a LeafNode. Conservatively, the algorithm
    +   * only considers base table accesses as part of a star join since they provide reliable
    +   * statistics.
    +   *
    +   * If there are no eligible fact tables (e.g. no base table access i.e. complex sub-plans),
    +   * the algorithm falls back to the positional join reordering, which is the default join
    +   * ordering in Catalyst. Otherwise, the algorithm finds the largest fact table by sorting
    +   * the tables based on their size.
    +   *
    +   * The algorithm next computes the set of dimension tables for the current fact table.
    +   * Conservatively, only equality joins are considered between a fact and a dimension table.
    +   *
    +   * Given a star join, i.e. fact and dimension tables, the algorithm considers three cases:
    +   *
    +   * 1) The star join is an expanding join i.e. the fact table is joined using inequality
    +   * predicates, or a Cartesian product. In this case, the algorithm looks for the next
    +   * eligible star join. The rationale is that a selective star join, if found, may reduce
    +   * the data stream early in the plan. An alternative, more conservative design is to simply
    +   * fall back to the default join reordering if the star join with the largest fact table
    +   * is not selective.
    +   *
    +   * 2) The star join is a selective join. This case is detected by observing local predicates
    +   * on the dimension tables. In a star schema relationship, the join between the fact and the
    +   * dimension table is in general a FK-PK join. Heuristically, a selective dimension may reduce
    +   * the result of a join.
    +   *
    +   * 3) The star join is not a selective join (i.e. doesn't reduce the number of rows on the
    +   * driving arm). In this case, the algorithm conservatively falls back to the default join
    +   * reordering.
    +   */
    +  @tailrec
    +  private def findEligibleStarJoinPlan(
    +      joinedTables: Seq[(LogicalPlan, InnerLike)],
    +      eligibleFactTables: Seq[(LogicalPlan, InnerLike)],
    +      conditions: Seq[Expression]): Seq[(LogicalPlan, InnerLike)] = {
    +    if (eligibleFactTables.isEmpty || joinedTables.size < 2) {
    +      // There is no eligible star join.
    +      Seq.empty[(LogicalPlan, InnerLike)]
    +    } else {
    +      // Compute the size of the eligible fact tables and sort them in descending order.
    +      // An eligible fact table is a base table access with valid statistics.
    +      val sortedFactTables = eligibleFactTables.map { plan =>
    +        TableSize(plan, computeBaseTableSize(plan._1, conditions))
    +      }.collect {
    +        case t @ TableSize(_, Some(_)) => t
    +      }.sortBy(_.size)(implicitly[Ordering[Option[BigInt]]].reverse)
    +
    +      sortedFactTables match {
    +        case Nil =>
    +          // There are no stats available for these plans.
    +          // Return an empty plan list and fall back to the default join reordering.
    +          Seq.empty[(LogicalPlan, InnerLike)]
    +
    +        case table1 :: table2 :: _ if table1.size == table2.size =>
    +          // Two or more tables have the same size. Conservatively, fall back to the
    +          // default join reordering since we cannot make good plan choices.
    +          Seq.empty[(LogicalPlan, InnerLike)]
    +
    +        case TableSize(factPlan @ (factTable, _), _) :: _ =>
    +          // A fact table was found. Get the largest fact table and compute the corresponding
    +          // dimension tables. Conservatively, a dimension table is assumed to be joined with
    +          // the fact table using equality predicates.
    +          val eligibleDimPlans = joinedTables.filterNot(_._1 eq factTable).filter { dimPlan =>
    +            val dimTable = dimPlan._1
    +            val refs = factTable.outputSet ++ dimTable.outputSet
    +            conditions.filter(p => p.isInstanceOf[EqualTo] || p.isInstanceOf[EqualNullSafe])
    +              .filterNot(canEvaluate(_, factTable))
    +              .filterNot(canEvaluate(_, dimTable))
    +              .exists(_.references.subsetOf(refs))
    +          }
    +
    +          val dimTables = eligibleDimPlans.map { _._1 }
    +
    +          if (eligibleDimPlans.isEmpty) {
    +            // This is an expanding join since there are no equality joins
    +            // between the current fact table and the joined tables.
    +            // Look for the next eligible star join plan.
    +            findEligibleStarJoinPlan(
    +              joinedTables,
    +              eligibleFactTables.filterNot(_._1 eq factTable),
    +              conditions)
    +
    +          } else if (isSelectiveStarJoin(factTable, dimTables, conditions)) {
    +            // This is a selective star join and all dimensions are base table access.
    +            // Compute the size of the dimensions and return the star join
    +            // with the most selective dimensions joined lower in the plan.
    +            val sortedDims = eligibleDimPlans.map { plan =>
    --- End diff --
    
    Another thing that could be useful is placing the selective broadcast join before the shuffle join.
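The suggestion can be illustrated with a small ordering function over the dimensions of a star join. This is a sketch under assumptions: `Dim` and `BroadcastFirst` are hypothetical names, "broadcastable" is approximated by comparing a dimension's size with a threshold whose default mirrors `spark.sql.autoBroadcastJoinThreshold` (10 MB), and "selective" is the local-predicate heuristic from the diff. The idea is that selective broadcast joins filter fact rows before any shuffle join has to move them.

```scala
// Hypothetical model of a dimension that will be joined with the fact table.
final case class Dim(name: String, sizeInBytes: BigInt, selective: Boolean)

object BroadcastFirst {
  // Assumption: mirrors the default of spark.sql.autoBroadcastJoinThreshold (10 MB).
  val DefaultThreshold: BigInt = BigInt(10L * 1024 * 1024)

  // Orders dimensions as: selective and broadcastable first, then other
  // broadcastable ones, then those that would need a shuffle join.
  // sortBy is stable, so dimensions with equal keys keep their input order.
  def order(dims: Seq[Dim], threshold: BigInt = DefaultThreshold): Seq[Dim] =
    dims.sortBy { d =>
      val broadcastable = d.sizeInBytes <= threshold
      // false sorts before true, so a (false, false) key comes first.
      (!(broadcastable && d.selective), !broadcastable)
    }
}
```

The physical planner still makes the final broadcast decision, so an order computed this way is only a heuristic for where the cheap, filtering joins should sit.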

