[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user gczsjdy closed the pull request at: https://github.com/apache/spark/pull/19862 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r156581645

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java ---
@@ -159,6 +154,12 @@ public boolean hasNext() {
        @Override
        public UnsafeRow next() {
          try {
+           if (!alreadyCalculated) {
+             while (inputIterator.hasNext()) {
+               insertRow(inputIterator.next());
+             }
+             alreadyCalculated = true;
+           }
            sortedIterator.loadNext();
--- End diff --

Yes, you are right. I now reassign the `sortedIterator` after inserting the rows. Since an inner class can only access final fields of the outer scope, I used a single-element array as a mutable holder. Is there a better solution?
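The workaround mentioned above, wrapping mutable state in a single-element array, is a common Java idiom: local variables captured by an anonymous inner class must be (effectively) final, but the contents of a final array reference may still be mutated. A minimal sketch, with illustrative names that are not from the PR:

```java
// Sketch (not from the PR): local variables captured by a Java anonymous
// inner class must be (effectively) final, so mutable state is often
// wrapped in a one-element array, as described in the comment above.
public class CaptureDemo {
    // Returns how many times the returned Runnable was run, counted
    // through a single-element array acting as a mutable cell.
    public static int runTwice() {
        final int[] count = {0};   // final reference, mutable contents
        Runnable r = new Runnable() {
            @Override
            public void run() {
                count[0]++;        // legal: we mutate through the array
            }
        };
        r.run();
        r.run();
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println(runTwice()); // 2
    }
}
```

A `java.util.concurrent.atomic.AtomicInteger` (or `AtomicReference`) expresses the same intent more explicitly and is a common alternative to the array trick.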
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154635850

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
       matchJoinKey = null
       bufferedMatches.clear()
       false
-    } else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) {
-      // The new streamed row has the same join key as the previous row, so return the same matches.
-      true
-    } else if (bufferedRow == null) {
-      // The streamed row's join key does not match the current batch of buffered rows and there are
-      // no more rows to read from the buffered iterator, so there can be no more matches.
-      matchJoinKey = null
-      bufferedMatches.clear()
-      false
     } else {
-      // Advance both the streamed and buffered iterators to find the next pair of matching rows.
-      var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-      do {
-        if (streamedRowKey.anyNull) {
-          advancedStreamed()
-        } else {
-          assert(!bufferedRowKey.anyNull)
-          comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-          if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-          else if (comp < 0) advancedStreamed()
-        }
-      } while (streamedRow != null && bufferedRow != null && comp != 0)
-      if (streamedRow == null || bufferedRow == null) {
-        // We have either hit the end of one of the iterators, so there can be no more matches.
+      // To make sure vars like bufferedRow is set
+      advancedBufferedIterRes
--- End diff --

Good advice. Thx.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154568554

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java ---
@@ -159,6 +154,12 @@ public boolean hasNext() {
(hunk as quoted elsewhere in this thread; the comment anchors on this line)
+           if (!alreadyCalculated) {
--- End diff --

When `hasNext` is called, doesn't `sortedIterator` report no elements, since we haven't added any rows into it yet?
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154568281

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java ---
@@ -159,6 +154,12 @@ public boolean hasNext() {
(hunk as quoted elsewhere in this thread; the comment anchors on this line)
            sortedIterator.loadNext();
--- End diff --

`sortedIterator` is already assigned at L143. When you insert the rows on the first call to `next`, can the `sortedIterator` still return the sorted elements correctly?
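The concern in this comment is a general one: an iterator obtained from a container before the rows are inserted may not observe them. One way to avoid the hazard is to defer building the sorted view until the first access. A minimal sketch with illustrative names (this is not the PR's actual code, which uses `UnsafeExternalSorter` internals):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch (illustrative, not the PR's classes): defer building the sorted
// view until the first call to hasNext()/next(), so that every input row
// has been inserted before the sorted iterator is created.
public class LazySortedIterator implements Iterator<Integer> {
    private final Iterator<Integer> input;
    private Iterator<Integer> sorted;     // created lazily, after insertion
    private boolean alreadyCalculated = false;

    public LazySortedIterator(Iterator<Integer> input) {
        this.input = input;
    }

    private void insertAllIfNeeded() {
        if (!alreadyCalculated) {
            PriorityQueue<Integer> heap = new PriorityQueue<>();
            while (input.hasNext()) {
                heap.add(input.next());   // "insertRow" analogue
            }
            List<Integer> out = new ArrayList<>();
            while (!heap.isEmpty()) {
                out.add(heap.poll());
            }
            sorted = out.iterator();      // built only after all inserts
            alreadyCalculated = true;
        }
    }

    @Override
    public boolean hasNext() {
        insertAllIfNeeded();
        return sorted.hasNext();
    }

    @Override
    public Integer next() {
        insertAllIfNeeded();
        return sorted.next();
    }

    public static void main(String[] args) {
        Iterator<Integer> it =
            new LazySortedIterator(Arrays.asList(3, 1, 2).iterator());
        List<Integer> result = new ArrayList<>();
        while (it.hasNext()) result.add(it.next());
        System.out.println(result);  // [1, 2, 3]
    }
}
```

The key point is that `sorted` is assigned only after all inserts; an iterator captured before the inserts (as viirya points out for the pre-assigned `sortedIterator`) could miss the data.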
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154567693

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -750,6 +756,8 @@ private[joins] class SortMergeJoinScanner(
       bufferedMatches.clear()
       false
     } else {
+      // To make sure vars like bufferedRow is set
+      advancedBufferedIterRes
--- End diff --

Add a comment like https://github.com/apache/spark/pull/19862/files#r154567168.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154567585

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java ---
@@ -182,18 +183,14 @@ public UnsafeRow next() {
         } catch (IOException e) {
-          cleanupResources();
           throw e;
+        } finally {
+          // Since we won't ever call next() on an empty iterator, we need to clean up resources
+          // here in order to prevent memory leaks.
+          cleanupResources();
--- End diff --

This makes the resources get cleaned up when we successfully return the iterator, too.
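The behavior this comment points out follows from Java's `finally` semantics: a `finally` block runs on the normal-return path as well as on the exception path, so moving `cleanupResources()` into `finally` means it also runs when the iterator is returned successfully. A minimal sketch (not the PR's code):

```java
// Sketch (not the PR's code): a finally block runs on the success path
// as well as on the exception path, which is the behavior the comment
// above points out for cleanupResources().
public class FinallyDemo {
    public static final StringBuilder log = new StringBuilder();

    public static int attempt(boolean fail) {
        try {
            if (fail) {
                throw new RuntimeException("boom");
            }
            return 1;
        } catch (RuntimeException e) {
            return -1;
        } finally {
            log.append("cleanup;");  // runs whether we return or throw
        }
    }

    public static void main(String[] args) {
        attempt(false);
        attempt(true);
        System.out.println(log);  // cleanup;cleanup;
    }
}
```

Whether running the cleanup on the success path is desirable depends on whether the returned iterator still needs those resources, which is exactly the question raised here.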
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154567319

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner(
     private[this] val bufferedMatches =
       new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)

-    // Initialization (note: do _not_ want to advance streamed here).
-    advancedBufferedToRowWithNullFreeJoinKey()
+    // Initialization (note: do _not_ want to advance streamed here). This is made lazy to prevent
+    // unnecessary trigger of calculation.
+    private lazy val advancedBufferedIterRes = advancedBufferedToRowWithNullFreeJoinKey()
--- End diff --

`This is made lazy to run the initialization only once when accessing it.`
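The once-only behavior being relied on here is what a Scala `lazy val` provides: its right-hand side runs on first access, the result is cached, and later accesses return the cached value without re-running the initializer. A rough Java analogue, for illustration only (real code would typically use something like Guava's `Suppliers.memoize`):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Sketch: a rough Java analogue of a Scala `lazy val`. The initializer
// runs once, on first access, and the result is cached, which is why
// referencing `advancedBufferedIterRes` several times only advances the
// buffered iterator once.
public class Lazy<T> {
    private Supplier<T> init;
    private T value;
    private boolean computed = false;

    public Lazy(Supplier<T> init) {
        this.init = init;
    }

    public synchronized T get() {
        if (!computed) {
            value = init.get();
            init = null;       // let the supplier be garbage-collected
            computed = true;
        }
        return value;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Lazy<Boolean> advanced = new Lazy<>(() -> calls.incrementAndGet() > 0);
        advanced.get();
        advanced.get();        // cached; the initializer is not re-run
        System.out.println(calls.get());  // 1
    }
}
```

This is the difference from calling `advancedBufferedToRowWithNullFreeJoinKey()` directly at every use site: a plain call would advance the iterator each time, while the lazy value advances it exactly once.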
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154567168

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -699,39 +700,44 @@ (hunk as quoted elsewhere in this thread; the comment anchors on these lines)
+      // To make sure vars like bufferedRow is set
+      advancedBufferedIterRes
--- End diff --

Then add a comment like `Initialization at the first time reaching here`.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154566562

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -699,39 +700,44 @@ (hunk as quoted elsewhere in this thread; the comment anchors on these lines)
+      // To make sure vars like bufferedRow is set
+      advancedBufferedIterRes
--- End diff --

Oh. I see. `advancedBufferedIterRes` is a lazy val.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154566463

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -700,38 +701,43 @@ private[joins] class SortMergeJoinScanner(
       bufferedMatches.clear()
       false
     } else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) {
-      // The new streamed row has the same join key as the previous row, so return the same matches.
+      // The new streamed row has the same join key as the previous row, so return the same
+      // matches.
--- End diff --

Unnecessary change.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154566374

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -699,39 +700,44 @@ (hunk as quoted elsewhere in this thread; the comment anchors on these lines)
+      // To make sure vars like bufferedRow is set
+      advancedBufferedIterRes
--- End diff --

Once we advance both the streamed and buffered iterators and call `bufferMatchingRows` in the previous round, it advances the buffered iterator until `bufferedRow` no longer matches the current `streamedRowKey`. In the next round, calling `advancedBufferedIterRes` here would advance the buffered iterator again, so that `bufferedRow` would be missed.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154563897

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner(
     private[this] val bufferedMatches =
       new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)

-    // Initialization (note: do _not_ want to advance streamed here).
-    advancedBufferedToRowWithNullFreeJoinKey()
+    // Initialization (note: do _not_ want to advance streamed here). This is made lazy to prevent
+    // unnecessary trigger of calculation.
+    private lazy val advancedBufferedIterRes = advancedBufferedToRowWithNullFreeJoinKey()
--- End diff --

This function should be called (to try to set `bufferedRow`) before `bufferedRow` is checked, and it should be called only once; that is what the original logic requires. Given that, I think a lazy val is the best way to add this optimization.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154564327

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -699,39 +700,44 @@ (hunk as quoted elsewhere in this thread; the comment anchors on these lines)
+      // To make sure vars like bufferedRow is set
+      advancedBufferedIterRes
--- End diff --

This advance function is actually only called once, so no `bufferedRow` will be missed. Or maybe I misunderstood your meaning?
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154564488

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -699,39 +700,44 @@ (hunk as quoted elsewhere in this thread; the comment anchors on these lines)
+      // To make sure vars like bufferedRow is set
+      advancedBufferedIterRes
+      if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) {
--- End diff --

I agree with you.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154556774

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java ---
@@ -159,6 +159,12 @@ public boolean hasNext() {
(hunk as quoted elsewhere in this thread; the comment anchors on this line)
+             alreadyCalculated = true;
--- End diff --

We already clean up the resources when we get an empty iterator, at L144. We should still follow that.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154560155

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -699,39 +700,44 @@ (hunk as quoted elsewhere in this thread; the comment anchors on these lines)
+      // To make sure vars like bufferedRow is set
+      advancedBufferedIterRes
--- End diff --

If we advance the buffered iterator here, won't we miss the `bufferedRow` we advanced to before?
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154558106

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -699,39 +700,44 @@ (hunk as quoted elsewhere in this thread; the comment anchors on this line)
+      if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) {
--- End diff --

This block can be moved out of this else block, back to its original position. We don't need to advance the buffered rows if this condition is hit.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154560524

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner(
     private[this] val bufferedMatches =
       new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)

-    // Initialization (note: do _not_ want to advance streamed here).
-    advancedBufferedToRowWithNullFreeJoinKey()
+    // Initialization (note: do _not_ want to advance streamed here). This is made lazy to prevent
+    // unnecessary trigger of calculation.
+    private lazy val advancedBufferedIterRes = advancedBufferedToRowWithNullFreeJoinKey()
--- End diff --

Isn't it the same to simply call `advancedBufferedToRowWithNullFreeJoinKey` at the needed places?
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154560474

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -750,6 +756,8 @@ private[joins] class SortMergeJoinScanner(
       bufferedMatches.clear()
       false
     } else {
+      // To make sure vars like bufferedRow is set
+      advancedBufferedIterRes
--- End diff --

Ditto. We may miss the `bufferedRow` advanced before.