[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20816

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r178328704

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```diff
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
         val newConditionOpt = conditionOpt match {
           case Some(condition) =>
             val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-            if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+            if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
```
--- End diff --

So what would be the benefit of keeping that unchanged? To me, it would make the code look confusing. And in theory the two parts (1. infer `newConditionOpt`; 2. infer `newLeftOpt` or `newRightOpt`) would be unsynchronized, leaving part 2 always one iteration behind part 1.
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r178328204

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```diff
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
         val newConditionOpt = conditionOpt match {
           case Some(condition) =>
             val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-            if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+            if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
           case None =>
             additionalConstraints.reduceOption(And)
         }
-        if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+        // Infer filter for left/right outer joins
+        val newLeftOpt = joinType match {
+          case RightOuter if newConditionOpt.isDefined =>
+            val inferredConstraints = left.getRelevantConstraints(
+              left.constraints
+                .union(right.constraints)
+                .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+            val newFilters = inferredConstraints
+              .filterNot(left.constraints.contains)
+              .reduceLeftOption(And)
+            newFilters.map(Filter(_, left))
+          case _ => None
+        }
+        val newRightOpt = joinType match {
+          case LeftOuter if newConditionOpt.isDefined =>
+            val inferredConstraints = right.getRelevantConstraints(
+              right.constraints
+                .union(left.constraints)
+                .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+            val newFilters = inferredConstraints
+              .filterNot(right.constraints.contains)
+              .reduceLeftOption(And)
+            newFilters.map(Filter(_, right))
+          case _ => None
+        }
+
+        if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
```
--- End diff --

I guess they do. Only that when `conditionOpt` is not empty and `additionalConstraints` is empty, there will be unnecessary operations.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r178322820

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```diff
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
         val newConditionOpt = conditionOpt match {
           case Some(condition) =>
             val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-            if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+            if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
```
--- End diff --

Can we keep this unchanged? We can just use `conditionOpt` at lines 681, 685, 693, and 697.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r178322088

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, ending at `if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))`) --- End diff --

If we change this to `if (newConditionOpt.isDefined || newLeft.isDefined || newRight.isDefined)`, would all the tests still pass?
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r178187202

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, ending at `if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))`) --- End diff --

```scala
val newConditionOpt = conditionOpt match {
  case Some(condition) =>
    val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
    if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
  case None =>
    additionalConstraints.reduceOption(And)
}
```
So here, if `conditionOpt` matches `None` and meanwhile `additionalConstraints` is empty, I assume `newConditionOpt` and `conditionOpt` will both be an empty Option, but the reference comparison `ne` will return false. Since this is part of the original `InferFiltersFromConstraints` logic, and I only modified it so as to make `newConditionOpt` work for the rest of the function (the new logic added), I assume it has already been covered by the existing tests.
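The `None`-vs-`None` case discussed above can be checked with a tiny standalone sketch. This is a toy with an `Option[String]` condition standing in for Catalyst's `Expression`, not Spark's actual types:

```scala
object NeCheckDemo {
  def main(args: Array[String]): Unit = {
    // Case discussed above: conditionOpt matches None and
    // additionalConstraints is empty.
    val conditionOpt: Option[String] = None
    val additionalConstraints: Seq[String] = Seq.empty

    val newConditionOpt = conditionOpt match {
      case Some(condition) => Some(condition) // simplified: no new filters to add
      case None            => additionalConstraints.reduceOption(_ + " AND " + _)
    }

    // `None` is a singleton object, so the reference comparison holds
    // and `ne` returns false: no spurious plan rewrite is triggered.
    assert(newConditionOpt eq conditionOpt)
    assert(!(newConditionOpt ne conditionOpt))
    println("no rewrite triggered")
  }
}
```

The same reference-equality trick is why the `else conditionOpt` change matters: returning the original `Option` instance (rather than a fresh one) lets `ne` detect "nothing changed" cheaply.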
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r178116089

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, ending at `if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))`) --- End diff --

Do you have a test case in which `newConditionOpt ne conditionOpt` and `newConditionOpt.isDefined` are not true at the same time?
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r175864663

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```diff
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
         val newConditionOpt = conditionOpt match {
           case Some(condition) =>
             val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-            if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+            if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
           case None =>
             additionalConstraints.reduceOption(And)
         }
-        if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+        // Infer filter for left/right outer joins
+        val newLeftOpt = joinType match {
+          case RightOuter if newConditionOpt.isDefined =>
+            val rightConstraints = right.constraints.union(
+              splitConjunctivePredicates(newConditionOpt.get).toSet)
+            val inferredConstraints = ExpressionSet(
+              QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+            val leftConditions = inferredConstraints
```
--- End diff --

I made another commit yesterday. How's it looking now?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r175580746

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, ending at `val leftConditions = inferredConstraints`) --- End diff --

Let us leave `allConstraints` untouched. We should avoid the extra code changes in this PR. We need to use `conf.constraintPropagationEnabled` for the extra constraints introduced by this PR.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r175578919

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, extended to `.filter(_.references.subsetOf(left.outputSet))`) --- End diff --

Also filter out constraints with `constraint.references.isEmpty` in `getRelevantConstraints`.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r175578334

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, ending at `val leftConditions = inferredConstraints`) --- End diff --

Yeah, let us use `ExpressionSet`.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r175577945

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, ending at `val leftConditions = inferredConstraints`) --- End diff --

This is my proposal:
```scala
val leftConstraints = left.getRelevantConstraints(
  left.constraints
    .union(right.constraints)
    .union(splitConjunctivePredicates(conditionOpt.get).toSet))
val newFilters = reduceConjunctivePredicates(leftConstraints.toSeq)
  .filterNot(left.constraints.contains)
```
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r175550955

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, ending at `val leftConditions = inferredConstraints`) --- End diff --

I had the same inclination of wrapping the evaluation of `allConstraints` in a helper function too, till I realized that `constructIsNotNullConstraints` would depend on the LogicalPlan node in addition to the input constraints. `constructIsNotNullConstraints` has two parts: one deduces is-not-null from null-intolerant expressions, the other deduces is-not-null from attribute nullability. We could reorganize these pieces into new methods, but I feel like we should wait till we find an actual usage, so as to figure out what needs to be included in the helper function.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r175334935

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, ending at `val leftConditions = inferredConstraints`) --- End diff --

I see. I am proposing to add a new helper function `getRelevantConstraints` in the trait `QueryPlanConstraints`. I would suggest ignoring what we are doing for this specific case. That function could be called in the other cases in the future.
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r175330576

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, ending at `val leftConditions = inferredConstraints`) --- End diff --

I think the `constructIsNotNullConstraints` logic does not deal with the "transitive" constraints, so we do not need to include it here. Instead, the "isNotNull" deduction for inferred filters on the null-supplying side is guaranteed by two things here: 1) when getting constraints from the preserved side, `constructIsNotNullConstraints` has already been called and its results will be carried over by `inferAdditionalConstraints` to the null-supplying side; 2) the Filter-matching part of `InferFiltersFromConstraints`. That said, I'm good with the name `getRelevantConstraints` too.
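For readers following the thread, the "transitive" constraint propagation being discussed can be illustrated with a toy, string-based sketch (hypothetical names, not Spark's `inferAdditionalConstraints` implementation): an equality from the join condition lets a predicate on the preserved side's attribute be rewritten over the null-supplying side's attribute, which is what makes the new filter pushable below the outer join.

```scala
object InferConstraintsToy {
  // Toy model: constraints are strings, equalities are attribute-name pairs.
  // Given equality (l, r) and a predicate mentioning l, infer the same
  // predicate with l replaced by r (and vice versa), minus what we knew.
  def inferAdditional(constraints: Set[String],
                      eqs: Set[(String, String)]): Set[String] =
    eqs.flatMap { case (l, r) =>
      constraints.flatMap(c => Set(c.replace(l, r), c.replace(r, l)))
    } -- constraints

  def main(args: Array[String]): Unit = {
    // isnotnull(a) on the preserved side plus join condition a = b
    // yields isnotnull(b) for the null-supplying side.
    val inferred = inferAdditional(Set("isnotnull(a)"), Set(("a", "b")))
    assert(inferred == Set("isnotnull(b)"))
    println(inferred)
  }
}
```

In Catalyst the same idea operates on `Expression` trees and `ExpressionSet`s rather than strings, and, per the comment above, the is-not-null constraints are already materialized on the preserved side before being carried across.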
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r175324999

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, ending at `val leftConditions = inferredConstraints`) --- End diff --

We need to also union `constructIsNotNullConstraints`. Thus, let us use `getRelevantConstraints`.
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r175184304

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, ending at `val leftConditions = inferredConstraints`) --- End diff --

I was thinking of giving it another name too, since it's public now. How about `getInferredConstraints`? I'm good either way, though.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r175154877

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- (same `@@ -669,11 +672,42 @@` hunk as quoted in full above, ending at `val leftConditions = inferredConstraints`) --- End diff --

Can we call it `getRelevantConstraints`?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20816#discussion_r174591237

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```diff
@@ -673,7 +676,48 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
           case None =>
             additionalConstraints.reduceOption(And)
         }
-        if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+        val j = if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+
+        // Infer filter for left/right outer joins
+        joinType match {
+          case RightOuter if j.condition.isDefined =>
+            val rightConstraints = right.constraints.union(
+              splitConjunctivePredicates(j.condition.get).toSet)
+            val inferredConstraints = ExpressionSet(
+              QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+            val leftConditions = inferredConstraints
+              .filter(_.deterministic)
+              .filter(_.references.subsetOf(left.outputSet))
+            if (leftConditions.isEmpty) {
+              j
+            } else {
+              // push the predicate down to left side sub query.
+              val newLeft = leftConditions.
+                reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
+              val newRight = right
+
+              Join(newLeft, newRight, RightOuter, j.condition)
+            }
+          case LeftOuter if j.condition.isDefined =>
+            val leftConstraints = left.constraints.union(
+              splitConjunctivePredicates(j.condition.get).toSet)
+            val inferredConstraints = ExpressionSet(
+              QueryPlanConstraints.inferAdditionalConstraints(leftConstraints))
+            val rightConditions = inferredConstraints
+              .filter(_.deterministic)
+              .filter(_.references.subsetOf(right.outputSet))
+            if (rightConditions.isEmpty) {
+              j
+            } else {
+              // push the predicate down to right side sub query.
+              val newRight = rightConditions.
+                reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
+              val newLeft = left
+
+              Join(newLeft, newRight, LeftOuter, j.condition)
+            }
+          case _ => j
+        }
```
--- End diff --

Could you simplify the code to something like this?
```scala
val newConditionOpt = ...
val newRight = joinType match { ... }
val newLeft = joinType match { ... }

if (newConditionOpt.isDefined || newLeft.isDefined || newRight.isDefined) {
  Join(newLeft.getOrElse(left), newRight.getOrElse(right), joinType,
    newConditionOpt.orElse(conditionOpt))
} else {
  join
}
```
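The shape of this suggested simplification can be sketched end-to-end with toy plan nodes (hypothetical `Leaf`/`Filter`/`Join` case classes, not Catalyst's `LogicalPlan`): compute each new piece as an `Option` and construct a new `Join` only when at least one piece actually changed.

```scala
object RewriteOnlyOnChange {
  // Toy plan nodes standing in for Catalyst's LogicalPlan hierarchy.
  sealed trait Plan
  case class Leaf(name: String) extends Plan
  case class Filter(cond: String, child: Plan) extends Plan
  case class Join(left: Plan, right: Plan, cond: Option[String]) extends Plan

  // Build a new Join only if a condition, left child, or right child changed;
  // otherwise return the original node so the optimizer sees a fixed point.
  def rewrite(left: Plan, right: Plan, conditionOpt: Option[String],
              newConditionOpt: Option[String],
              newLeft: Option[Plan], newRight: Option[Plan], join: Join): Plan =
    if (newConditionOpt.isDefined || newLeft.isDefined || newRight.isDefined)
      Join(newLeft.getOrElse(left), newRight.getOrElse(right),
        newConditionOpt.orElse(conditionOpt))
    else join

  def main(args: Array[String]): Unit = {
    val (l, r) = (Leaf("l"), Leaf("r"))
    val join = Join(l, r, Some("l.a = r.a"))
    // An inferred filter on the left side triggers a rewrite...
    val rewritten =
      rewrite(l, r, join.cond, None, Some(Filter("isnotnull(a)", l)), None, join)
    assert(rewritten == Join(Filter("isnotnull(a)", l), r, Some("l.a = r.a")))
    // ...while no changes returns the original node, by reference.
    assert(rewrite(l, r, join.cond, None, None, None, join) eq join)
  }
}
```

Returning the original node unchanged (rather than an equal copy) is what lets rule-based optimizers detect convergence cheaply, which ties back to the `ne` discussion earlier in the thread.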