Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144708446

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -349,14 +350,35 @@ case class StreamingSymmetricHashJoinExec(
       /**
        * Internal helper class to consume input rows, generate join output rows using other sides
        * buffered state rows, and finally clean up this sides buffered state rows
    +   *
    +   * @param joinSide The JoinSide - either left or right.
    +   * @param inputAttributes The input attributes for this side of the join.
    +   * @param joinKeys The join keys.
    +   * @param inputIter The iterator of input rows on this side to be joined.
    +   * @param preJoinFilterExpr A filter over rows on this side. This filter rejects rows that could
    +   *                          never pass the overall join condition no matter what other side row
    +   *                          they're joined with.
    +   * @param postJoinFilterExpr A filter over joined rows. This filter completes the application of
    +   *                           the overall join condition, assuming that preJoinFilter on both sides
    +   *                           of the join has already been passed.
    +   * @param stateWatermarkPredicate The state watermark predicate. See
    +   *                                [[StreamingSymmetricHashJoinExec]] for further description of
    +   *                                state watermarks.
        */
       private class OneSideHashJoiner(
           joinSide: JoinSide,
           inputAttributes: Seq[Attribute],
           joinKeys: Seq[Expression],
           inputIter: Iterator[InternalRow],
    +      preJoinFilterExpr: Option[Expression],
    +      postJoinFilterExpr: Option[Expression],
           stateWatermarkPredicate: Option[JoinStateWatermarkPredicate]) {

    +    // Filter the joined rows based on the given condition.
    +    val preJoinFilter =
    +      newPredicate(preJoinFilterExpr.getOrElse(Literal(true)), inputAttributes).eval _
    +    val postJoinFilter = newPredicate(postJoinFilterExpr.getOrElse(Literal(true)), output).eval _
    --- End diff --

    This is incorrect. The schema of the rows on which this filter will be
    applied is `left.output ++ right.output`. You need to apply another
    projection to put the JoinedRow in an UnsafeRow of the schema `output`.
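To make the suggested fix concrete, here is a minimal sketch (not the actual patch). It assumes the surrounding `StreamingSymmetricHashJoinExec` scope, where `newPredicate`, `output`, `left`, and `right` are in scope; the name `outputProj` is illustrative:

```scala
// Sketch only -- relies on Spark SQL internals and the enclosing class scope.
// The post-join filter must be compiled against the joined-row schema,
// left.output ++ right.output, not against this operator's `output`:
val postJoinFilter =
  newPredicate(
    postJoinFilterExpr.getOrElse(Literal(true)),
    left.output ++ right.output).eval _

// A separate UnsafeProjection then converts each JoinedRow (schema
// left.output ++ right.output) into an UnsafeRow with schema `output`:
val outputProj = UnsafeProjection.create(output, left.output ++ right.output)

// Hypothetical usage: filter the joined rows first, then project each
// surviving row down to the output schema.
// joinedIter.filter(postJoinFilter).map(outputProj)
```

`UnsafeProjection.create(exprs, inputSchema)` compiles a projection that evaluates the given expressions (here the operator's output attributes) against rows of `inputSchema`, which is the standard Catalyst mechanism for the schema conversion the review asks for.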
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org