Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144708446
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -349,14 +350,35 @@ case class StreamingSymmetricHashJoinExec(
       /**
        * Internal helper class to consume input rows, generate join output rows using the other
        * side's buffered state rows, and finally clean up this side's buffered state rows.
    +   *
    +   * @param joinSide The JoinSide - either left or right.
    +   * @param inputAttributes The input attributes for this side of the join.
    +   * @param joinKeys The join keys.
    +   * @param inputIter The iterator of input rows on this side to be joined.
    +   * @param preJoinFilterExpr A filter over rows on this side. This filter rejects rows that
    +   *                          could never pass the overall join condition no matter what other
    +   *                          side row they're joined with.
    +   * @param postJoinFilterExpr A filter over joined rows. This filter completes the application
    +   *                           of the overall join condition, assuming that preJoinFilter on
    +   *                           both sides of the join has already been passed.
    +   * @param stateWatermarkPredicate The state watermark predicate. See
    +   *                                [[StreamingSymmetricHashJoinExec]] for further description
    +   *                                of state watermarks.
        */
       private class OneSideHashJoiner(
           joinSide: JoinSide,
           inputAttributes: Seq[Attribute],
           joinKeys: Seq[Expression],
           inputIter: Iterator[InternalRow],
    +      preJoinFilterExpr: Option[Expression],
    +      postJoinFilterExpr: Option[Expression],
           stateWatermarkPredicate: Option[JoinStateWatermarkPredicate]) {
     
    +    // Filter the joined rows based on the given condition.
    +    val preJoinFilter =
    +      newPredicate(preJoinFilterExpr.getOrElse(Literal(true)), inputAttributes).eval _
    +    val postJoinFilter =
    +      newPredicate(postJoinFilterExpr.getOrElse(Literal(true)), output).eval _
    --- End diff --
    
    This is incorrect. The schema of the rows on which this filter will be applied is `left.output ++ right.output`. You need to apply another projection to put the `JoinedRow` into an `UnsafeRow` with the schema `output`.
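    
    One possible shape of the fix (a sketch only, not the actual change in this PR): build the post-join predicate against the concatenated schema `left.output ++ right.output` that a `JoinedRow` really has, then use a separate `UnsafeProjection` to map filtered rows into the operator's `output` schema. `left`, `right`, `output`, and `newPredicate` are assumed to be the members already in scope in `StreamingSymmetricHashJoinExec`:
    
    ```scala
    // Sketch, assuming the surrounding members of StreamingSymmetricHashJoinExec.
    // Evaluate the post-join filter over the JoinedRow's actual schema...
    val joinedSchema = left.output ++ right.output
    val postJoinFilter =
      newPredicate(postJoinFilterExpr.getOrElse(Literal(true)), joinedSchema).eval _
    // ...then project surviving rows into an UnsafeRow with the operator's output schema.
    val toOutputRow = UnsafeProjection.create(output, joinedSchema)
    
    // hypothetical joined-row iterator for illustration:
    // joinedIter.filter(postJoinFilter).map(toOutputRow)
    ```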


---
