[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2018-01-24 Thread gczsjdy
Github user gczsjdy closed the pull request at:

https://github.com/apache/spark/pull/19862


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-12 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r156581645
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 ---
@@ -159,6 +154,12 @@ public boolean hasNext() {
 @Override
 public UnsafeRow next() {
   try {
+if (!alreadyCalculated) {
+  while (inputIterator.hasNext()) {
+insertRow(inputIterator.next());
+  }
+  alreadyCalculated = true;
+}
 sortedIterator.loadNext();
--- End diff --

Yes, you are right. Now I assign the `sortedIterator` after inserting the 
rows. Since an inner class can only access final fields of the enclosing scope, 
I used an array; is there a better solution?
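
A minimal sketch (not Spark code; names are mine) of the workaround described above: a Java anonymous inner class can only capture (effectively) final locals, so a one-element array provides a mutable cell behind a final reference. `java.util.concurrent.atomic.AtomicInteger` is a common alternative.

```java
import java.util.function.Supplier;

class CaptureDemo {
  static Supplier<Integer> counter() {
    final int[] count = {0};              // the final reference; its contents stay mutable
    return new Supplier<Integer>() {
      @Override public Integer get() {
        count[0] += 1;                    // mutating the array element is allowed
        return count[0];
      }
    };
  }
}
```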


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-04 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154635850
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Good advice. Thx.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154568554
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 ---
@@ -159,6 +154,12 @@ public boolean hasNext() {
 @Override
 public UnsafeRow next() {
   try {
+if (!alreadyCalculated) {
--- End diff --

When `hasNext` is called, doesn't `sortedIterator` report no elements, 
since we haven't added any rows to it yet?


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154568281
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 ---
@@ -159,6 +154,12 @@ public boolean hasNext() {
 @Override
 public UnsafeRow next() {
   try {
+if (!alreadyCalculated) {
+  while (inputIterator.hasNext()) {
+insertRow(inputIterator.next());
+  }
+  alreadyCalculated = true;
+}
 sortedIterator.loadNext();
--- End diff --

`sortedIterator` is already assigned at L143. If you insert rows only on the 
first call to `next`, can `sortedIterator` still return the elements correctly 
sorted?
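
An illustrative sketch (not Spark's actual `UnsafeExternalRowSorter`; names are mine) of the pattern under discussion: defer consuming the input until first use, guarded by an `alreadyCalculated`-style flag. Note that `hasNext` must also trigger the materialization, which is the concern raised in this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class LazySortingIterator implements Iterator<Integer> {
  private final Iterator<Integer> input;
  private final List<Integer> sorted = new ArrayList<>();
  private boolean alreadyCalculated = false;
  private int pos = 0;

  LazySortingIterator(Iterator<Integer> input) { this.input = input; }

  private void materialize() {
    if (!alreadyCalculated) {
      while (input.hasNext()) sorted.add(input.next());
      Collections.sort(sorted);
      alreadyCalculated = true;
    }
  }

  @Override public boolean hasNext() {
    // hasNext must also materialize; otherwise it would report "empty"
    // before any rows were inserted.
    materialize();
    return pos < sorted.size();
  }

  @Override public Integer next() {
    materialize();
    return sorted.get(pos++);
  }
}
```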


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154567693
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -750,6 +756,8 @@ private[joins] class SortMergeJoinScanner(
   bufferedMatches.clear()
   false
 } else {
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Add a comment like 
https://github.com/apache/spark/pull/19862/files#r154567168.



---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154567585
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 ---
@@ -182,18 +183,14 @@ public UnsafeRow next() {
 }
   };
 } catch (IOException e) {
-  cleanupResources();
   throw e;
+} finally {
+  // Since we won't ever call next() on an empty iterator, we need to 
clean up resources
+  // here in order to prevent memory leaks.
+  cleanupResources();
--- End diff --

This makes the resources get cleaned up when we return the iterator, too.
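
A minimal sketch (not Spark code) of the behavior being pointed out: a `finally` block runs on both the exception path and the normal-return path, so moving cleanup into `finally` also fires it when the iterator is returned successfully.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CleanupDemo {
  static final AtomicInteger cleanups = new AtomicInteger();

  static String build(boolean fail) {
    try {
      if (fail) throw new RuntimeException("boom");
      return "iterator";            // normal return path
    } finally {
      cleanups.incrementAndGet();   // runs on BOTH paths
    }
  }
}
```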


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154567319
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner(
   private[this] val bufferedMatches =
 new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
 
-  // Initialization (note: do _not_ want to advance streamed here).
-  advancedBufferedToRowWithNullFreeJoinKey()
+  // Initialization (note: do _not_ want to advance streamed here). This 
is made lazy to prevent
+  // unnecessary trigger of calculation.
+  private lazy val advancedBufferedIterRes = 
advancedBufferedToRowWithNullFreeJoinKey()
--- End diff --

`This is made lazy to run the initialization only once when accessing it.`


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154567168
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Then add a comment like `Initialization at the first time reaching here`.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154566562
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Oh. I see. `advancedBufferedIterRes` is a lazy val.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154566463
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -700,38 +701,43 @@ private[joins] class SortMergeJoinScanner(
   bufferedMatches.clear()
   false
 } else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
+  // The new streamed row has the same join key as the previous row, 
so return the same
+  // matches.
--- End diff --

Unnecessary change.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154566374
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Once we advance both the streamed and buffered iterators and call 
`bufferMatchingRows` in the last turn, it advances the buffered iterator until 
`bufferedRow` no longer matches the current `streamedRowKey`.

In the next turn, the call to `advancedBufferedIterRes` here will advance the 
buffered iterator again, so that `bufferedRow` will be missed.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154563897
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner(
   private[this] val bufferedMatches =
 new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
 
-  // Initialization (note: do _not_ want to advance streamed here).
-  advancedBufferedToRowWithNullFreeJoinKey()
+  // Initialization (note: do _not_ want to advance streamed here). This 
is made lazy to prevent
+  // unnecessary trigger of calculation.
+  private lazy val advancedBufferedIterRes = 
advancedBufferedToRowWithNullFreeJoinKey()
--- End diff --

This function should be called (to try to set `bufferedRow`) before 
`bufferedRow` is checked, and it should be called only once; that is what the 
original logic requires. Given that, I think a lazy val is the best way to add 
this optimization.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154564327
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Actually, this advance function is only called once, so no `bufferedRow` 
will be missed. Or maybe I misunderstood your meaning?


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154564488
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
+  if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
--- End diff --

I agree with you.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154556774
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 ---
@@ -159,6 +159,12 @@ public boolean hasNext() {
 @Override
 public UnsafeRow next() {
   try {
+if (!alreadyCalculated) {
+  while (inputIterator.hasNext()) {
+insertRow(inputIterator.next());
+  }
+  alreadyCalculated = true;
--- End diff --

We clean up resources when the iterator is empty at L144. We should still 
follow that convention.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154560155
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

By advancing the buffered iterator here, won't we miss the `bufferedRow` 
that was advanced before?


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154558106
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
   matchJoinKey = null
   bufferedMatches.clear()
   false
-} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
-  // The new streamed row has the same join key as the previous row, 
so return the same matches.
-  true
-} else if (bufferedRow == null) {
-  // The streamed row's join key does not match the current batch of 
buffered rows and there are
-  // no more rows to read from the buffered iterator, so there can be 
no more matches.
-  matchJoinKey = null
-  bufferedMatches.clear()
-  false
 } else {
-  // Advance both the streamed and buffered iterators to find the next 
pair of matching rows.
-  var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  do {
-if (streamedRowKey.anyNull) {
-  advancedStreamed()
-} else {
-  assert(!bufferedRowKey.anyNull)
-  comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
-  if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
-  else if (comp < 0) advancedStreamed()
-}
-  } while (streamedRow != null && bufferedRow != null && comp != 0)
-  if (streamedRow == null || bufferedRow == null) {
-// We have either hit the end of one of the iterators, so there 
can be no more matches.
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
+  if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, 
matchJoinKey) == 0) {
--- End diff --

This block can be moved out of the else block, back to its original 
position. We don't need to advance the buffered rows if this condition is hit.


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154560524
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner(
   private[this] val bufferedMatches =
 new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
 
-  // Initialization (note: do _not_ want to advance streamed here).
-  advancedBufferedToRowWithNullFreeJoinKey()
+  // Initialization (note: do _not_ want to advance streamed here). This 
is made lazy to prevent
+  // unnecessary trigger of calculation.
+  private lazy val advancedBufferedIterRes = 
advancedBufferedToRowWithNullFreeJoinKey()
--- End diff --

Isn't it the same as simply calling `advancedBufferedToRowWithNullFreeJoinKey` 
at the needed places?
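
An illustrative sketch (names are mine, not Spark's) of why a plain call "at the needed places" differs from a Scala lazy val: the plain call consumes a buffered row on every invocation, while once-only initialization consumes at most one.

```java
import java.util.Iterator;

class ScannerSketch {
  private final Iterator<Integer> buffered;
  Integer bufferedRow = null;
  private boolean initialized = false;

  ScannerSketch(Iterator<Integer> buffered) { this.buffered = buffered; }

  // Stand-in for advancedBufferedToRowWithNullFreeJoinKey(): consumes one row per call.
  boolean advance() {
    bufferedRow = buffered.hasNext() ? buffered.next() : null;
    return bufferedRow != null;
  }

  // Stand-in for the lazy val: advances only on the first access.
  boolean advanceOnce() {
    if (!initialized) {
      initialized = true;
      advance();
    }
    return bufferedRow != null;
  }
}
```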


---




[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19862#discussion_r154560474
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -750,6 +756,8 @@ private[joins] class SortMergeJoinScanner(
   bufferedMatches.clear()
   false
 } else {
+  // To make sure vars like bufferedRow is set
+  advancedBufferedIterRes
--- End diff --

Ditto. We may miss the `bufferedRow` that was advanced before.


---
