yeshengm commented on a change in pull request #24983: [SPARK-27714][SQL][CBO] 
Support Genetic Algorithm based join reorder
URL: https://github.com/apache/spark/pull/24983#discussion_r311205884
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ##########
 @@ -470,3 +397,427 @@ object JoinReorderDPFilters extends PredicateHelper {
  * extended with the set of connected/unconnected plans.
  */
 case class JoinGraphInfo (starJoins: Set[Int], nonStarJoins: Set[Int])
+
+/**
+ * Reorder the joins using a genetic algorithm. The algorithm treat the 
reorder problem
+ * to a traveling salesmen problem, and use genetic algorithm give an 
optimized solution.
+ *
+ * The implementation refs the geqo in postgresql, which is contibuted by 
Darrell Whitley:
+ * https://www.postgresql.org/docs/9.1/geqo-pg-intro.html
+ *
+ * For more info about genetic algorithm and the edge recombination crossover, 
pls see:
+ * "A Genetic Algorithm Tutorial, Darrell Whitley"
+ * https://link.springer.com/article/10.1007/BF00175354
+ * and "Scheduling Problems and Traveling Salesmen: The Genetic Edge 
Recombination Operator,
+ * Darrell Whitley et al." https://dl.acm.org/citation.cfm?id=657238
+ * respectively.
+ */
+object JoinReorderGA extends PredicateHelper with Logging {
+
+  def search(
+      conf: SQLConf,
+      items: Seq[LogicalPlan],
+      conditions: Set[Expression],
+      output: Seq[Attribute]): Option[LogicalPlan] = {
+
+    val startTime = System.nanoTime()
+
+    val itemsWithIndex = items.zipWithIndex.map {
+      case (plan, id) => id -> JoinPlan(Set(id), plan, Set.empty, Cost(0, 0))
+    }.toMap
+
+    val topOutputSet = AttributeSet(output)
+
+    val pop = Population(conf, itemsWithIndex, conditions, topOutputSet).evolve
+
+    val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
+    logInfo(s"Join reordering finished. Duration: $durationInMs ms, number of 
items: " +
+        s"${items.length}, number of plans in memo: ${ pop.chromos.size}")
+
+    assert(pop.chromos.head.basicPlans.size == items.length)
+    pop.chromos.head.integratedPlan match {
+      case Some(joinPlan) => joinPlan.plan match {
+        case p @ Project(projectList, _: Join) if projectList != output =>
+          assert(topOutputSet == p.outputSet)
+          // Keep the same order of final output attributes.
+          Some(p.copy(projectList = output))
+        case finalPlan if !sameOutput(finalPlan, output) =>
+          Some(Project(output, finalPlan))
+        case finalPlan =>
+          Some(finalPlan)
+      }
+      case _ => None
+    }
+  }
+}
+
+/**
+ * A pair of parent individuals can breed a child with certain crossover 
process.
+ * With crossover, child can inherit gene from its parents, and these gene 
snippets
+ * finally compose a new [[Chromosome]].
+ */
+@DeveloperApi
+trait Crossover {
+
+  /**
+   * Generate a new [[Chromosome]] from the given parent [[Chromosome]]s,
+   * with this crossover algorithm.
+   */
+  def newChromo(father: Chromosome, mother: Chromosome) : Chromosome
+}
+
+case class EdgeTable(table: Map[JoinPlan, Seq[JoinPlan]])
+
+/**
+ * This class implements the Genetic Edge Recombination algorithm.
+ * For more information about the Genetic Edge Recombination,
+ * see "Scheduling Problems and Traveling Salesmen: The Genetic Edge
+ * Recombination Operator" by Darrell Whitley et al.
+ * https://dl.acm.org/citation.cfm?id=657238
+ */
+object EdgeRecombination extends Crossover {
+
+  def genEdgeTable(father: Chromosome, mother: Chromosome) : EdgeTable = {
+    val fatherTable = father.basicPlans.map(g => g -> 
findNeighbours(father.basicPlans, g)).toMap
+    val motherTable = mother.basicPlans.map(g => g -> 
findNeighbours(mother.basicPlans, g)).toMap
+    EdgeTable(
+      fatherTable.map(entry => entry._1 -> (entry._2 ++ 
motherTable(entry._1))))
+  }
+
+  def findNeighbours(genes: Seq[JoinPlan], g: JoinPlan) : Seq[JoinPlan] = {
+    val genesIndexed = genes.toIndexedSeq
+    val index = genesIndexed.indexOf(g)
+    val length = genes.size
+    if (index > 0 && index < length - 1) {
+      Seq(genesIndexed(index - 1), genesIndexed(index + 1))
+    } else if (index == 0) {
+      Seq(genesIndexed(1), genesIndexed(length - 1))
+    } else if (index == length - 1) {
+      Seq(genesIndexed(0), genesIndexed(length - 2))
+    } else {
+      Seq()
+    }
+  }
+
+  override def newChromo(father: Chromosome, mother: Chromosome): Chromosome = 
{
+    var newGenes: Seq[JoinPlan] = Seq()
+    // 1. Generate the edge table.
+    var table = genEdgeTable(father, mother).table
+    // 2. Choose a start point randomly from the heads of father/mother.
+    var current =
+      if (util.Random.nextInt(2) == 0) father.basicPlans.head else 
mother.basicPlans.head
+    newGenes :+= current
+
+    var stop = false
+    while (!stop) {
+      // 3. Filter out the chosen point from the edge table.
+      table = table.map(
+        entry => entry._1 -> entry._2.filter(g => if (g == current) false else 
true)
+      )
+      // 4. Choose next point among its neighbours. The criterion for choosing 
which point
+      // is that the one who has fewest neighbours. If two or more points has 
the same num
+      // of neighbours, choose one randomly. If there's no neighbours 
available for this
+      // point but there're still other remaining points, choose one from them 
randomly.
+      val tobeVisited = table(current)
+      val neighboursTable = tobeVisited.map(g => g -> 
table(g)).sortBy(-_._2.size).toMap
+      val filteredTable = table.filter(_._2.nonEmpty)
+      if (neighboursTable.nonEmpty) {
+        val numBase = neighboursTable.head._2.size
+        var numCand = 0
+        neighboursTable.foreach(entry => if (entry._2.size == numBase) numCand 
+= 1)
+        current = neighboursTable.toIndexedSeq(util.Random.nextInt(numCand))._1
+        newGenes :+= current
+      } else if (filteredTable.nonEmpty) {
+        current = 
filteredTable.toIndexedSeq(util.Random.nextInt(filteredTable.size))._2.head
+        newGenes :+= current
+      } else {
+        stop = true
+      }
+    }
+
+    Chromosome(father.conf, newGenes, father.conditions, father.topOutputSet)
+  }
+}
+
+/**
+ * A sequence of genes(each represents a single relation) which represents
+ * a joined plan with determined join order.
+ */
+case class Chromosome(
+    conf: SQLConf,
+    basicPlans: Seq[JoinPlan],
+    conditions: Set[Expression],
+    topOutputSet: AttributeSet) {
+
+  lazy val fitness: Double = evalFitness(integratedPlan)
+
+  lazy val integratedPlan: Option[JoinPlan] = makePlan
+
+  private def makePlan: Option[JoinPlan] = {
+    val semiFinished = mutable.Buffer[JoinPlan]()
+    basicPlans.foreach(mergeSemi(semiFinished, _))
+    if (semiFinished.head.itemIds.size == basicPlans.size) {
+      Some(semiFinished.head)
+    } else {
+      None
+    }
+  }
+
+  private def mergeSemi(semiFinished: mutable.Buffer[JoinPlan], right: 
JoinPlan): Unit = {
+    val filters = None: Option[JoinGraphInfo]
+    for (left <- semiFinished) {
+      JoinReorderUtils.buildJoin(left, right, conf, conditions, topOutputSet, 
filters) match {
+        case Some(joined) =>
+          semiFinished.remove(semiFinished.indexOf(left))
+          mergeSemi(semiFinished, joined)
+        case _ =>
+          None
+      }
+    }
+
+    if (semiFinished.isEmpty || right.itemIds.size == 1) {
+      semiFinished.append(right)
+      return
+    }
+
+    insertPlan(semiFinished, right)
+  }
+
+  private def insertPlan(semiFinished: mutable.Buffer[JoinPlan], plan: 
JoinPlan): Unit = {
+    var criticalSize = if (semiFinished.head.itemIds.size > plan.itemIds.size) 
{
+      semiFinished.head.itemIds.size
+    } else {
+      plan.itemIds.size
+    }
+    var criticalIndex = 0
+    var break: Boolean = false
+    for (p <- semiFinished if !break) {
+      if (plan.itemIds.size > p.itemIds.size && plan.itemIds.size <= 
criticalSize) {
+        break = true
+      } else {
+        criticalIndex += 1
+        criticalSize = p.itemIds.size
+      }
+    }
+
+    semiFinished.insert(criticalIndex, plan)
+  }
+
+  private def evalFitness(plan: Option[JoinPlan]): Double = {
+    plan match {
+      case Some(joinPlan) =>
+        // We use the negative cost as fitness.
+        - joinPlan.planCost.card.toDouble * conf.joinReorderCardWeight -
+            joinPlan.planCost.size.toDouble * (1 - conf.joinReorderCardWeight)
+      case _ =>
+        - Double.MaxValue
+    }
+  }
+}
+
+/**
+ * A live space which has multi individuals(represented by [[Chromosome]]).
+ * The population can evolve, generation by generation. The child individual
+ * is generated by the [[Crossover]] procedure from its parents.
+ */
+object Population {
+  def apply(
+      conf: SQLConf,
+      itemsMap: Map[Int, JoinPlan],
+      conditions: Set[Expression],
+      topOutputSet: AttributeSet) : Population = {
+
+    val chromos: Seq[Chromosome] = Seq.fill(determinePopSize(conf, 
itemsMap.size)) {
+      Chromosome(conf, shuffle(itemsMap), conditions, topOutputSet)
+    }
+
+    Population(conf, chromos)
+  }
+
+  private def determinePopSize(conf: SQLConf, numRelations: Int): Int = {
+    val relaxFactor = conf.joinReorderGARelaxFactor
+    // The default population size:
+    // # of relations | pop size (RF=3) | pop size (RF=3.5)| pop size  (RF=4)
+    //  < 13          |   DP based      |   DP based       |   DP based
+    //    13          |   20            |   16 (13<16)     |   16
+    //    14          |   25            |   16             |   16
+    //    15          |   32            |   19             |   16
+    //    16          |   40            |   23             |   16
+    //    17          |   50            |   28             |   19
+    //    18          |   64            |   35             |   22
+    //    19          |   80            |   43             |   26
+    //    20          |   101           |   52             |   32
+    //    21          |   128           |   64             |   38
+    //    22          |   128           |   78             |   45
+    //    23          |   128           |   95             |   53
+    //    24          |   128           |   115            |   64
+    //    25          |   128           |   128            |   76
+    //    26          |   128           |   128            |   90
+    //    27          |   128           |   128            |   90
+    //    28          |   128           |   128            |   128
+    //  > 28          |   128           |   128            |   128
+    val size = math.pow(2.0, numRelations / relaxFactor)
+    val max = conf.joinReorderGAMaxPoPSize
+    val min = conf.joinReorderGAMinPoPSize
+
+    math.ceil(math.max(math.min(max, size), min)).toInt
+  }
+
+  private def shuffle(itemsMap: Map[Int, JoinPlan]) : Seq[JoinPlan] = {
+    util.Random.shuffle(itemsMap.values).toSeq
+  }
+}
+
+case class Population(conf: SQLConf, chromos: Seq[Chromosome]) extends Logging 
{
+
+  def evolve: Population = {
+    // Sort chromos in the population first.
+    var tempChromos = chromos.sortWith((left, right) => left.fitness > 
right.fitness)
+    // Begin iteration.
+    val generations = chromos.size
+    for (i <- 1 to generations) {
+      val father = JoinReorderUtils.select(conf, tempChromos, None)
+      val mother = JoinReorderUtils.select(conf, tempChromos, Some(father))
+      val kid = EdgeRecombination.newChromo(father, mother)
+      tempChromos = putToPop(kid, tempChromos)
+      logDebug(s"Iteration $i, fitness for kid: ${kid.fitness}," +
+          s" and Fitness for plans: ${ tempChromos.map(c => c.fitness)}")
+    }
+    Population(conf, tempChromos)
+  }
+
+  private def putToPop(kid: Chromosome, chromos: Seq[Chromosome]): 
Seq[Chromosome] = {
+    val tmp = mutable.Buffer[Chromosome](chromos: _*)
+    val index = chromos.indexWhere(_.fitness < kid.fitness)
+    if (index >= 0) {
+      tmp.insert(index, kid)
+      tmp.remove(tmp.size - 1)
+    }
+    Seq(tmp: _*)
+  }
+}
+
+object JoinReorderUtils extends PredicateHelper {
+
+  /**
+   * Builds a new JoinPlan if the following conditions hold:
+   * - the sets of items contained in left and right sides do not overlap.
+   * - there exists at least one join condition involving references from both 
sides.
+   * - if star-join filter is enabled, allow the following combinations:
+   *         1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
+   *         2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
+   *         3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+   *
+   * @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
+   * @param otherJoinPlan The other side JoinPlan for building a new join node.
+   * @param conf SQLConf for statistics computation.
+   * @param conditions The overall set of join conditions.
+   * @param topOutput The output attributes of the final plan.
+   * @param filters Join graph info to be used as filters by the search 
algorithm.
+   * @return Builds and returns a new JoinPlan if both conditions hold. 
Otherwise, returns None.
+   */
+  private[optimizer] def buildJoin(
+      oneJoinPlan: JoinPlan,
+      otherJoinPlan: JoinPlan,
+      conf: SQLConf,
+      conditions: Set[Expression],
+      topOutput: AttributeSet,
+      filters: Option[JoinGraphInfo]): Option[JoinPlan] = {
+
+    if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
+      // Should not join two overlapping item sets.
+      return None
+    }
+
+    if (filters.isDefined) {
+      // Apply star-join filter, which ensures that tables in a star schema 
relationship
+      // are planned together. The star-filter will eliminate joins among star 
and non-star
+      // tables until the star joins are built. The following combinations are 
allowed:
+      // 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join
+      // 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
+      // 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+      val isValidJoinCombination =
+      JoinReorderDPFilters.starJoinFilter(oneJoinPlan.itemIds, 
otherJoinPlan.itemIds,
+        filters.get)
+      if (!isValidJoinCombination) return None
+    }
+
+    val onePlan = oneJoinPlan.plan
+    val otherPlan = otherJoinPlan.plan
+    val joinConds = conditions
+        .filterNot(l => canEvaluate(l, onePlan))
+        .filterNot(r => canEvaluate(r, otherPlan))
+        .filter(e => e.references.subsetOf(onePlan.outputSet ++ 
otherPlan.outputSet))
+    if (joinConds.isEmpty) {
+      // Cartesian product is very expensive, so we exclude them from 
candidate plans.
+      // This also significantly reduces the search space.
+      return None
+    }
+
+    // Put the deeper side on the left, tend to build a left-deep tree.
+    val (left, right) = if (oneJoinPlan.itemIds.size >= 
otherJoinPlan.itemIds.size) {
+      (onePlan, otherPlan)
+    } else {
+      (otherPlan, onePlan)
+    }
+    val newJoin = Join(left, right, Inner, joinConds.reduceOption(And), 
JoinHint.NONE)
+    val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ 
otherJoinPlan.joinConds
+    val remainingConds = conditions -- collectedJoinConds
+    val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ 
topOutput
+    val neededFromNewJoin = newJoin.output.filter(neededAttr.contains)
+    val newPlan =
+      if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
+        Project(neededFromNewJoin, newJoin)
+      } else {
+        newJoin
+      }
+
+    val itemIds = oneJoinPlan.itemIds.union(otherJoinPlan.itemIds)
+    // Now the root node of onePlan/otherPlan becomes an intermediate join (if 
it's a non-leaf
+    // item), so the cost of the new join should also include its own cost.
+    val newPlanCost = oneJoinPlan.planCost + oneJoinPlan.rootCost(conf) +
+        otherJoinPlan.planCost + otherJoinPlan.rootCost(conf)
+    Some(JoinPlan(itemIds, newPlan, collectedJoinConds, newPlanCost))
+  }
+
+  /**
+   * Select a [[Chromosome]] randomly from the given chromosomes set. 
Chromosomes those in
+   * excludes will not be selected.
+   * The selection is biased, i.e., some chromosomes will be preferred 
compared to others.
+   *
+   * @param conf SQLConf for parameters that control the selection
+   * @param chromos candidates to be selected.
+   * @param exclude chromosome that will be excluded in the selection.
+   * @return the selected chromosome.
+   */
+  def select(conf: SQLConf, chromos: Seq[Chromosome], exclude: 
Option[Chromosome]): Chromosome = {
 
 Review comment:
   Sorry for not being clear. This function is a _JoinReorderGA_ util instead 
of a general `JoinReorderUtils`, so let's put it in `object JoinReorderGA`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to