Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193735968 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { if (joinKeys.nonEmpty) { val (leftKeys, rightKeys) = joinKeys.unzip - logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys") - Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) + // Find any simple range expressions between two columns + // (and involving only those two columns) of the two tables being joined, + // which are not used in the equijoin expressions, + // and which can be used for secondary sort optimizations. + // rangePreds will contain the original expressions to be filtered out later. + val rangePreds: mutable.Set[Expression] = mutable.Set.empty + var rangeConditions: Seq[BinaryComparison] = + if (SQLConf.get.useSmjInnerRangeOptimization) { + otherPredicates.flatMap { + case p@LessThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map { + case true => rangePreds.add(p); GreaterThan(r, l) + case false => rangePreds.add(p); p + } + case p@LessThanOrEqual(l, r) => + checkRangeConditions(l, r, left, right, joinKeys).map { + case true => rangePreds.add(p); GreaterThanOrEqual(r, l) + case false => rangePreds.add(p); p + } + case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map { + case true => rangePreds.add(p); LessThan(r, l) + case false => rangePreds.add(p); p + } + case p@GreaterThanOrEqual(l, r) => + checkRangeConditions(l, r, left, right, joinKeys).map { + case true => rangePreds.add(p); LessThanOrEqual(r, l) + case false => rangePreds.add(p); p + } + case _ => None + } + } else { + Nil + } + + // Only using secondary join optimization when both lower and upper conditions + // are specified (e.g. 
t1.a < t2.b + x and t1.a > t2.b - x) + if(rangeConditions.size != 2 || + // Looking for one < and one > comparison: + rangeConditions.filter(x => x.isInstanceOf[LessThan] || + x.isInstanceOf[LessThanOrEqual]).size == 0 || + rangeConditions.filter(x => x.isInstanceOf[GreaterThan] || + x.isInstanceOf[GreaterThanOrEqual]).size == 0 || + // Check if both comparisons reference the same columns: + rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 || + rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) { + logDebug("Inner range optimization conditions not met. Clearing range conditions") + rangeConditions = Nil + rangePreds.clear() + } + + Some((joinType, leftKeys, rightKeys, rangeConditions, + otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right)) } else { None } case _ => None } + + /** + * Checks if l and r are valid range conditions: + * - l and r expressions should both contain a single reference to one and the same column. + * - the referenced column should not be part of joinKeys + * If these conditions are not met, the function returns None. + * + * Otherwise, the function checks if the left plan contains l expression and the right plan + * contains r expression. If the expressions need to be switched, the function returns Some(true) + * and Some(false) otherwise. + */ + private def checkRangeConditions(l : Expression, r : Expression, + left : LogicalPlan, right : LogicalPlan, + joinKeys : Seq[(Expression, Expression)]) = { --- End diff -- For clarity add a return type to this method. `Option[Boolean]`?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org