[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-04-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20816


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-30 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r178328704
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
--- End diff --

So what would be the benefit of keeping that unchanged? To me, it would 
make the code look confusing. And in theory the two parts (1. infer 
`newConditionOpt`; 2. infer `newLeftOpt` or `newRightOpt`) would be 
unsynchronized, leaving part 2 always one iteration behind part 1.


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-30 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r178328204
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val inferredConstraints = left.getRelevantConstraints(
+left.constraints
+  .union(right.constraints)
+  .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+  val newFilters = inferredConstraints
+.filterNot(left.constraints.contains)
+.reduceLeftOption(And)
+  newFilters.map(Filter(_, left))
+case _ => None
+  }
+  val newRightOpt = joinType match {
+case LeftOuter if newConditionOpt.isDefined =>
+  val inferredConstraints = right.getRelevantConstraints(
+right.constraints
+  .union(left.constraints)
+  .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+  val newFilters = inferredConstraints
+.filterNot(right.constraints.contains)
+.reduceLeftOption(And)
+  newFilters.map(Filter(_, right))
+case _ => None
+  }
+
+  if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
--- End diff --

I guess they do. It's just that when `conditionOpt` is non-empty and 
`additionalConstraints` is empty, there will be unnecessary operations.


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-30 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r178322820
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
--- End diff --

Can we keep this unchanged, and just use `conditionOpt` in lines 681, 685, 
693, and 697?


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-30 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r178322088
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val inferredConstraints = left.getRelevantConstraints(
+left.constraints
+  .union(right.constraints)
+  .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+  val newFilters = inferredConstraints
+.filterNot(left.constraints.contains)
+.reduceLeftOption(And)
+  newFilters.map(Filter(_, left))
+case _ => None
+  }
+  val newRightOpt = joinType match {
+case LeftOuter if newConditionOpt.isDefined =>
+  val inferredConstraints = right.getRelevantConstraints(
+right.constraints
+  .union(left.constraints)
+  .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+  val newFilters = inferredConstraints
+.filterNot(right.constraints.contains)
+.reduceLeftOption(And)
+  newFilters.map(Filter(_, right))
+case _ => None
+  }
+
+  if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
--- End diff --

If we change this to `if (newConditionOpt.isDefined || newLeft.isDefined || 
newRight.isDefined)`, do all the tests still pass?


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-29 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r178187202
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val inferredConstraints = left.getRelevantConstraints(
+left.constraints
+  .union(right.constraints)
+  .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+  val newFilters = inferredConstraints
+.filterNot(left.constraints.contains)
+.reduceLeftOption(And)
+  newFilters.map(Filter(_, left))
+case _ => None
+  }
+  val newRightOpt = joinType match {
+case LeftOuter if newConditionOpt.isDefined =>
+  val inferredConstraints = right.getRelevantConstraints(
+right.constraints
+  .union(left.constraints)
+  .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+  val newFilters = inferredConstraints
+.filterNot(right.constraints.contains)
+.reduceLeftOption(And)
+  newFilters.map(Filter(_, right))
+case _ => None
+  }
+
+  if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
--- End diff --

```
val newConditionOpt = conditionOpt match {
  case Some(condition) =>
    val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
    if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
  case None =>
    additionalConstraints.reduceOption(And)
}
```
So here, if `conditionOpt` matches `None` and meanwhile 
`additionalConstraints` is empty, `newConditionOpt` and `conditionOpt` 
will both be an empty Option, and the reference comparison `ne` will return 
false (since `None` is a singleton object).
Since this is part of the original InferFiltersFromConstraints logic, and I 
only modified it so as to make `newConditionOpt` work for the rest of the 
function (the new logic added), I assume it has already been covered by the 
existing tests.
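As a side note on the `ne` semantics discussed above, here is a minimal standalone sketch (hypothetical code, not from the PR, with plain strings standing in for Catalyst expressions) showing that Scala's `None` is a singleton, so recomputing an empty condition yields a reference-equal `Option`:

```scala
// Hypothetical sketch of the newConditionOpt pattern, using strings in
// place of Catalyst expressions. Scala's None is a singleton object, so
// when both the old and new conditions are empty, `ne` returns false.
object OptionRefDemo {
  // Mirrors the match above: extend the condition only when new filters
  // exist, otherwise return the original Option unchanged (same reference).
  def recompute(conditionOpt: Option[String], newFilters: Set[String]): Option[String] =
    conditionOpt match {
      case Some(c) if newFilters.nonEmpty => Some((newFilters + c).mkString(" AND "))
      case Some(_)                        => conditionOpt // same reference returned
      case None                           => newFilters.reduceOption(_ + " AND " + _)
    }
}
```

Here `recompute(None, Set.empty)` returns the `None` singleton, so `result ne conditionOpt` is false, exactly the case described above.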


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r178116089
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val inferredConstraints = left.getRelevantConstraints(
+left.constraints
+  .union(right.constraints)
+  .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+  val newFilters = inferredConstraints
+.filterNot(left.constraints.contains)
+.reduceLeftOption(And)
+  newFilters.map(Filter(_, left))
+case _ => None
+  }
+  val newRightOpt = joinType match {
+case LeftOuter if newConditionOpt.isDefined =>
+  val inferredConstraints = right.getRelevantConstraints(
+right.constraints
+  .union(left.constraints)
+  .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+  val newFilters = inferredConstraints
+.filterNot(right.constraints.contains)
+.reduceLeftOption(And)
+  newFilters.map(Filter(_, right))
+case _ => None
+  }
+
+  if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
--- End diff --

 Do you have a test case in which `newConditionOpt ne conditionOpt` and 
`newConditionOpt.isDefined` are not true at the same time?


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-20 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r175864663
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val rightConstraints = right.constraints.union(
+splitConjunctivePredicates(newConditionOpt.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+  val leftConditions = inferredConstraints
--- End diff --

I made another commit yesterday. How's it looking now?


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r175580746
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val rightConstraints = right.constraints.union(
+splitConjunctivePredicates(newConditionOpt.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+  val leftConditions = inferredConstraints
--- End diff --

Let us leave `allConstraints` untouched. We should avoid the extra code 
changes in this PR. We need to use `conf.constraintPropagationEnabled` for the 
extra constraints introduced by this PR.


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r175578919
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val rightConstraints = right.constraints.union(
+splitConjunctivePredicates(newConditionOpt.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+  val leftConditions = inferredConstraints
+.filter(_.deterministic)
+.filter(_.references.subsetOf(left.outputSet))
--- End diff --

Also filter out constraints with `constraint.references.isEmpty` in 
`getRelevantConstraints`.


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r175578334
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val rightConstraints = right.constraints.union(
+splitConjunctivePredicates(newConditionOpt.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+  val leftConditions = inferredConstraints
--- End diff --

Yeah, let us use `ExpressionSet`.


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r175577945
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val rightConstraints = right.constraints.union(
+splitConjunctivePredicates(newConditionOpt.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+  val leftConditions = inferredConstraints
--- End diff --

This is my proposal:
```Scala
val leftConstraints = left.getRelevantConstraints(
  left.constraints
    .union(right.constraints)
    .union(splitConjunctivePredicates(conditionOpt.get).toSet))
val newFilters = reduceConjunctivePredicates(leftConstraints.toSeq)
  .filterNot(left.constraints.contains)
```


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-19 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r175550955
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val rightConstraints = right.constraints.union(
+splitConjunctivePredicates(newConditionOpt.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+  val leftConditions = inferredConstraints
--- End diff --

I had the same inclination to wrap the evaluation of `allConstraints` 
in a helper function, until I realized that `constructIsNotNullConstraints` 
would depend on the LogicalPlan node in addition to the input constraints. 
`constructIsNotNullConstraints` has two parts: one deduces is-not-null 
from null-intolerant expressions, the other deduces is-not-null from 
attribute nullability. We could reorganize these pieces into new methods, but I 
feel we should wait until we find an actual usage, so we can figure out what 
needs to be included in the helper function.
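For readers following along, here is a toy, self-contained model of the first part described above (my own simplified sketch; the `Attr`/`EqualTo`/`IsNotNull` case classes only loosely mirror Catalyst and are not Spark's actual types): deducing `IsNotNull` from a null-intolerant predicate such as equality.

```scala
// Toy model of deducing IsNotNull from null-intolerant predicates.
// Simplified illustration only, not Spark's implementation: the real
// constructIsNotNullConstraints also consults attribute nullability,
// which is why it depends on the LogicalPlan node.
object NotNullInfer {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class EqualTo(left: Expr, right: Expr) extends Expr // null-intolerant
  case class IsNotNull(child: Expr) extends Expr

  // Equality is null-intolerant: `a = b` can only hold when both sides
  // are non-null, so IsNotNull(a) and IsNotNull(b) can be inferred.
  def construct(constraints: Set[Expr]): Set[Expr] =
    constraints.flatMap {
      case EqualTo(a: Attr, b: Attr) => Set[Expr](IsNotNull(a), IsNotNull(b))
      case _                         => Set.empty[Expr]
    }
}
```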


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r175334935
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val rightConstraints = right.constraints.union(
+splitConjunctivePredicates(newConditionOpt.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+  val leftConditions = inferredConstraints
--- End diff --

I see. I am proposing to add a new helper function `getRelevantConstraints` 
in the trait `QueryPlanConstraints`. I would suggest ignoring what we are 
doing for this specific case; that function could be called in other cases 
in the future.


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-18 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r175330576
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val rightConstraints = right.constraints.union(
+splitConjunctivePredicates(newConditionOpt.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+  val leftConditions = inferredConstraints
--- End diff --

I think the `constructIsNotNullConstraints` logic does not deal with the 
"transitive" constraints, so we do not need to include it here. Instead, the 
"isNotNull" deduction for inferred filters on the null-supplying side is 
guaranteed by two things here:
1) when getting constraints from the preserved side, 
`constructIsNotNullConstraints` has already been called, and its result will 
be carried over by `inferAdditionalConstraints` to the null-supplying side;
2) the Filter matching part of `InferFiltersFromConstraints`.
That said, I'm good with the name `getRelevantConstraints` too.
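To make the "transitive" part concrete, here is a toy standalone sketch (my simplification, not Catalyst's actual `inferAdditionalConstraints`) of how an equality between attributes lets a predicate on one side be rewritten onto the other side's attributes:

```scala
// Toy model of transitive constraint inference: given attribute equalities
// such as l.a = r.a, a predicate on r.a can be rewritten onto l.a.
// Predicates are modeled as (attribute, restOfPredicate) string pairs;
// this is an illustration only, not Spark's implementation.
object TransitiveInfer {
  def infer(constraints: Set[(String, String)],
            equalities: Set[(String, String)]): Set[(String, String)] = {
    // Build a symmetric substitution map from the equalities.
    val subst = equalities.flatMap { case (a, b) => Seq(a -> b, b -> a) }.toMap
    // Rewrite each predicate through the map, keeping only new predicates.
    constraints.flatMap { case (attr, pred) =>
      subst.get(attr).map(other => (other, pred))
    } -- constraints
  }
}
```

With equality `l.a = r.a` and predicate `r.a > 1` from the preserved side, `infer` produces `l.a > 1` for the null-supplying side, which the rule could then push down as a `Filter`.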


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r175324999
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val rightConstraints = right.constraints.union(
+splitConjunctivePredicates(newConditionOpt.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+  val leftConditions = inferredConstraints
--- End diff --

We need to also union `constructIsNotNullConstraints`. Thus, let us use 
`getRelevantConstraints`.


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-16 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r175184304
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val rightConstraints = right.constraints.union(
+splitConjunctivePredicates(newConditionOpt.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+  val leftConditions = inferredConstraints
--- End diff --

I was also thinking of giving it another name, since it's public now. How 
about `getInferredConstraints`? I'm good either way, though.


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-16 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r175154877
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -669,11 +672,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val rightConstraints = right.constraints.union(
+splitConjunctivePredicates(newConditionOpt.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+  val leftConditions = inferredConstraints
--- End diff --

can we call `getRelevantConstraints`?


---




[GitHub] spark pull request #20816: [SPARK-21479][SQL] Outer join filter pushdown in ...

2018-03-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20816#discussion_r174591237
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -673,7 +676,48 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  val j = if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+
+  // Infer filter for left/right outer joins
+  joinType match {
+case RightOuter if j.condition.isDefined =>
+  val rightConstraints = right.constraints.union(
+splitConjunctivePredicates(j.condition.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
+  val leftConditions = inferredConstraints
+.filter(_.deterministic)
+.filter(_.references.subsetOf(left.outputSet))
+  if (leftConditions.isEmpty) {
+j
+  } else {
+// push the predicate down to left side sub query.
+val newLeft = leftConditions.
+  reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
+val newRight = right
+
+Join(newLeft, newRight, RightOuter, j.condition)
+  }
+case LeftOuter if j.condition.isDefined =>
+  val leftConstraints = left.constraints.union(
+splitConjunctivePredicates(j.condition.get).toSet)
+  val inferredConstraints = ExpressionSet(
+QueryPlanConstraints.inferAdditionalConstraints(leftConstraints))
+  val rightConditions = inferredConstraints
+.filter(_.deterministic)
+.filter(_.references.subsetOf(right.outputSet))
+  if (rightConditions.isEmpty) {
+j
+  } else {
+// push the predicate down to right side sub query.
+val newRight = rightConditions.
+  reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
+val newLeft = left
+
+Join(newLeft, newRight, LeftOuter, j.condition)
+  }
+case _ => j
+  }
--- End diff --

Could you simplify the code to something like this?

```Scala

val newConditionOpt = ... 

val newRight = joinType match {
  ...
}

val newLeft = joinType match {
  ...
}

if (newConditionOpt.isDefined || newLeft.isDefined || newRight.isDefined) {
  Join(newLeft.getOrElse(left), newRight.getOrElse(right), joinType,
newConditionOpt.orElse(conditionOpt))
} else {
  join
}
```
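The Option-combining pattern proposed above can be sketched in isolation (a hypothetical standalone model I wrote for illustration, with strings standing in for plans and expressions, not Catalyst code):

```scala
// Standalone sketch of the proposed rewrite pattern: rebuild the Join only
// when at least one of the three Options carries something new; otherwise
// return the original node unchanged so the rule is a no-op.
object JoinRebuild {
  case class Join(left: String, right: String, cond: Option[String])

  def rebuild(join: Join,
              newLeft: Option[String],
              newRight: Option[String],
              newCond: Option[String]): Join =
    if (newCond.isDefined || newLeft.isDefined || newRight.isDefined) {
      Join(newLeft.getOrElse(join.left),
           newRight.getOrElse(join.right),
           newCond.orElse(join.cond))
    } else {
      join // nothing inferred: keep the original tree node
    }
}
```

Returning the original node unchanged when nothing is inferred matters for fixed-point rule execution: the rule batch stops only when no rule changes the tree.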


---
