yeshengm commented on a change in pull request #24983: [SPARK-27714][SQL][CBO] Support Genetic Algorithm based join reorder URL: https://github.com/apache/spark/pull/24983#discussion_r312276641
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ########## @@ -470,3 +397,451 @@ object JoinReorderDPFilters extends PredicateHelper { * extended with the set of connected/unconnected plans. */ case class JoinGraphInfo (starJoins: Set[Int], nonStarJoins: Set[Int]) + +/** + * Reorder the joins using a genetic algorithm. The algorithm treats the reorder problem + * as a traveling salesman problem, and uses a genetic algorithm to find an optimized solution. + * + * The implementation is based on GEQO in PostgreSQL, which is partly adapted from Darrell Whitley's work: + * https://www.postgresql.org/docs/9.1/geqo-pg-intro.html + * + * For more info about genetic algorithms and the edge recombination crossover, please see: + * "A Genetic Algorithm Tutorial, Darrell Whitley" + * https://link.springer.com/article/10.1007/BF00175354 + * and "Scheduling Problems and Traveling Salesmen: The Genetic Edge Recombination Operator, + * Darrell Whitley et al." https://dl.acm.org/citation.cfm?id=657238 + * respectively. + */ +object JoinReorderGA extends PredicateHelper with Logging { + + def search( + conf: SQLConf, + items: Seq[LogicalPlan], + conditions: Set[Expression], + output: Seq[Attribute]): Option[LogicalPlan] = { + + val startTime = System.nanoTime() + + val itemsWithIndex = items.zipWithIndex.map { + case (plan, id) => id -> JoinPlan(Set(id), plan, Set.empty, Cost(0, 0)) + }.toMap + + val topOutputSet = AttributeSet(output) + + val pop = Population(conf, itemsWithIndex, conditions, topOutputSet).evolve + + val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000) + logInfo(s"Join reordering finished. 
Duration: $durationInMs ms, number of items: " + + s"${items.length}, number of plans in memo: ${ pop.chromos.size}") + + assert(pop.chromos.head.basicPlans.size == items.length) + pop.chromos.head.integratedPlan match { + case Some(joinPlan) => joinPlan.plan match { + case p @ Project(projectList, _: Join) if projectList != output => + assert(topOutputSet == p.outputSet) + // Keep the same order of final output attributes. + Some(p.copy(projectList = output)) + case finalPlan if !sameOutput(finalPlan, output) => + Some(Project(output, finalPlan)) + case finalPlan => + Some(finalPlan) + } + case _ => None + } + } +} + +/** + * A pair of parent individuals can breed a child through a crossover process. + * Through crossover, the child inherits genes from its parents, and these gene snippets + * together compose a new [[Chromosome]]. + */ +@DeveloperApi +trait Crossover { + + /** + * Generate a new [[Chromosome]] from the given parent [[Chromosome]]s, + * using this crossover algorithm. + */ + def newChromo(father: Chromosome, mother: Chromosome) : Chromosome +} + +/** + * This object implements the Genetic Edge Recombination algorithm. The algorithm generates + * a new traveling path by choosing edges in a certain order from the parents' edge table, + * where the edge table contains the links between vertices in the parents' traveling paths. + * + * Here's an example. Suppose we have two traveling paths on a graph: + * I, [A B C D E F] + * II, [B D C A E F] + * + * The algorithm works as follows: + * 1. find the links of each vertex (each path is treated as a cycle), giving an 'Edge Table' + * A: B F C E + * B: A C D F + * C: B D A + * D: C E B + * E: D F A + * F: A E B + * 2. starting from one vertex, say A, choose one of its neighbours as its new neighbour, say F; we have + * {A F} + * 3. then choose one of F's neighbours (note that a vertex that has already been chosen must not + * be chosen again), + * {A F E} + * 4. go on, + * {A F E D} + * 5. go on, + * {A F E D B} + * 6. 
go on, + * {A F E D B C} + * Note: if the procedure ends before all vertices have been chosen, restart from a vertex that + * has not been chosen yet, and continue the procedure. + * + * For more information about Genetic Edge Recombination, + * see "Scheduling Problems and Traveling Salesmen: The Genetic Edge + * Recombination Operator" by Darrell Whitley et al. + * https://dl.acm.org/citation.cfm?id=657238 + */ +object EdgeRecombination extends Crossover { + + def genEdgeTable(father: Chromosome, mother: Chromosome) : Map[JoinPlan, Seq[JoinPlan]] = { + val fatherTable = father.basicPlans.map(g => g -> findNeighbours(father.basicPlans, g)).toMap + val motherTable = mother.basicPlans.map(g => g -> findNeighbours(mother.basicPlans, g)).toMap + + fatherTable.map(entry => entry._1 -> (entry._2 ++ motherTable(entry._1))) + } + + def findNeighbours(genes: Seq[JoinPlan], g: JoinPlan) : Seq[JoinPlan] = { Review comment: Consider this `x.zipWithIndex.flatMap { case (v, idx) => Seq(v -> x((idx + 1) % x.length), v -> x((idx - 1 + x.length) % x.length)) }`. Definitely too long for a line, but you get the idea. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org