Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144437332
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -402,17 +411,27 @@ case class StreamingSymmetricHashJoinExec(
     
           nonLateRows.flatMap { row =>
             val thisRow = row.asInstanceOf[UnsafeRow]
    -        val key = keyGenerator(thisRow)
    -        val outputIter = otherSideJoiner.joinStateManager.get(key).map { thatRow =>
    -          generateJoinedRow(thisRow, thatRow)
    -        }
    -        val shouldAddToState = // add only if both removal predicates do not match
    -          !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
    -        if (shouldAddToState) {
    -          joinStateManager.append(key, thisRow)
    -          updatedStateRowsCount += 1
    +        if (preJoinFilter(thisRow)) {
    --- End diff --
    
    Add docs explaining what this condition is for.

