GitHub user jeanlyn commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5717#discussion_r32891298
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
    @@ -82,86 +130,169 @@ case class SortMergeJoin(
     
             override final def next(): InternalRow = {
               if (hasNext) {
    -            // we are using the buffered right rows and run down left 
iterator
    -            val joinedRow = joinRow(leftElement, 
rightMatches(rightPosition))
    -            rightPosition += 1
    -            if (rightPosition >= rightMatches.size) {
    -              rightPosition = 0
    -              fetchLeft()
    -              if (leftElement == null || keyOrdering.compare(leftKey, 
matchKey) != 0) {
    -                stop = false
    -                rightMatches = null
    +            if (bufferedMatches == null || bufferedMatches.size == 0) {
    +              // we just found a row with no join match and we are here to 
produce a row
    +              // with this row and a standard null row from the other side.
    +              if (continueStreamed) {
    +                val joinedRow = smartJoinRow(streamedElement, 
bufferedNullRow)
    +                fetchStreamed()
    +                joinedRow
    +              } else {
    +                val joinedRow = smartJoinRow(streamedNullRow, 
bufferedElement)
    +                fetchBuffered()
    +                joinedRow
    +              }
    +            } else {
    +              // we are using the buffered right rows and run down left 
iterator
    +              val joinedRow = smartJoinRow(streamedElement, 
bufferedMatches(bufferedPosition))
    +              bufferedPosition += 1
    +              if (bufferedPosition >= bufferedMatches.size) {
    +                bufferedPosition = 0
    +                if (joinType != FullOuter || secondStreamedElement == 
null) {
    +                  fetchStreamed()
    +                  if (streamedElement == null || 
keyOrdering.compare(streamedKey, matchKey) != 0) {
    +                    stop = false
    +                    bufferedMatches = null
    +                  }
    +                } else {
    +                  // in FullOuter join and the first time we finish the 
match buffer,
    +                  // we still want to generate all rows with streamed null 
row and buffered
    +                  // rows that match the join key but not the conditions.
    +                  streamedElement = secondStreamedElement
    +                  bufferedMatches = secondBufferedMatches
    +                  secondStreamedElement = null
    +                  secondBufferedMatches = null
    +                }
                   }
    +              joinedRow
                 }
    -            joinedRow
               } else {
                 // no more result
                 throw new NoSuchElementException
               }
             }
     
    -        private def fetchLeft() = {
    -          if (leftIter.hasNext) {
    -            leftElement = leftIter.next()
    -            leftKey = leftKeyGenerator(leftElement)
    +        private def smartJoinRow(streamedRow: InternalRow, bufferedRow: 
InternalRow): InternalRow =
    +          joinType match {
    +            case RightOuter => joinRow(bufferedRow, streamedRow)
    +            case _ => joinRow(streamedRow, bufferedRow)
    +          }
    +
    +        private def fetchStreamed(): Unit = {
    +          if (streamedIter.hasNext) {
    +            streamedElement = streamedIter.next()
    +            streamedKey = streamedKeyGenerator(streamedElement)
               } else {
    -            leftElement = null
    +            streamedElement = null
               }
             }
     
    -        private def fetchRight() = {
    -          if (rightIter.hasNext) {
    -            rightElement = rightIter.next()
    -            rightKey = rightKeyGenerator(rightElement)
    +        private def fetchBuffered(): Unit = {
    +          if (bufferedIter.hasNext) {
    +            bufferedElement = bufferedIter.next()
    +            bufferedKey = bufferedKeyGenerator(bufferedElement)
               } else {
    -            rightElement = null
    +            bufferedElement = null
               }
             }
     
             private def initialize() = {
    -          fetchLeft()
    -          fetchRight()
    +          fetchStreamed()
    +          fetchBuffered()
             }
     
             /**
              * Searches the right iterator for the next rows that have matches 
in left side, and store
              * them in a buffer.
    +         * When this is not a Inner join, we will also return true when we 
get a row with no match
    +         * on the other side. This search will jump out every time from 
the same position until
    +         * `next()` is called.
    +         * Unless we call `next()`, this function can be called multiple 
times, with the same
    +         * return value and result as running it once, since we have set 
guardians in it.
              *
              * @return true if the search is successful, and false if the 
right iterator runs out of
              *         tuples.
              */
             private def nextMatchingPair(): Boolean = {
    -          if (!stop && rightElement != null) {
    -            // run both side to get the first match pair
    -            while (!stop && leftElement != null && rightElement != null) {
    -              val comparing = keyOrdering.compare(leftKey, rightKey)
    +          if (!stop && streamedElement != null) {
    +            // step 1: run both side to get the first match pair
    +            while (!stop && streamedElement != null && bufferedElement != 
null) {
    +              val comparing = keyOrdering.compare(streamedKey, bufferedKey)
                   // for inner join, we need to filter those null keys
    -              stop = comparing == 0 && !leftKey.anyNull
    -              if (comparing > 0 || rightKey.anyNull) {
    -                fetchRight()
    -              } else if (comparing < 0 || leftKey.anyNull) {
    -                fetchLeft()
    +              stop = comparing == 0 && !streamedKey.anyNull
    +              if (comparing > 0 || bufferedKey.anyNull) {
    +                if (joinType == FullOuter) {
    +                  // the join type is full outer and the buffered side has 
a row with no
    +                  // join match, so we have a result row with streamed 
null with buffered
    +                  // side as this row. Then we fetch next buffered element 
and go back.
    +                  continueStreamed = false
    +                  return true
    +                } else {
    +                  fetchBuffered()
    +                }
    +              } else if (comparing < 0 || streamedKey.anyNull) {
    +                if (joinType == Inner) {
    +                  fetchStreamed()
    +                } else {
    +                  // the join type is not inner and the streamed side has 
a row with no
    +                  // join match, so we have a result row with this 
streamed row with buffered
    +                  // null row. Then we fetch next streamed element and go 
back.
    +                  continueStreamed = true
    +                  return true
    +                }
                   }
                 }
    -            rightMatches = new CompactBuffer[InternalRow]()
    +            // step 2: run down the buffered side to put all matched rows 
in a buffer
    +            bufferedMatches = new CompactBuffer[InternalRow]()
    +            secondBufferedMatches = new CompactBuffer[InternalRow]()
                 if (stop) {
                   stop = false
                   // iterate the right side to buffer all rows that matches
                   // as the records should be ordered, exit when we meet the 
first that not match
    -              while (!stop && rightElement != null) {
    -                rightMatches += rightElement
    -                fetchRight()
    -                stop = keyOrdering.compare(leftKey, rightKey) != 0
    +              while (!stop) {
    +                if (boundCondition(joinRow(streamedElement, 
bufferedElement))) {
    --- End diff --
    
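    Maybe we also need to keep the buffered row when the keys match but `boundCondition`
    returns false, even for an inner join: the condition is evaluated against the current
    streamed row only, so a later streamed row with the same key may still satisfy it.
    For example, with a query like the one above: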
    ```sql
    select a.key, b.key, a.value - b.value from a join b on a.key = b.key and a.value - b.value > 1
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to