Support Genetic Algorithm based join reorder

 File path: 
 @@ -470,3 +397,427 @@ object JoinReorderDPFilters extends PredicateHelper {
  * extended with the set of connected/unconnected plans.
 case class JoinGraphInfo (starJoins: Set[Int], nonStarJoins: Set[Int])
+ * Reorder the joins using a genetic algorithm. The algorithm treat the 
reorder problem
+ * to a traveling salesmen problem, and use genetic algorithm give an 
optimized solution.
+ *
+ * The implementation refs the geqo in postgresql, which is contibuted by 
Darrell Whitley:
+ *
+ *
+ * For more info about genetic algorithm and the edge recombination crossover, 
pls see:
+ * "A Genetic Algorithm Tutorial, Darrell Whitley"
+ *
+ * and "Scheduling Problems and Traveling Salesmen: The Genetic Edge 
Recombination Operator,
+ * Darrell Whitley et al."
+ * respectively.
+ */
+object JoinReorderGA extends PredicateHelper with Logging {
+  def search(
+      conf: SQLConf,
+      items: Seq[LogicalPlan],
+      conditions: Set[Expression],
+      output: Seq[Attribute]): Option[LogicalPlan] = {
+    val startTime = System.nanoTime()
+    val itemsWithIndex = {
+      case (plan, id) => id -> JoinPlan(Set(id), plan, Set.empty, Cost(0, 0))
+    }.toMap
+    val topOutputSet = AttributeSet(output)
+    val pop = Population(conf, itemsWithIndex, conditions, topOutputSet).evolve
+    val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
+    logInfo(s"Join reordering finished. Duration: $durationInMs ms, number of 
items: " +
+        s"${items.length}, number of plans in memo: ${ pop.chromos.size}")
+    assert(pop.chromos.head.basicPlans.size == items.length)
+    pop.chromos.head.integratedPlan match {
+      case Some(joinPlan) => joinPlan.plan match {
+        case p @ Project(projectList, _: Join) if projectList != output =>
+          assert(topOutputSet == p.outputSet)
+          // Keep the same order of final output attributes.
+          Some(p.copy(projectList = output))
+        case finalPlan if !sameOutput(finalPlan, output) =>
+          Some(Project(output, finalPlan))
+        case finalPlan =>
+          Some(finalPlan)
+      }
+      case _ => None
+    }
+  }
+ * A pair of parent individuals can breed a child with certain crossover 
+ * With crossover, child can inherit gene from its parents, and these gene 
+ * finally compose a new [[Chromosome]].
+ */
+trait Crossover {
+  /**
+   * Generate a new [[Chromosome]] from the given parent [[Chromosome]]s,
+   * with this crossover algorithm.
+   */
+  def newChromo(father: Chromosome, mother: Chromosome) : Chromosome
+case class EdgeTable(table: Map[JoinPlan, Seq[JoinPlan]])
+ * This class implements the Genetic Edge Recombination algorithm.
+ * For more information about the Genetic Edge Recombination,
+ * see "Scheduling Problems and Traveling Salesmen: The Genetic Edge
+ * Recombination Operator" by Darrell Whitley et al.
+ *
+ */
+object EdgeRecombination extends Crossover {
+  def genEdgeTable(father: Chromosome, mother: Chromosome) : EdgeTable = {
+    val fatherTable = => g -> 
findNeighbours(father.basicPlans, g)).toMap
+    val motherTable = => g -> 
findNeighbours(mother.basicPlans, g)).toMap
+    EdgeTable(
+ => entry._1 -> (entry._2 ++ 
+  }
+  def findNeighbours(genes: Seq[JoinPlan], g: JoinPlan) : Seq[JoinPlan] = {
+    val genesIndexed = genes.toIndexedSeq
+    val index = genesIndexed.indexOf(g)
+    val length = genes.size
+    if (index > 0 && index < length - 1) {
+      Seq(genesIndexed(index - 1), genesIndexed(index + 1))
+    } else if (index == 0) {
+      Seq(genesIndexed(1), genesIndexed(length - 1))
+    } else if (index == length - 1) {
+      Seq(genesIndexed(0), genesIndexed(length - 2))
+    } else {
+      Seq()
+    }
+  }
+  override def newChromo(father: Chromosome, mother: Chromosome): Chromosome = 
+    var newGenes: Seq[JoinPlan] = Seq()
+    // 1. Generate the edge table.
+    var table = genEdgeTable(father, mother).table
+    // 2. Choose a start point randomly from the heads of father/mother.
+    var current =
+      if (util.Random.nextInt(2) == 0) father.basicPlans.head else 
+    newGenes :+= current
+    var stop = false
+    while (!stop) {
+      // 3. Filter out the chosen point from the edge table.
+      table =
+        entry => entry._1 -> entry._2.filter(g => if (g == current) false else 
+      )
+      // 4. Choose next point among its neighbours. The criterion for choosing 
which point
+      // is that the one who has fewest neighbours. If two or more points has 
the same num
+      // of neighbours, choose one randomly. If there's no neighbours 
available for this
+      // point but there're still other remaining points, choose one from them 
+      val tobeVisited = table(current)
+      val neighboursTable = => g -> 
+      val filteredTable = table.filter(_._2.nonEmpty)
+      if (neighboursTable.nonEmpty) {
+        val numBase = neighboursTable.head._2.size
+        var numCand = 0
+        neighboursTable.foreach(entry => if (entry._2.size == numBase) numCand 
+= 1)
+        current = neighboursTable.toIndexedSeq(util.Random.nextInt(numCand))._1
+        newGenes :+= current
+      } else if (filteredTable.nonEmpty) {
+        current = 
+        newGenes :+= current
+      } else {
+        stop = true
+      }
+    }
+    Chromosome(father.conf, newGenes, father.conditions, father.topOutputSet)
+  }
+ * A sequence of genes(each represents a single relation) which represents
+ * a joined plan with determined join order.
+ */
+case class Chromosome(
+    conf: SQLConf,
+    basicPlans: Seq[JoinPlan],
+    conditions: Set[Expression],
+    topOutputSet: AttributeSet) {
+  lazy val fitness: Double = evalFitness(integratedPlan)
+  lazy val integratedPlan: Option[JoinPlan] = makePlan
+  private def makePlan: Option[JoinPlan] = {
+    val semiFinished = mutable.Buffer[JoinPlan]()
+    basicPlans.foreach(mergeSemi(semiFinished, _))
+    if (semiFinished.head.itemIds.size == basicPlans.size) {
+      Some(semiFinished.head)
+    } else {
+      None
+    }
+  }
+  private def mergeSemi(semiFinished: mutable.Buffer[JoinPlan], right: 
JoinPlan): Unit = {
+    val filters = None: Option[JoinGraphInfo]
+    for (left <- semiFinished) {
+      JoinReorderUtils.buildJoin(left, right, conf, conditions, topOutputSet, 
filters) match {
+        case Some(joined) =>
+          semiFinished.remove(semiFinished.indexOf(left))
+          mergeSemi(semiFinished, joined)
+        case _ =>
+          None
+      }
+    }
+    if (semiFinished.isEmpty || right.itemIds.size == 1) {
+      semiFinished.append(right)
+      return
+    }
+    insertPlan(semiFinished, right)
+  }
+  private def insertPlan(semiFinished: mutable.Buffer[JoinPlan], plan: 
JoinPlan): Unit = {
+    var criticalSize = if (semiFinished.head.itemIds.size > plan.itemIds.size) 
+      semiFinished.head.itemIds.size
+    } else {
+      plan.itemIds.size
+    }
+    var criticalIndex = 0
+    var break: Boolean = false
+    for (p <- semiFinished if !break) {
+      if (plan.itemIds.size > p.itemIds.size && plan.itemIds.size <= 
criticalSize) {
+        break = true
+      } else {
+        criticalIndex += 1
+        criticalSize = p.itemIds.size
+      }
+    }
+    semiFinished.insert(criticalIndex, plan)
+  }
+  private def evalFitness(plan: Option[JoinPlan]): Double = {
+    plan match {
+      case Some(joinPlan) =>
+        // We use the negative cost as fitness.
+        - joinPlan.planCost.card.toDouble * conf.joinReorderCardWeight -
+            joinPlan.planCost.size.toDouble * (1 - conf.joinReorderCardWeight)
+      case _ =>
+        - Double.MaxValue
+    }
+  }
+ * A live space which has multi individuals(represented by [[Chromosome]]).
+ * The population can evolve, generation by generation. The child individual
+ * is generated by the [[Crossover]] procedure from its parents.
+ */
+object Population {
+  def apply(
+      conf: SQLConf,
+      itemsMap: Map[Int, JoinPlan],
+      conditions: Set[Expression],
+      topOutputSet: AttributeSet) : Population = {
+    val chromos: Seq[Chromosome] = Seq.fill(determinePopSize(conf, 
itemsMap.size)) {
+      Chromosome(conf, shuffle(itemsMap), conditions, topOutputSet)
+    }
+    Population(conf, chromos)
+  }
+  private def determinePopSize(conf: SQLConf, numRelations: Int): Int = {
+    val relaxFactor = conf.joinReorderGARelaxFactor
+    // The default population size:
+    // # of relations | pop size (RF=3) | pop size (RF=3.5)| pop size  (RF=4)
+    //  < 13          |   DP based      |   DP based       |   DP based
+    //    13          |   20            |   16 (13<16)     |   16
+    //    14          |   25            |   16             |   16
+    //    15          |   32            |   19             |   16
+    //    16          |   40            |   23             |   16
+    //    17          |   50            |   28             |   19
+    //    18          |   64            |   35             |   22
+    //    19          |   80            |   43             |   26
+    //    20          |   101           |   52             |   32
+    //    21          |   128           |   64             |   38
+    //    22          |   128           |   78             |   45
+    //    23          |   128           |   95             |   53
+    //    24          |   128           |   115            |   64
+    //    25          |   128           |   128            |   76
+    //    26          |   128           |   128            |   90
+    //    27          |   128           |   128            |   90
+    //    28          |   128           |   128            |   128
+    //  > 28          |   128           |   128            |   128
+    val size = math.pow(2.0, numRelations / relaxFactor)
+    val max = conf.joinReorderGAMaxPoPSize
+    val min = conf.joinReorderGAMinPoPSize
+    math.ceil(math.max(math.min(max, size), min)).toInt
+  }
+  private def shuffle(itemsMap: Map[Int, JoinPlan]) : Seq[JoinPlan] = {
+    util.Random.shuffle(itemsMap.values).toSeq
+  }
+case class Population(conf: SQLConf, chromos: Seq[Chromosome]) extends Logging 
+  def evolve: Population = {
+    // Sort chromos in the population first.
+    var tempChromos = chromos.sortWith((left, right) => >
+    // Begin iteration.
+    val generations = chromos.size
+    for (i <- 1 to generations) {
+      val father =, tempChromos, None)
+      val mother =, tempChromos, Some(father))
+      val kid = EdgeRecombination.newChromo(father, mother)
+      tempChromos = putToPop(kid, tempChromos)
+      logDebug(s"Iteration $i, fitness for kid: ${}," +
+          s" and Fitness for plans: ${ =>}")
+    }
+    Population(conf, tempChromos)
+  }
+  private def putToPop(kid: Chromosome, chromos: Seq[Chromosome]): 
Seq[Chromosome] = {
+    val tmp = mutable.Buffer[Chromosome](chromos: _*)
+    val index = chromos.indexWhere( <
+    if (index >= 0) {
+      tmp.insert(index, kid)
+      tmp.remove(tmp.size - 1)
+    }
+    Seq(tmp: _*)
+  }
+object JoinReorderUtils extends PredicateHelper {
+  /**
+   * Builds a new JoinPlan if the following conditions hold:
+   * - the sets of items contained in left and right sides do not overlap.
+   * - there exists at least one join condition involving references from both 
+   * - if star-join filter is enabled, allow the following combinations:
+   *         1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
+   *         2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
+   *         3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+   *
+   * @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
+   * @param otherJoinPlan The other side JoinPlan for building a new join node.
+   * @param conf SQLConf for statistics computation.
+   * @param conditions The overall set of join conditions.
+   * @param topOutput The output attributes of the final plan.
+   * @param filters Join graph info to be used as filters by the search 
+   * @return Builds and returns a new JoinPlan if both conditions hold. 
Otherwise, returns None.
+   */
+  private[optimizer] def buildJoin(
+      oneJoinPlan: JoinPlan,
+      otherJoinPlan: JoinPlan,
+      conf: SQLConf,
+      conditions: Set[Expression],
+      topOutput: AttributeSet,
+      filters: Option[JoinGraphInfo]): Option[JoinPlan] = {
+    if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
+      // Should not join two overlapping item sets.
+      return None
+    }
+    if (filters.isDefined) {
+      // Apply star-join filter, which ensures that tables in a star schema 
+      // are planned together. The star-filter will eliminate joins among star 
and non-star
+      // tables until the star joins are built. The following combinations are 
+      // 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join
+      // 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
+      // 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+      val isValidJoinCombination =
+      JoinReorderDPFilters.starJoinFilter(oneJoinPlan.itemIds, 
+        filters.get)
+      if (!isValidJoinCombination) return None
+    }
+    val onePlan = oneJoinPlan.plan
+    val otherPlan = otherJoinPlan.plan
+    val joinConds = conditions
+        .filterNot(l => canEvaluate(l, onePlan))
+        .filterNot(r => canEvaluate(r, otherPlan))
+        .filter(e => e.references.subsetOf(onePlan.outputSet ++ 
+    if (joinConds.isEmpty) {
+      // Cartesian product is very expensive, so we exclude them from 
candidate plans.
+      // This also significantly reduces the search space.
+      return None
+    }
+    // Put the deeper side on the left, tend to build a left-deep tree.
+    val (left, right) = if (oneJoinPlan.itemIds.size >= 
otherJoinPlan.itemIds.size) {
+      (onePlan, otherPlan)
+    } else {
+      (otherPlan, onePlan)
+    }
+    val newJoin = Join(left, right, Inner, joinConds.reduceOption(And), 
+    val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ 
+    val remainingConds = conditions -- collectedJoinConds
+    val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ 
+    val neededFromNewJoin = newJoin.output.filter(neededAttr.contains)
+    val newPlan =
+      if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
+        Project(neededFromNewJoin, newJoin)
+      } else {
+        newJoin
+      }
+    val itemIds = oneJoinPlan.itemIds.union(otherJoinPlan.itemIds)
+    // Now the root node of onePlan/otherPlan becomes an intermediate join (if 
it's a non-leaf
+    // item), so the cost of the new join should also include its own cost.
+    val newPlanCost = oneJoinPlan.planCost + oneJoinPlan.rootCost(conf) +
+        otherJoinPlan.planCost + otherJoinPlan.rootCost(conf)
+    Some(JoinPlan(itemIds, newPlan, collectedJoinConds, newPlanCost))
+  }
+  /**
+   * Select a [[Chromosome]] randomly from the given chromosomes set. 
Chromosomes those in
+   * excludes will not be selected.
+   * The selection is biased, i.e., some chromosomes will be preferred 
compared to others.
+   *
+   * @param conf SQLConf for parameters that control the selection
+   * @param chromos candidates to be selected.
+   * @param exclude chromosome that will be excluded in the selection.
+   * @return the selected chromosome.
+   */
+  def select(conf: SQLConf, chromos: Seq[Chromosome], exclude: 
Option[Chromosome]): Chromosome = {
 Review comment:
   Sorry for not being clear. This function is a _JoinReorderGA_ util instead 
of a general `JoinReorderUtils`, so let's put it in `object JoinReorderGA`.

