GitHub user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19452#discussion_r144437488 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala --- @@ -66,6 +67,68 @@ object StreamingSymmetricHashJoinHelper extends Logging { } } + /** + * Wrapper around various useful splits of the join condition. + * left AND right AND joined is equivalent to full. + * + * Note that left and right do not necessarily contain *all* conjuncts which satisfy + * their condition. Any conjuncts after the first nondeterministic one are treated as + * nondeterministic for purposes of the split. + * + * @param leftSideOnly Deterministic conjuncts which reference only the left side of the join. + * @param rightSideOnly Deterministic conjuncts which reference only the right side of the join. + * @param bothSides Conjuncts which are nondeterministic, occur after a nondeterministic conjunct, + * or reference both left and right sides of the join. + * @param full The full join condition. + */ + case class JoinConditionSplitPredicates( + leftSideOnly: Option[Expression], + rightSideOnly: Option[Expression], + bothSides: Option[Expression], + full: Option[Expression]) {} + + object JoinConditionSplitPredicates extends PredicateHelper { + def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan): + JoinConditionSplitPredicates = { + // Split the condition into 3 parts: + // * Conjuncts that can be evaluated on only the left input. + // * Conjuncts that can be evaluated on only the right input. + // * Conjuncts that require both left and right input. + // + // Note that these splits are applied in order, so the first category will end up containing + // conjuncts which depend on neither the left nor right input. + // + // Note also that nondeterministic conjuncts effectively require both left and right input.
+ // To maintain their semantics, they need to be evaluated exactly once per joined row. + val (leftCondition, rightCondition, joinedCondition) = { + if (condition.isEmpty) { + (None, None, None) + } else { + // Span rather than partition, because nondeterministic expressions don't commute + // across AND. --- End diff -- good!
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org