[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/5717


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r36493358
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala ---
@@ -62,111 +100,250 @@ case class SortMergeJoin(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val leftResults = left.execute().map(_.copy())
-    val rightResults = right.execute().map(_.copy())
+    val streamResults = streamedPlan.execute().map(_.copy())
+    val bufferResults = bufferedPlan.execute().map(_.copy())
 
-    leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
+    streamResults.zipPartitions(bufferResults) ( (streamedIter, bufferedIter) => {
+      // standard null rows
+      val streamedNullRow = InternalRow.fromSeq(Seq.fill(streamedPlan.output.length)(null))
+      val bufferedNullRow = InternalRow.fromSeq(Seq.fill(bufferedPlan.output.length)(null))
       new Iterator[InternalRow] {
         // An ordering that can be used to compare keys from both sides.
         private[this] val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
         // Mutable per row objects.
         private[this] val joinRow = new JoinedRow
-        private[this] var leftElement: InternalRow = _
-        private[this] var rightElement: InternalRow = _
-        private[this] var leftKey: InternalRow = _
-        private[this] var rightKey: InternalRow = _
-        private[this] var rightMatches: CompactBuffer[InternalRow] = _
-        private[this] var rightPosition: Int = -1
+        private[this] var streamedElement: InternalRow = _
+        private[this] var bufferedElement: InternalRow = _
+        private[this] var streamedKey: InternalRow = _
+        private[this] var bufferedKey: InternalRow = _
+        private[this] var bufferedMatches: CompactBuffer[InternalRow] = _
+        private[this] var bufferedPosition: Int = -1
         private[this] var stop: Boolean = false
         private[this] var matchKey: InternalRow = _
+        // When the merge algorithm finds a join key with no match, one side has no
+        // corresponding row, so we mark which side it is: true means the streamed side
+        // has no match, false means the buffered side. Only set when needed.
+        private[this] var continueStreamed: Boolean = _
+        private[this] var streamNullGenerated: Boolean = false
+        // Tracks whether each element in bufferedMatches has a matched streamedElement.
+        private[this] var bitSet: BitSet = _
+        // Marks whether the found result has been fetched.
+        private[this] var found: Boolean = false
+        private[this] var bufferNullGenerated: Boolean = false
 
         // initialize iterator
         initialize()
 
-        override final def hasNext: Boolean = nextMatchingPair()
+        override final def hasNext: Boolean = {
+          val matching = nextMatchingBlock()
+          if (matching && !isBufferEmpty(bufferedMatches)) {
+            // The buffer stores all rows that match the key, but the condition may not hold.
+            // If none of the rows in the buffer match the condition, we fetch the next matching block.
+            findNextInBuffer() || hasNext
+          } else {
+            matching
+          }
+        }
+
+        /**
+         * Run down the current `bufferedMatches` to find rows that match the condition.
+         * If `joinType` is not `Inner`, we use `bufferNullGenerated` to mark whether
+         * we need to build a bufferedNullRow for the result.
+         * If `joinType` is `FullOuter`, we use `streamNullGenerated` to mark whether
+         * a buffered element needs to join with a streamedNullRow.
+         * The method can be called multiple times since `found` serves as a guardian.
+         */
+        def findNextInBuffer(): Boolean = {
+          while (!found && streamedElement != null
+            && keyOrdering.compare(streamedKey, matchKey) == 0) {
+            while (bufferedPosition < bufferedMatches.size && !boundCondition(
+              joinRow(streamedElement, bufferedMatches(bufferedPosition)))) {
+              bufferedPosition += 1
+            }
+            if (bufferedPosition == bufferedMatches.size) {
+              if (joinType == Inner || bufferNullGenerated) {
+                bufferNullGenerated = false
+                bufferedPosition = 0
+                fetchStreamed()
+              } else {
+                found = true
+              }
+            } else {
+              // mark as true so we don't generate a null row for the streamed row.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r36493188
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala ---
@@ -62,111 +100,250 @@ case class SortMergeJoin(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val leftResults = left.execute().map(_.copy())
-    val rightResults = right.execute().map(_.copy())
+    val streamResults = streamedPlan.execute().map(_.copy())
+    val bufferResults = bufferedPlan.execute().map(_.copy())
 
-    leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
+    streamResults.zipPartitions(bufferResults) ( (streamedIter, bufferedIter) => {
+      // standard null rows
+      val streamedNullRow = InternalRow.fromSeq(Seq.fill(streamedPlan.output.length)(null))
+      val bufferedNullRow = InternalRow.fromSeq(Seq.fill(bufferedPlan.output.length)(null))
       new Iterator[InternalRow] {
         // An ordering that can be used to compare keys from both sides.
         private[this] val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
         // Mutable per row objects.
         private[this] val joinRow = new JoinedRow
-        private[this] var leftElement: InternalRow = _
-        private[this] var rightElement: InternalRow = _
-        private[this] var leftKey: InternalRow = _
-        private[this] var rightKey: InternalRow = _
-        private[this] var rightMatches: CompactBuffer[InternalRow] = _
-        private[this] var rightPosition: Int = -1
+        private[this] var streamedElement: InternalRow = _
+        private[this] var bufferedElement: InternalRow = _
+        private[this] var streamedKey: InternalRow = _
+        private[this] var bufferedKey: InternalRow = _
+        private[this] var bufferedMatches: CompactBuffer[InternalRow] = _
+        private[this] var bufferedPosition: Int = -1
         private[this] var stop: Boolean = false
         private[this] var matchKey: InternalRow = _
+        // When the merge algorithm finds a join key with no match, one side has no
+        // corresponding row, so we mark which side it is: true means the streamed side
+        // has no match, false means the buffered side. Only set when needed.
+        private[this] var continueStreamed: Boolean = _
+        private[this] var streamNullGenerated: Boolean = false
+        // Tracks whether each element in bufferedMatches has a matched streamedElement.
+        private[this] var bitSet: BitSet = _
+        // Marks whether the found result has been fetched.
+        private[this] var found: Boolean = false
+        private[this] var bufferNullGenerated: Boolean = false
 
         // initialize iterator
         initialize()
 
-        override final def hasNext: Boolean = nextMatchingPair()
+        override final def hasNext: Boolean = {
+          val matching = nextMatchingBlock()
+          if (matching && !isBufferEmpty(bufferedMatches)) {
+            // The buffer stores all rows that match the key, but the condition may not hold.
+            // If none of the rows in the buffer match the condition, we fetch the next matching block.
+            findNextInBuffer() || hasNext
+          } else {
+            matching
+          }
+        }
+
+        /**
+         * Run down the current `bufferedMatches` to find rows that match the condition.
+         * If `joinType` is not `Inner`, we use `bufferNullGenerated` to mark whether
+         * we need to build a bufferedNullRow for the result.
+         * If `joinType` is `FullOuter`, we use `streamNullGenerated` to mark whether
+         * a buffered element needs to join with a streamedNullRow.
+         * The method can be called multiple times since `found` serves as a guardian.
+         */
+        def findNextInBuffer(): Boolean = {
+          while (!found && streamedElement != null
+            && keyOrdering.compare(streamedKey, matchKey) == 0) {
+            while (bufferedPosition < bufferedMatches.size && !boundCondition(
+              joinRow(streamedElement, bufferedMatches(bufferedPosition)))) {
+              bufferedPosition += 1
+            }
+            if (bufferedPosition == bufferedMatches.size) {
+              if (joinType == Inner || bufferNullGenerated) {
+                bufferNullGenerated = false
+                bufferedPosition = 0
+                fetchStreamed()
+              } else {
+                found = true
+              }
+            } else {
+              // mark as true so we don't generate a null row for the streamed row.
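Below is a minimal, self-contained sketch of the streamed/buffered merge idea the hunk above implements, with simplified row and key types standing in for the PR's InternalRow, CompactBuffer, and bound condition; all names in it are illustrative assumptions rather than the PR's code.

// Simplified sketch of a sort-merge left outer join over two key-sorted inputs:
// collect the block of buffered rows sharing the streamed key, emit one output row
// per match, and null-pad the buffered side when a streamed row has no match.
object SortMergeLeftOuterSketch {
  type Row = Seq[Any]

  def merge(streamed: Seq[(Int, Row)], buffered: Seq[(Int, Row)], bufferedWidth: Int): Seq[Row] = {
    val bufferedNullRow: Row = Seq.fill(bufferedWidth)(null)
    val out = scala.collection.mutable.ArrayBuffer.empty[Row]
    var j = 0
    for ((streamedKey, streamedRow) <- streamed) {
      // advance the buffered side past keys smaller than the streamed key
      while (j < buffered.length && buffered(j)._1 < streamedKey) j += 1
      // collect the block of buffered rows whose key equals the streamed key
      var k = j
      val matches = scala.collection.mutable.ArrayBuffer.empty[Row]
      while (k < buffered.length && buffered(k)._1 == streamedKey) {
        matches += buffered(k)._2
        k += 1
      }
      if (matches.isEmpty) {
        out += streamedRow ++ bufferedNullRow   // no match: pad the buffered side with nulls
      } else {
        matches.foreach(m => out += streamedRow ++ m)
      }
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    val left  = Seq(1 -> Seq[Any]("a"), 2 -> Seq[Any]("b"), 4 -> Seq[Any]("d"))
    val right = Seq(1 -> Seq[Any]("x"), 3 -> Seq[Any]("y"))
    merge(left, right, bufferedWidth = 1).foreach(println)
    // List(a, x), List(b, null), List(d, null)
  }
}

Each streamed row either joins with its block of equal-key buffered rows or comes out null-padded on the buffered side; the PR layers the join condition, the FullOuter bookkeeping, and null-row generation for the buffered side on top of this basic merge.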

[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-128239391
  
  [Test build #39983 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39983/console)
 for   PR 5717 at commit 
[`d02f6bb`](https://github.com/apache/spark/commit/d02f6bbab969a20e8a3cd9d6b065db39462d6ff5).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-128239606
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-128224368
  
  [Test build #39983 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39983/consoleFull)
 for   PR 5717 at commit 
[`d02f6bb`](https://github.com/apache/spark/commit/d02f6bbab969a20e8a3cd9d6b065db39462d6ff5).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-128224165
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-128224169
  
Merged build started.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127908732
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127908730
  
  [Test build #39844 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39844/console)
 for   PR 5717 at commit 
[`549796e`](https://github.com/apache/spark/commit/549796eba511316fd43d3cecdfb289665c0027bc).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127907896
  
  [Test build #227 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SlowSparkPullRequestBuilder/227/console)
 for   PR 5717 at commit 
[`549796e`](https://github.com/apache/spark/commit/549796eba511316fd43d3cecdfb289665c0027bc).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127907901
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127906716
  
  [Test build #39844 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39844/consoleFull)
 for   PR 5717 at commit 
[`549796e`](https://github.com/apache/spark/commit/549796eba511316fd43d3cecdfb289665c0027bc).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127906326
  
  [Test build #227 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SlowSparkPullRequestBuilder/227/consoleFull)
 for   PR 5717 at commit 
[`549796e`](https://github.com/apache/spark/commit/549796eba511316fd43d3cecdfb289665c0027bc).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127906214
  
Merged build started.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127906184
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127905222
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127905246
  
Merged build started.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread adrian-wang
Github user adrian-wang commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127905132
  
retest this please.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127900058
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127899918
  
Merged build started.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127899891
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r36238964
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -152,16 +149,14 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
     ctx.cacheManager.clearCache()
     ctx.sql("CACHE TABLE testData")
 
-    val SORTMERGEJOIN_ENABLED: Boolean = ctx.conf.sortMergeJoinEnabled
     Seq(
-      ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[ShuffledHashOuterJoin]),
+      ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeJoin]),
       ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
-        classOf[BroadcastHashOuterJoin]),
--- End diff --

It looks like this patch causes us to plan SortMergeJoin for outer joins that are capable of using BroadcastHashOuterJoin, which seems like it could lead to performance issues by triggering unnecessary shuffling of the large table.

As a result, I think that we should not change the broadcast-enabled half of the test, but, rather, should update the broadcast-disabled half to test both the sort-merge-join enabled and sort-merge-join-disabled configurations.
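A rough sketch of what that could look like, reusing the `ctx` fixture and queries visible in the diff; `operatorOf` is a hypothetical helper for illustration, and the `spark.sql.planner.sortMergeJoin` key is assumed here rather than taken from this PR:

// Check which physical join operator gets planned for the same outer-join query
// with sort-merge join toggled on and off (broadcast join disabled in this block).
def operatorOf[T](query: String, clazz: Class[T]): Boolean =
  ctx.sql(query).queryExecution.executedPlan.collect {
    case plan if clazz.isInstance(plan) => plan
  }.nonEmpty

val query = "SELECT * FROM testData LEFT JOIN testData2 ON key = a"
ctx.sql("SET spark.sql.planner.sortMergeJoin=true")
assert(operatorOf(query, classOf[SortMergeJoin]))
ctx.sql("SET spark.sql.planner.sortMergeJoin=false")
assert(operatorOf(query, classOf[ShuffledHashOuterJoin]))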





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127723655
  
Also, I think that it might be a little clearer to introduce a separate 
`SortMergeOuterJoin` operator rather than trying to combine the inner and outer 
joins into the same operator. This would be consistent with what we've done for 
other joins.
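An illustrative skeleton only (not code from this PR) of what a split-out operator could look like, mirroring how the broadcast and shuffled hash outer joins are separate operators; the parameter list and output-nullability handling below are assumptions:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}

case class SortMergeOuterJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode {

  // Outer joins make the non-preserved side nullable in the output schema.
  override def output: Seq[Attribute] = joinType match {
    case LeftOuter  => left.output ++ right.output.map(_.withNullability(true))
    case RightOuter => left.output.map(_.withNullability(true)) ++ right.output
    case FullOuter  => (left.output ++ right.output).map(_.withNullability(true))
    case x => throw new IllegalArgumentException(s"SortMergeOuterJoin should not take $x as the JoinType")
  }

  protected override def doExecute(): RDD[InternalRow] = {
    // The outer-specific streamed/buffered merge would live here, leaving the
    // existing SortMergeJoin operator to handle only the inner-join case.
    ???
  }
}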





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127713743
  
We should have a test to guard against reintroduction of the bug that @jeanlyn mentioned.

I find the code here to be really dense and hard to understand, so I'd like 
to try to pursue my design first. There's another 1.5 blocker / critical 
related to eliminating JoinedRow in favor of Tungsten's RowJoiner when 
UnsafeRows are used, and I think that the code re-use enabled by my design will 
make this significantly easier to accomplish.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread adrian-wang
Github user adrian-wang commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127560250
  
@JoshRosen I've fixed the bug that @jeanlyn mentioned. Can you merge this first and then do the following steps in #7904?





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127558998
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127558897
  
  [Test build #39689 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39689/console)
 for   PR 5717 at commit 
[`bff834a`](https://github.com/apache/spark/commit/bff834aa11d074f387c33ffe6bee3acf670be2ac).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127513516
  
  [Test build #39689 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39689/consoleFull)
 for   PR 5717 at commit 
[`bff834a`](https://github.com/apache/spark/commit/bff834aa11d074f387c33ffe6bee3acf670be2ac).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127513075
  
Merged build started.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127513050
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127508758
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127508742
  
  [Test build #39678 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39678/console)
 for   PR 5717 at commit 
[`d0e65c5`](https://github.com/apache/spark/commit/d0e65c59019e80440ad90532d074863697ec46d4).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127505435
  
  [Test build #39678 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39678/consoleFull)
 for   PR 5717 at commit 
[`d0e65c5`](https://github.com/apache/spark/commit/d0e65c59019e80440ad90532d074863697ec46d4).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127504700
  
Merged build started.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-08-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-127504682
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126581760
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126581734
  
  [Test build #39159 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39159/console)
 for   PR 5717 at commit 
[`f520079`](https://github.com/apache/spark/commit/f5200790e1f5ddf4d3cb2cb68dfdfff45473aaf4).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126569073
  
  [Test build #39159 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39159/consoleFull)
 for   PR 5717 at commit 
[`f520079`](https://github.com/apache/spark/commit/f5200790e1f5ddf4d3cb2cb68dfdfff45473aaf4).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126568863
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126568916
  
Merged build started.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126219174
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126219170
  
  [Test build #39030 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39030/console)
 for   PR 5717 at commit 
[`fd73084`](https://github.com/apache/spark/commit/fd73084f40a6eb74fab98d062c13c54e049ef055).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126218775
  
  [Test build #39030 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39030/consoleFull)
 for   PR 5717 at commit 
[`fd73084`](https://github.com/apache/spark/commit/fd73084f40a6eb74fab98d062c13c54e049ef055).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126218039
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126218132
  
Merged build started.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126213835
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126213811
  
  [Test build #39024 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39024/console)
 for   PR 5717 at commit 
[`4152e86`](https://github.com/apache/spark/commit/4152e864a19490ee6c5db7f9d3589fed7693efde).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126209436
  
  [Test build #39024 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39024/consoleFull)
 for   PR 5717 at commit 
[`4152e86`](https://github.com/apache/spark/commit/4152e864a19490ee6c5db7f9d3589fed7693efde).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126208727
  
Merged build started.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126208711
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126005227
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126005211
  
  [Test build #38839 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38839/console)
 for   PR 5717 at commit 
[`07f96b5`](https://github.com/apache/spark/commit/07f96b5e9a0d2a79498589976f287f38eacbd11e).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-126002674
  
  [Test build #38839 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38839/consoleFull)
 for   PR 5717 at commit 
[`07f96b5`](https://github.com/apache/spark/commit/07f96b5e9a0d2a79498589976f287f38eacbd11e).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-12555
  
Merged build started.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-125921405
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-28 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-125860123
  
Yup I will work on this myself tomorrow based on this PR.






[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-07-28 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-125859852
  
/bump, do you think that we'll have time to finish this for 1.5.0?

/cc @rxin





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-22 Thread adrian-wang
Github user adrian-wang commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r33010716
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala ---
@@ -82,86 +130,169 @@ case class SortMergeJoin(
 
         override final def next(): InternalRow = {
           if (hasNext) {
-            // we are using the buffered right rows and run down left iterator
-            val joinedRow = joinRow(leftElement, rightMatches(rightPosition))
-            rightPosition += 1
-            if (rightPosition >= rightMatches.size) {
-              rightPosition = 0
-              fetchLeft()
-              if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) {
-                stop = false
-                rightMatches = null
+            if (bufferedMatches == null || bufferedMatches.size == 0) {
+              // we just found a row with no join match and we are here to produce a row
+              // with this row and a standard null row from the other side.
+              if (continueStreamed) {
+                val joinedRow = smartJoinRow(streamedElement, bufferedNullRow)
+                fetchStreamed()
+                joinedRow
+              } else {
+                val joinedRow = smartJoinRow(streamedNullRow, bufferedElement)
+                fetchBuffered()
+                joinedRow
+              }
+            } else {
+              // we are using the buffered right rows and run down left iterator
+              val joinedRow = smartJoinRow(streamedElement, bufferedMatches(bufferedPosition))
+              bufferedPosition += 1
+              if (bufferedPosition >= bufferedMatches.size) {
+                bufferedPosition = 0
+                if (joinType != FullOuter || secondStreamedElement == null) {
+                  fetchStreamed()
--- End diff --

Good catch, I'll rewrite this part.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-21 Thread jeanlyn
Github user jeanlyn commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32891298
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -82,86 +130,169 @@ case class SortMergeJoin(
 
 override final def next(): InternalRow = {
   if (hasNext) {
-// we are using the buffered right rows and run down left 
iterator
-val joinedRow = joinRow(leftElement, 
rightMatches(rightPosition))
-rightPosition += 1
-if (rightPosition >= rightMatches.size) {
-  rightPosition = 0
-  fetchLeft()
-  if (leftElement == null || keyOrdering.compare(leftKey, 
matchKey) != 0) {
-stop = false
-rightMatches = null
+if (bufferedMatches == null || bufferedMatches.size == 0) {
+  // we just found a row with no join match and we are here to 
produce a row
+  // with this row and a standard null row from the other side.
+  if (continueStreamed) {
+val joinedRow = smartJoinRow(streamedElement, 
bufferedNullRow)
+fetchStreamed()
+joinedRow
+  } else {
+val joinedRow = smartJoinRow(streamedNullRow, 
bufferedElement)
+fetchBuffered()
+joinedRow
+  }
+} else {
+  // we are using the buffered right rows and run down left 
iterator
+  val joinedRow = smartJoinRow(streamedElement, 
bufferedMatches(bufferedPosition))
+  bufferedPosition += 1
+  if (bufferedPosition >= bufferedMatches.size) {
+bufferedPosition = 0
+if (joinType != FullOuter || secondStreamedElement == 
null) {
+  fetchStreamed()
+  if (streamedElement == null || 
keyOrdering.compare(streamedKey, matchKey) != 0) {
+stop = false
+bufferedMatches = null
+  }
+} else {
+  // in FullOuter join and the first time we finish the 
match buffer,
+  // we still want to generate all rows with streamed null 
row and buffered
+  // rows that match the join key but not the conditions.
+  streamedElement = secondStreamedElement
+  bufferedMatches = secondBufferedMatches
+  secondStreamedElement = null
+  secondBufferedMatches = null
+}
   }
+  joinedRow
 }
-joinedRow
   } else {
 // no more result
 throw new NoSuchElementException
   }
 }
 
-private def fetchLeft() = {
-  if (leftIter.hasNext) {
-leftElement = leftIter.next()
-leftKey = leftKeyGenerator(leftElement)
+private def smartJoinRow(streamedRow: InternalRow, bufferedRow: 
InternalRow): InternalRow =
+  joinType match {
+case RightOuter => joinRow(bufferedRow, streamedRow)
+case _ => joinRow(streamedRow, bufferedRow)
+  }
+
+private def fetchStreamed(): Unit = {
+  if (streamedIter.hasNext) {
+streamedElement = streamedIter.next()
+streamedKey = streamedKeyGenerator(streamedElement)
   } else {
-leftElement = null
+streamedElement = null
   }
 }
 
-private def fetchRight() = {
-  if (rightIter.hasNext) {
-rightElement = rightIter.next()
-rightKey = rightKeyGenerator(rightElement)
+private def fetchBuffered(): Unit = {
+  if (bufferedIter.hasNext) {
+bufferedElement = bufferedIter.next()
+bufferedKey = bufferedKeyGenerator(bufferedElement)
   } else {
-rightElement = null
+bufferedElement = null
   }
 }
 
 private def initialize() = {
-  fetchLeft()
-  fetchRight()
+  fetchStreamed()
+  fetchBuffered()
 }
 
 /**
  * Searches the right iterator for the next rows that have matches 
in left side, and store
  * them in a buffer.
+ * When this is not a Inner join, we will also return true when we 
get a row with no match
+ * on the other side. This search will jump out every time from 
the same position until
+ * `next()` is called.
+   

[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-21 Thread jeanlyn
Github user jeanlyn commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32891269
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -82,86 +130,169 @@ case class SortMergeJoin(
 
 override final def next(): InternalRow = {
   if (hasNext) {
-// we are using the buffered right rows and run down left 
iterator
-val joinedRow = joinRow(leftElement, 
rightMatches(rightPosition))
-rightPosition += 1
-if (rightPosition >= rightMatches.size) {
-  rightPosition = 0
-  fetchLeft()
-  if (leftElement == null || keyOrdering.compare(leftKey, 
matchKey) != 0) {
-stop = false
-rightMatches = null
+if (bufferedMatches == null || bufferedMatches.size == 0) {
+  // we just found a row with no join match and we are here to 
produce a row
+  // with this row and a standard null row from the other side.
+  if (continueStreamed) {
+val joinedRow = smartJoinRow(streamedElement, 
bufferedNullRow)
+fetchStreamed()
+joinedRow
+  } else {
+val joinedRow = smartJoinRow(streamedNullRow, 
bufferedElement)
+fetchBuffered()
+joinedRow
+  }
+} else {
+  // we are using the buffered right rows and run down left 
iterator
+  val joinedRow = smartJoinRow(streamedElement, 
bufferedMatches(bufferedPosition))
+  bufferedPosition += 1
+  if (bufferedPosition >= bufferedMatches.size) {
+bufferedPosition = 0
+if (joinType != FullOuter || secondStreamedElement == 
null) {
+  fetchStreamed()
--- End diff --

I think we should use `boundCondition` to update `bufferedMatches` after we `fetchStreamed()`; otherwise we may get a wrong answer. For example:
```sql
-- table a(key int, value int); table b(key int, value int)
-- data of a: (1, 3), (1, 1), (2, 1), (2, 3)
-- data of b: (1, 1), (2, 1)
SELECT a.key, b.key, a.value - b.value
FROM a LEFT OUTER JOIN b
  ON a.key = b.key AND a.value - b.value > 1
```
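
For reference, a minimal sketch of how this counter-example could be reproduced in a 1.x `spark-shell` (the `sqlContext`, the implicits import, and the column names are assumptions for illustration, not part of the patch):

```scala
import sqlContext.implicits._

val a = Seq((1, 3), (1, 1), (2, 1), (2, 3)).toDF("key", "value")
val b = Seq((1, 1), (2, 1)).toDF("key", "value")
a.registerTempTable("a")
b.registerTempTable("b")

// Row (1, 1) of `a` matches key 1 in `b` but fails the extra condition
// (1 - 1 > 1 is false), so a LEFT OUTER JOIN must still emit it, padded with
// NULLs from `b`. Reusing a match buffer that was filtered against the previous
// streamed row (1, 3) would give a wrong result for this row.
sqlContext.sql(
  """SELECT a.key, b.key, a.value - b.value
    |FROM a LEFT OUTER JOIN b
    |  ON a.key = b.key AND a.value - b.value > 1""".stripMargin).show()
```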





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-21 Thread jeanlyn
Github user jeanlyn commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32891271
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -90,13 +90,12 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
left.statistics.sizeInBytes <= 
sqlContext.conf.autoBroadcastJoinThreshold =>
   makeBroadcastHashJoin(leftKeys, rightKeys, left, right, 
condition, joins.BuildLeft)
 
-  // If the sort merge join option is set, we want to use sort merge 
join prior to hashjoin
-  // for now let's support inner join first, then add outer join
-  case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, 
left, right)
+  // If the sort merge join option is set, we want to use sort merge 
join prior to hashjoin.
+  // And for outer join, we can not put conditions outside of the join
+  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
 if sqlContext.conf.sortMergeJoinEnabled =>
-val mergeJoin =
-  joins.SortMergeJoin(leftKeys, rightKeys, planLater(left), 
planLater(right))
-condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil
+joins.SortMergeJoin(
+  leftKeys, rightKeys, joinType, planLater(left), 
planLater(right), condition) :: Nil
 
--- End diff --

Shall we move this code to a new `Strategy` (e.g. a dedicated `SortMergeJoin` strategy) instead of mixing it into `HashJoin`?
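
As a rough illustration, such a strategy could look something like the sketch below inside `SparkStrategies` (the object name is made up, and `planLater`, `sqlContext`, and the referenced types are assumed from the surrounding file; this is not the actual patch):

```scala
object SortMergeJoinStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Only fires when the sort merge join flag is enabled; otherwise it returns
    // Nil so the existing hash-join strategies can still plan the join.
    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if sqlContext.conf.sortMergeJoinEnabled =>
      joins.SortMergeJoin(
        leftKeys, rightKeys, joinType, planLater(left), planLater(right), condition) :: Nil
    case _ => Nil
  }
}
```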





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-19 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113717701
  
Thanks for updating this; I'll try to take another review pass tomorrow.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113690424
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113690410
  
  [Test build #35337 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35337/console)
 for   PR 5717 at commit 
[`211e101`](https://github.com/apache/spark/commit/211e1012dc28ed610d294d0678b1d5621a901e53).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class SerializableConfiguration(@transient var value: Configuration) 
extends Serializable `
  * `class SerializableJobConf(@transient var value: JobConf) extends 
Serializable `






[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113670244
  
  [Test build #35337 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35337/consoleFull)
 for   PR 5717 at commit 
[`211e101`](https://github.com/apache/spark/commit/211e1012dc28ed610d294d0678b1d5621a901e53).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113670190
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113670198
  
Merged build started.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-19 Thread adrian-wang
Github user adrian-wang commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113669761
  
retest this please.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113408338
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113408241
  
  [Test build #35233 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35233/console)
 for   PR 5717 at commit 
[`211e101`](https://github.com/apache/spark/commit/211e1012dc28ed610d294d0678b1d5621a901e53).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class SerializableConfiguration(@transient var value: Configuration) 
extends Serializable `
  * `class SerializableJobConf(@transient var value: JobConf) extends 
Serializable `
  * `class ElementwiseProduct(VectorTransformer):`
  * `case class CreateStruct(children: Seq[Expression]) extends Expression `
  * `case class Sqrt(child: Expression) extends 
UnaryMathExpression(math.sqrt, "SQRT")`
  * `case class Logarithm(left: Expression, right: Expression)`
  * `case class SetCommand(kv: Option[(String, Option[String])]) extends 
RunnableCommand with Logging `






[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113390868
  
  [Test build #35233 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35233/consoleFull)
 for   PR 5717 at commit 
[`211e101`](https://github.com/apache/spark/commit/211e1012dc28ed610d294d0678b1d5621a901e53).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113390452
  
Merged build started.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113390374
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-18 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32803889
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer
 case class SortMergeJoin(
 leftKeys: Seq[Expression],
 rightKeys: Seq[Expression],
+joinType: JoinType,
 left: SparkPlan,
-right: SparkPlan) extends BinaryNode {
+right: SparkPlan,
+condition: Option[Expression] = None) extends BinaryNode {
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match {
+case RightOuter => (right, left, rightKeys, leftKeys)
+case _ => (left, right, leftKeys, rightKeys)
+  }
+
+  override def output: Seq[Attribute] = joinType match {
+case Inner =>
+  left.output ++ right.output
+case LeftOuter =>
+  left.output ++ right.output.map(_.withNullability(true))
+case RightOuter =>
+  left.output.map(_.withNullability(true)) ++ right.output
+case FullOuter =>
+  left.output.map(_.withNullability(true)) ++ 
right.output.map(_.withNullability(true))
+case x =>
+  throw new IllegalStateException(s"SortMergeJoin should not take $x 
as the JoinType")
+  }
 
-  override def outputPartitioning: Partitioning = left.outputPartitioning
+  override def outputPartitioning: Partitioning = joinType match {
+case FullOuter =>
+  // when doing Full Outer join, NULL rows from both sides are not so 
partitioned.
+  UnknownPartitioning(streamed.outputPartitioning.numPartitions)
+case _ => streamed.outputPartitioning
+  }
 
   override def requiredChildDistribution: Seq[Distribution] =
 ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: 
Nil
 
   // this is to manually construct an ordering that can be used to compare 
keys from both sides
-  private val keyOrdering: RowOrdering = 
RowOrdering.forSchema(leftKeys.map(_.dataType))
+  private val keyOrdering: RowOrdering = 
RowOrdering.forSchema(streamedKeys.map(_.dataType))
 
-  override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
+  override def outputOrdering: Seq[SortOrder] = joinType match {
+case FullOuter => Nil // when doing Full Outer join, NULL rows from 
both sides are not ordered.
+case _ => requiredOrders(streamedKeys)
+  }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
 requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
 
-  @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, 
left.output)
-  @transient protected lazy val rightKeyGenerator = 
newProjection(rightKeys, right.output)
+  @transient protected lazy val streamedKeyGenerator = 
newProjection(streamedKeys, streamed.output)
+  @transient protected lazy val bufferedKeyGenerator = 
newProjection(bufferedKeys, buffered.output)
+
+  // standard null rows
+  @transient private[this] lazy val streamedNullRow = new 
GenericRow(streamed.output.length)
+  @transient private[this] lazy val bufferedNullRow = new 
GenericRow(buffered.output.length)
+
+  // checks if the joinedRow can meet condition requirements
+  @transient private[this] lazy val boundCondition =
+condition.map(newPredicate(_, streamed.output ++ 
buffered.output)).getOrElse((row: Row) => true)
 
   private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
 keys.map(SortOrder(_, Ascending))
 
   protected override def doExecute(): RDD[Row] = {
-val leftResults = left.execute().map(_.copy())
-val rightResults = right.execute().map(_.copy())
+val streamResults = streamed.execute().map(_.copy())
--- End diff --

That's true for left/right outer and even inner joins. However, in a full outer join we probably need to cache the streamed row once. But you're right: we can do the copy whenever necessary during iteration, not here.
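
A tiny self-contained sketch (plain Scala, not Spark code) of why a row must be copied at the point where it is cached when the upstream iterator reuses a single mutable object:

```scala
import scala.collection.mutable.ArrayBuffer

final class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
}

// Mimics an operator that keeps returning the same mutable row instance.
def source(reused: MutableRow): Iterator[MutableRow] =
  Iterator(1, 2, 3).map { v => reused.value = v; reused }

val aliased = ArrayBuffer.empty[MutableRow]
source(new MutableRow(0)).foreach(aliased += _)            // buffers references only
println(aliased.map(_.value))                              // ArrayBuffer(3, 3, 3) -- wrong

val copied = ArrayBuffer.empty[MutableRow]
source(new MutableRow(0)).foreach(r => copied += r.copy()) // copies while buffering
println(copied.map(_.value))                               // ArrayBuffer(1, 2, 3) -- correct
```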





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-18 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32802991
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer
 case class SortMergeJoin(
 leftKeys: Seq[Expression],
 rightKeys: Seq[Expression],
+joinType: JoinType,
 left: SparkPlan,
-right: SparkPlan) extends BinaryNode {
+right: SparkPlan,
+condition: Option[Expression] = None) extends BinaryNode {
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match {
+case RightOuter => (right, left, rightKeys, leftKeys)
+case _ => (left, right, leftKeys, rightKeys)
+  }
+
+  override def output: Seq[Attribute] = joinType match {
+case Inner =>
+  left.output ++ right.output
+case LeftOuter =>
+  left.output ++ right.output.map(_.withNullability(true))
+case RightOuter =>
+  left.output.map(_.withNullability(true)) ++ right.output
+case FullOuter =>
+  left.output.map(_.withNullability(true)) ++ 
right.output.map(_.withNullability(true))
+case x =>
+  throw new IllegalStateException(s"SortMergeJoin should not take $x 
as the JoinType")
+  }
 
-  override def outputPartitioning: Partitioning = left.outputPartitioning
+  override def outputPartitioning: Partitioning = joinType match {
+case FullOuter =>
+  // when doing Full Outer join, NULL rows from both sides are not so 
partitioned.
+  UnknownPartitioning(streamed.outputPartitioning.numPartitions)
+case _ => streamed.outputPartitioning
+  }
 
   override def requiredChildDistribution: Seq[Distribution] =
 ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: 
Nil
 
   // this is to manually construct an ordering that can be used to compare 
keys from both sides
-  private val keyOrdering: RowOrdering = 
RowOrdering.forSchema(leftKeys.map(_.dataType))
+  private val keyOrdering: RowOrdering = 
RowOrdering.forSchema(streamedKeys.map(_.dataType))
 
-  override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
+  override def outputOrdering: Seq[SortOrder] = joinType match {
+case FullOuter => Nil // when doing Full Outer join, NULL rows from 
both sides are not ordered.
+case _ => requiredOrders(streamedKeys)
+  }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
 requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
 
-  @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, 
left.output)
-  @transient protected lazy val rightKeyGenerator = 
newProjection(rightKeys, right.output)
+  @transient protected lazy val streamedKeyGenerator = 
newProjection(streamedKeys, streamed.output)
+  @transient protected lazy val bufferedKeyGenerator = 
newProjection(bufferedKeys, buffered.output)
+
+  // standard null rows
+  @transient private[this] lazy val streamedNullRow = new 
GenericRow(streamed.output.length)
+  @transient private[this] lazy val bufferedNullRow = new 
GenericRow(buffered.output.length)
+
+  // checks if the joinedRow can meet condition requirements
+  @transient private[this] lazy val boundCondition =
+condition.map(newPredicate(_, streamed.output ++ 
buffered.output)).getOrElse((row: Row) => true)
 
   private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
 keys.map(SortOrder(_, Ascending))
 
   protected override def doExecute(): RDD[Row] = {
-val leftResults = left.execute().map(_.copy())
-val rightResults = right.execute().map(_.copy())
+val streamResults = streamed.execute().map(_.copy())
--- End diff --

We certainly need to copy the inputs that are passed to external sort, but 
the `ExternalSort` operator itself should take care of that.  Here, I think 
we're consuming the result of a sort operator and are not buffering rows from 
`streamResults` (unless I've overlooked other buffering inside of 
`zipPartitions` somehow).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-18 Thread adrian-wang
Github user adrian-wang commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32802825
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer
 case class SortMergeJoin(
 leftKeys: Seq[Expression],
 rightKeys: Seq[Expression],
+joinType: JoinType,
 left: SparkPlan,
-right: SparkPlan) extends BinaryNode {
+right: SparkPlan,
+condition: Option[Expression] = None) extends BinaryNode {
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match {
+case RightOuter => (right, left, rightKeys, leftKeys)
+case _ => (left, right, leftKeys, rightKeys)
+  }
+
+  override def output: Seq[Attribute] = joinType match {
+case Inner =>
+  left.output ++ right.output
+case LeftOuter =>
+  left.output ++ right.output.map(_.withNullability(true))
+case RightOuter =>
+  left.output.map(_.withNullability(true)) ++ right.output
+case FullOuter =>
+  left.output.map(_.withNullability(true)) ++ 
right.output.map(_.withNullability(true))
+case x =>
+  throw new IllegalStateException(s"SortMergeJoin should not take $x 
as the JoinType")
+  }
 
-  override def outputPartitioning: Partitioning = left.outputPartitioning
+  override def outputPartitioning: Partitioning = joinType match {
+case FullOuter =>
+  // when doing Full Outer join, NULL rows from both sides are not so 
partitioned.
+  UnknownPartitioning(streamed.outputPartitioning.numPartitions)
+case _ => streamed.outputPartitioning
+  }
 
   override def requiredChildDistribution: Seq[Distribution] =
 ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: 
Nil
 
   // this is to manually construct an ordering that can be used to compare 
keys from both sides
-  private val keyOrdering: RowOrdering = 
RowOrdering.forSchema(leftKeys.map(_.dataType))
+  private val keyOrdering: RowOrdering = 
RowOrdering.forSchema(streamedKeys.map(_.dataType))
 
-  override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
+  override def outputOrdering: Seq[SortOrder] = joinType match {
+case FullOuter => Nil // when doing Full Outer join, NULL rows from 
both sides are not ordered.
+case _ => requiredOrders(streamedKeys)
+  }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
 requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
 
-  @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, 
left.output)
-  @transient protected lazy val rightKeyGenerator = 
newProjection(rightKeys, right.output)
+  @transient protected lazy val streamedKeyGenerator = 
newProjection(streamedKeys, streamed.output)
+  @transient protected lazy val bufferedKeyGenerator = 
newProjection(bufferedKeys, buffered.output)
+
+  // standard null rows
+  @transient private[this] lazy val streamedNullRow = new 
GenericRow(streamed.output.length)
+  @transient private[this] lazy val bufferedNullRow = new 
GenericRow(buffered.output.length)
+
+  // checks if the joinedRow can meet condition requirements
+  @transient private[this] lazy val boundCondition =
+condition.map(newPredicate(_, streamed.output ++ 
buffered.output)).getOrElse((row: Row) => true)
 
   private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
 keys.map(SortOrder(_, Ascending))
 
   protected override def doExecute(): RDD[Row] = {
-val leftResults = left.execute().map(_.copy())
-val rightResults = right.execute().map(_.copy())
+val streamResults = streamed.execute().map(_.copy())
--- End diff --

I think we need to copy this; it has something to do with the external sort.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-18 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32802697
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer
 case class SortMergeJoin(
 leftKeys: Seq[Expression],
 rightKeys: Seq[Expression],
+joinType: JoinType,
 left: SparkPlan,
-right: SparkPlan) extends BinaryNode {
+right: SparkPlan,
+condition: Option[Expression] = None) extends BinaryNode {
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match {
+case RightOuter => (right, left, rightKeys, leftKeys)
+case _ => (left, right, leftKeys, rightKeys)
+  }
+
+  override def output: Seq[Attribute] = joinType match {
+case Inner =>
+  left.output ++ right.output
+case LeftOuter =>
+  left.output ++ right.output.map(_.withNullability(true))
+case RightOuter =>
+  left.output.map(_.withNullability(true)) ++ right.output
+case FullOuter =>
+  left.output.map(_.withNullability(true)) ++ 
right.output.map(_.withNullability(true))
+case x =>
+  throw new IllegalStateException(s"SortMergeJoin should not take $x 
as the JoinType")
+  }
 
-  override def outputPartitioning: Partitioning = left.outputPartitioning
+  override def outputPartitioning: Partitioning = joinType match {
+case FullOuter =>
+  // when doing Full Outer join, NULL rows from both sides are not so 
partitioned.
+  UnknownPartitioning(streamed.outputPartitioning.numPartitions)
+case _ => streamed.outputPartitioning
--- End diff --

I think @JoshRosen is right, but currently all of the existing equi-joins have the same bug, which is described at https://issues.apache.org/jira/browse/SPARK-2205. So for now, we'd better keep it the same as it was (just take the streamed side's output partitioning).





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-18 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-113348731
  
@adrian-wang, I'm planning to take another pass on this pretty soon.  At a 
high level, this patch is in very good shape since most of its code is modeled 
after other existing join implementations in Spark SQL.  If you update this in 
the next couple of days, I'll try my best to be responsive with my reviews so 
we can get this in soon and not have too many merge conflicts.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-111588668
  
By the way, to provide a bit of context for why I'm reviewing this PR: I'm 
working on some optimizations to sorting in Spark SQL which should benefit 
sort-merge-join, so I've looked over all of this code pretty recently.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32338245
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -84,84 +129,164 @@ case class SortMergeJoin(
 
 override final def next(): Row = {
   if (hasNext) {
-// we are using the buffered right rows and run down left 
iterator
-val joinedRow = joinRow(leftElement, 
rightMatches(rightPosition))
-rightPosition += 1
-if (rightPosition >= rightMatches.size) {
-  rightPosition = 0
-  fetchLeft()
-  if (leftElement == null || keyOrdering.compare(leftKey, 
matchKey) != 0) {
-stop = false
-rightMatches = null
+if (bufferedMatches == null || bufferedMatches.size == 0) {
+  // we just found a row with no join match and we are here to 
produce a row
+  // with this row with a standard null row from the other 
side.
+  if (continueStreamed) {
+val joinedRow = smartJoinRow(streamedElement, 
bufferedNullRow.copy())
+fetchStreamed()
+joinedRow
+  } else {
+val joinedRow = smartJoinRow(streamedNullRow.copy(), 
bufferedElement)
+fetchBuffered()
+joinedRow
+  }
+} else {
+  // we are using the buffered right rows and run down left 
iterator
+  val joinedRow = smartJoinRow(streamedElement, 
bufferedMatches(bufferedPosition))
+  bufferedPosition += 1
+  if (bufferedPosition >= bufferedMatches.size) {
+bufferedPosition = 0
+if (joinType != FullOuter || secondStreamedElement == 
null) {
+  fetchStreamed()
+  if (streamedElement == null || 
keyOrdering.compare(streamedKey, matchKey) != 0) {
+stop = false
+bufferedMatches = null
+  }
+} else {
+  // in FullOuter join and the first time we finish the 
match buffer,
+  // we still want to generate all rows with streamed null 
row and buffered
+  // rows that match the join key but not the conditions.
+  streamedElement = secondStreamedElement
+  bufferedMatches = secondBufferedMatches
+  secondStreamedElement = null
+  secondBufferedMatches = null
+}
   }
+  joinedRow
 }
-joinedRow
   } else {
 // no more result
 throw new NoSuchElementException
   }
 }
 
-private def fetchLeft() = {
-  if (leftIter.hasNext) {
-leftElement = leftIter.next()
-leftKey = leftKeyGenerator(leftElement)
+private def smartJoinRow(streamedRow: Row, bufferedRow: Row): Row 
= joinType match {
+  case RightOuter => joinRow(bufferedRow, streamedRow)
+  case _ => joinRow(streamedRow, bufferedRow)
+}
+
+private def fetchStreamed() = {
+  if (streamedIter.hasNext) {
+streamedElement = streamedIter.next()
+streamedKey = streamedKeyGenerator(streamedElement)
   } else {
-leftElement = null
+streamedElement = null
   }
 }
 
-private def fetchRight() = {
-  if (rightIter.hasNext) {
-rightElement = rightIter.next()
-rightKey = rightKeyGenerator(rightElement)
+private def fetchBuffered() = {
+  if (bufferedIter.hasNext) {
+bufferedElement = bufferedIter.next()
+bufferedKey = bufferedKeyGenerator(bufferedElement)
   } else {
-rightElement = null
+bufferedElement = null
   }
 }
 
 private def initialize() = {
-  fetchLeft()
-  fetchRight()
+  fetchStreamed()
+  fetchBuffered()
 }
 
 /**
  * Searches the right iterator for the next rows that have matches 
in left side, and store
  * them in a buffer.
+ * When this is not a Inner join, we will also return true when we 
get a row with no match
+ * on the other side. This search will jump out every time from 
the same position until
+ * `next()` is called.
  *
  * @return true if the search 

[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread adrian-wang
Github user adrian-wang commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-111570564
  
@JoshRosen Thanks for your comments, I will refine the code accordingly.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread adrian-wang
Github user adrian-wang commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32337036
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer
 case class SortMergeJoin(
 leftKeys: Seq[Expression],
 rightKeys: Seq[Expression],
+joinType: JoinType,
 left: SparkPlan,
-right: SparkPlan) extends BinaryNode {
+right: SparkPlan,
+condition: Option[Expression] = None) extends BinaryNode {
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match {
+case RightOuter => (right, left, rightKeys, leftKeys)
+case _ => (left, right, leftKeys, rightKeys)
+  }
+
+  override def output: Seq[Attribute] = joinType match {
+case Inner =>
+  left.output ++ right.output
+case LeftOuter =>
+  left.output ++ right.output.map(_.withNullability(true))
+case RightOuter =>
+  left.output.map(_.withNullability(true)) ++ right.output
+case FullOuter =>
+  left.output.map(_.withNullability(true)) ++ 
right.output.map(_.withNullability(true))
+case x =>
+  throw new IllegalStateException(s"SortMergeJoin should not take $x 
as the JoinType")
+  }
 
-  override def outputPartitioning: Partitioning = left.outputPartitioning
+  override def outputPartitioning: Partitioning = joinType match {
+case FullOuter =>
+  // when doing Full Outer join, NULL rows from both sides are not so 
partitioned.
+  UnknownPartitioning(streamed.outputPartitioning.numPartitions)
+case _ => streamed.outputPartitioning
--- End diff --

I doubt it...





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-111569760
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-111569749
  
  [Test build #34777 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/34777/console)
 for   PR 5717 at commit 
[`add49a2`](https://github.com/apache/spark/commit/add49a241ef96517b5cdcb02fde72d27185722e9).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32330738
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -84,84 +129,164 @@ case class SortMergeJoin(
 
 override final def next(): Row = {
   if (hasNext) {
-// we are using the buffered right rows and run down left 
iterator
-val joinedRow = joinRow(leftElement, 
rightMatches(rightPosition))
-rightPosition += 1
-if (rightPosition >= rightMatches.size) {
-  rightPosition = 0
-  fetchLeft()
-  if (leftElement == null || keyOrdering.compare(leftKey, 
matchKey) != 0) {
-stop = false
-rightMatches = null
+if (bufferedMatches == null || bufferedMatches.size == 0) {
+  // we just found a row with no join match and we are here to 
produce a row
+  // with this row with a standard null row from the other 
side.
+  if (continueStreamed) {
+val joinedRow = smartJoinRow(streamedElement, 
bufferedNullRow.copy())
+fetchStreamed()
+joinedRow
+  } else {
+val joinedRow = smartJoinRow(streamedNullRow.copy(), 
bufferedElement)
+fetchBuffered()
+joinedRow
+  }
+} else {
+  // we are using the buffered right rows and run down left 
iterator
+  val joinedRow = smartJoinRow(streamedElement, 
bufferedMatches(bufferedPosition))
+  bufferedPosition += 1
+  if (bufferedPosition >= bufferedMatches.size) {
+bufferedPosition = 0
+if (joinType != FullOuter || secondStreamedElement == 
null) {
+  fetchStreamed()
+  if (streamedElement == null || 
keyOrdering.compare(streamedKey, matchKey) != 0) {
+stop = false
+bufferedMatches = null
+  }
+} else {
+  // in FullOuter join and the first time we finish the 
match buffer,
+  // we still want to generate all rows with streamed null 
row and buffered
+  // rows that match the join key but not the conditions.
+  streamedElement = secondStreamedElement
+  bufferedMatches = secondBufferedMatches
+  secondStreamedElement = null
+  secondBufferedMatches = null
+}
   }
+  joinedRow
 }
-joinedRow
   } else {
 // no more result
 throw new NoSuchElementException
   }
 }
 
-private def fetchLeft() = {
-  if (leftIter.hasNext) {
-leftElement = leftIter.next()
-leftKey = leftKeyGenerator(leftElement)
+private def smartJoinRow(streamedRow: Row, bufferedRow: Row): Row 
= joinType match {
+  case RightOuter => joinRow(bufferedRow, streamedRow)
+  case _ => joinRow(streamedRow, bufferedRow)
+}
+
+private def fetchStreamed() = {
+  if (streamedIter.hasNext) {
+streamedElement = streamedIter.next()
+streamedKey = streamedKeyGenerator(streamedElement)
   } else {
-leftElement = null
+streamedElement = null
   }
 }
 
-private def fetchRight() = {
-  if (rightIter.hasNext) {
-rightElement = rightIter.next()
-rightKey = rightKeyGenerator(rightElement)
+private def fetchBuffered() = {
+  if (bufferedIter.hasNext) {
+bufferedElement = bufferedIter.next()
+bufferedKey = bufferedKeyGenerator(bufferedElement)
   } else {
-rightElement = null
+bufferedElement = null
   }
 }
 
 private def initialize() = {
-  fetchLeft()
-  fetchRight()
+  fetchStreamed()
+  fetchBuffered()
 }
 
 /**
  * Searches the right iterator for the next rows that have matches 
in left side, and store
  * them in a buffer.
+ * When this is not a Inner join, we will also return true when we 
get a row with no match
+ * on the other side. This search will jump out every time from 
the same position until
+ * `next()` is called.
  *
  * @return true if the search 

[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32330289
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -84,84 +129,164 @@ case class SortMergeJoin(
 
 override final def next(): Row = {
   if (hasNext) {
-// we are using the buffered right rows and run down left 
iterator
-val joinedRow = joinRow(leftElement, 
rightMatches(rightPosition))
-rightPosition += 1
-if (rightPosition >= rightMatches.size) {
-  rightPosition = 0
-  fetchLeft()
-  if (leftElement == null || keyOrdering.compare(leftKey, 
matchKey) != 0) {
-stop = false
-rightMatches = null
+if (bufferedMatches == null || bufferedMatches.size == 0) {
+  // we just found a row with no join match and we are here to 
produce a row
+  // with this row with a standard null row from the other 
side.
+  if (continueStreamed) {
+val joinedRow = smartJoinRow(streamedElement, 
bufferedNullRow.copy())
+fetchStreamed()
+joinedRow
+  } else {
+val joinedRow = smartJoinRow(streamedNullRow.copy(), 
bufferedElement)
+fetchBuffered()
+joinedRow
+  }
+} else {
+  // we are using the buffered right rows and run down left 
iterator
+  val joinedRow = smartJoinRow(streamedElement, 
bufferedMatches(bufferedPosition))
+  bufferedPosition += 1
+  if (bufferedPosition >= bufferedMatches.size) {
+bufferedPosition = 0
+if (joinType != FullOuter || secondStreamedElement == 
null) {
+  fetchStreamed()
+  if (streamedElement == null || 
keyOrdering.compare(streamedKey, matchKey) != 0) {
+stop = false
+bufferedMatches = null
+  }
+} else {
+  // in FullOuter join and the first time we finish the 
match buffer,
+  // we still want to generate all rows with streamed null 
row and buffered
+  // rows that match the join key but not the conditions.
+  streamedElement = secondStreamedElement
+  bufferedMatches = secondBufferedMatches
+  secondStreamedElement = null
+  secondBufferedMatches = null
+}
   }
+  joinedRow
 }
-joinedRow
   } else {
 // no more result
 throw new NoSuchElementException
   }
 }
 
-private def fetchLeft() = {
-  if (leftIter.hasNext) {
-leftElement = leftIter.next()
-leftKey = leftKeyGenerator(leftElement)
+private def smartJoinRow(streamedRow: Row, bufferedRow: Row): Row 
= joinType match {
+  case RightOuter => joinRow(bufferedRow, streamedRow)
+  case _ => joinRow(streamedRow, bufferedRow)
+}
+
+private def fetchStreamed() = {
+  if (streamedIter.hasNext) {
+streamedElement = streamedIter.next()
+streamedKey = streamedKeyGenerator(streamedElement)
   } else {
-leftElement = null
+streamedElement = null
   }
 }
 
-private def fetchRight() = {
-  if (rightIter.hasNext) {
-rightElement = rightIter.next()
-rightKey = rightKeyGenerator(rightElement)
+private def fetchBuffered() = {
+  if (bufferedIter.hasNext) {
+bufferedElement = bufferedIter.next()
+bufferedKey = bufferedKeyGenerator(bufferedElement)
   } else {
-rightElement = null
+bufferedElement = null
   }
 }
 
 private def initialize() = {
-  fetchLeft()
-  fetchRight()
+  fetchStreamed()
+  fetchBuffered()
 }
 
 /**
  * Searches the right iterator for the next rows that have matches 
in left side, and store
  * them in a buffer.
+ * When this is not a Inner join, we will also return true when we 
get a row with no match
+ * on the other side. This search will jump out every time from 
the same position until
+ * `next()` is called.
  *
  * @return true if the search 

[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32330049
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -84,84 +129,164 @@ case class SortMergeJoin(
 
 override final def next(): Row = {
   if (hasNext) {
-// we are using the buffered right rows and run down left 
iterator
-val joinedRow = joinRow(leftElement, 
rightMatches(rightPosition))
-rightPosition += 1
-if (rightPosition >= rightMatches.size) {
-  rightPosition = 0
-  fetchLeft()
-  if (leftElement == null || keyOrdering.compare(leftKey, 
matchKey) != 0) {
-stop = false
-rightMatches = null
+if (bufferedMatches == null || bufferedMatches.size == 0) {
+  // we just found a row with no join match and we are here to 
produce a row
+  // with this row with a standard null row from the other 
side.
+  if (continueStreamed) {
+val joinedRow = smartJoinRow(streamedElement, 
bufferedNullRow.copy())
+fetchStreamed()
+joinedRow
+  } else {
+val joinedRow = smartJoinRow(streamedNullRow.copy(), 
bufferedElement)
+fetchBuffered()
+joinedRow
+  }
+} else {
+  // we are using the buffered right rows and run down left 
iterator
+  val joinedRow = smartJoinRow(streamedElement, 
bufferedMatches(bufferedPosition))
+  bufferedPosition += 1
+  if (bufferedPosition >= bufferedMatches.size) {
+bufferedPosition = 0
+if (joinType != FullOuter || secondStreamedElement == 
null) {
+  fetchStreamed()
+  if (streamedElement == null || 
keyOrdering.compare(streamedKey, matchKey) != 0) {
+stop = false
+bufferedMatches = null
+  }
+} else {
+  // in FullOuter join and the first time we finish the 
match buffer,
+  // we still want to generate all rows with streamed null 
row and buffered
+  // rows that match the join key but not the conditions.
+  streamedElement = secondStreamedElement
+  bufferedMatches = secondBufferedMatches
+  secondStreamedElement = null
+  secondBufferedMatches = null
+}
   }
+  joinedRow
 }
-joinedRow
   } else {
 // no more result
 throw new NoSuchElementException
   }
 }
 
-private def fetchLeft() = {
-  if (leftIter.hasNext) {
-leftElement = leftIter.next()
-leftKey = leftKeyGenerator(leftElement)
+private def smartJoinRow(streamedRow: Row, bufferedRow: Row): Row 
= joinType match {
+  case RightOuter => joinRow(bufferedRow, streamedRow)
+  case _ => joinRow(streamedRow, bufferedRow)
+}
+
+private def fetchStreamed() = {
+  if (streamedIter.hasNext) {
+streamedElement = streamedIter.next()
+streamedKey = streamedKeyGenerator(streamedElement)
   } else {
-leftElement = null
+streamedElement = null
   }
 }
 
-private def fetchRight() = {
-  if (rightIter.hasNext) {
-rightElement = rightIter.next()
-rightKey = rightKeyGenerator(rightElement)
+private def fetchBuffered() = {
--- End diff --

Same here: we should add `: Unit = ` if we always ignore the return value.





[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32330019
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -84,84 +129,164 @@ case class SortMergeJoin(
 
 override final def next(): Row = {
   if (hasNext) {
-// we are using the buffered right rows and run down left 
iterator
-val joinedRow = joinRow(leftElement, 
rightMatches(rightPosition))
-rightPosition += 1
-if (rightPosition >= rightMatches.size) {
-  rightPosition = 0
-  fetchLeft()
-  if (leftElement == null || keyOrdering.compare(leftKey, 
matchKey) != 0) {
-stop = false
-rightMatches = null
+if (bufferedMatches == null || bufferedMatches.size == 0) {
+  // we just found a row with no join match and we are here to 
produce a row
+  // with this row with a standard null row from the other 
side.
+  if (continueStreamed) {
+val joinedRow = smartJoinRow(streamedElement, 
bufferedNullRow.copy())
+fetchStreamed()
+joinedRow
+  } else {
+val joinedRow = smartJoinRow(streamedNullRow.copy(), 
bufferedElement)
+fetchBuffered()
+joinedRow
+  }
+} else {
+  // we are using the buffered right rows and run down left 
iterator
+  val joinedRow = smartJoinRow(streamedElement, 
bufferedMatches(bufferedPosition))
+  bufferedPosition += 1
+  if (bufferedPosition >= bufferedMatches.size) {
+bufferedPosition = 0
+if (joinType != FullOuter || secondStreamedElement == 
null) {
+  fetchStreamed()
+  if (streamedElement == null || 
keyOrdering.compare(streamedKey, matchKey) != 0) {
+stop = false
+bufferedMatches = null
+  }
+} else {
+  // in FullOuter join and the first time we finish the 
match buffer,
+  // we still want to generate all rows with streamed null 
row and buffered
+  // rows that match the join key but not the conditions.
+  streamedElement = secondStreamedElement
+  bufferedMatches = secondBufferedMatches
+  secondStreamedElement = null
+  secondBufferedMatches = null
+}
   }
+  joinedRow
 }
-joinedRow
   } else {
 // no more result
 throw new NoSuchElementException
   }
 }
 
-private def fetchLeft() = {
-  if (leftIter.hasNext) {
-leftElement = leftIter.next()
-leftKey = leftKeyGenerator(leftElement)
+private def smartJoinRow(streamedRow: Row, bufferedRow: Row): Row 
= joinType match {
+  case RightOuter => joinRow(bufferedRow, streamedRow)
+  case _ => joinRow(streamedRow, bufferedRow)
+}
+
+private def fetchStreamed() = {
--- End diff --

It looks like the return value of `fetchStreamed` isn't used anywhere, so 
can you add a `: Unit = ` to this to make that clear?
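
For a rough, simplified sketch of what that change looks like (not the PR's exact code), only the declared result type changes and the body stays the same:

    // Sketch: the explicit `: Unit` documents that callers ignore the result
    // of the if/else, so nothing meaningful is being returned.
    class FetchSketch(iter: Iterator[Int]) {
      private var streamedElement: Option[Int] = None

      private def fetchStreamed(): Unit = {
        if (iter.hasNext) {
          streamedElement = Some(iter.next())
        } else {
          streamedElement = None
        }
      }
    }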


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32329757
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -84,84 +129,164 @@ case class SortMergeJoin(
 
 override final def next(): Row = {
   if (hasNext) {
-// we are using the buffered right rows and run down left 
iterator
-val joinedRow = joinRow(leftElement, 
rightMatches(rightPosition))
-rightPosition += 1
-if (rightPosition >= rightMatches.size) {
-  rightPosition = 0
-  fetchLeft()
-  if (leftElement == null || keyOrdering.compare(leftKey, 
matchKey) != 0) {
-stop = false
-rightMatches = null
+if (bufferedMatches == null || bufferedMatches.size == 0) {
+  // we just found a row with no join match and we are here to 
produce a row
+  // with this row with a standard null row from the other 
side.
--- End diff --

Typo?  Maybe this meant to read "with this row _and_ a standard null row"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32327932
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -84,84 +129,164 @@ case class SortMergeJoin(
 
 override final def next(): Row = {
   if (hasNext) {
-// we are using the buffered right rows and run down left 
iterator
-val joinedRow = joinRow(leftElement, 
rightMatches(rightPosition))
-rightPosition += 1
-if (rightPosition >= rightMatches.size) {
-  rightPosition = 0
-  fetchLeft()
-  if (leftElement == null || keyOrdering.compare(leftKey, 
matchKey) != 0) {
-stop = false
-rightMatches = null
+if (bufferedMatches == null || bufferedMatches.size == 0) {
+  // we just found a row with no join match and we are here to 
produce a row
+  // with this row with a standard null row from the other 
side.
+  if (continueStreamed) {
+val joinedRow = smartJoinRow(streamedElement, 
bufferedNullRow.copy())
--- End diff --

Why do we have to copy the standard null rows?  Are we worried about 
downstream operators mutating them?
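
A toy sketch of the concern (plain Scala collections, not Spark's row classes): if every null-extended output row shares the single null-row instance by reference, one in-place mutation downstream silently changes all of them, whereas a copy per output row contains the damage:

    // Sketch: sharing one mutable "null row" across outputs vs. copying it.
    object SharedNullRowSketch {
      def main(args: Array[String]): Unit = {
        // No copy: every output row aliases the same mutable instance.
        val shared = Array[Any](null, null)
        val unsafeOutputs = Seq(1, 2, 3).map(k => (k, shared))
        unsafeOutputs.head._2(0) = "mutated"
        println(unsafeOutputs.map(_._2.mkString(",")))   // all three rows changed

        // Copy per output row: a later mutation only touches that one row.
        val template = Array[Any](null, null)
        val safeOutputs = Seq(1, 2, 3).map(k => (k, template.clone()))
        safeOutputs.head._2(0) = "mutated"
        println(safeOutputs.map(_._2.mkString(",")))     // only the first row changed
      }
    }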


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32327674
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer
 case class SortMergeJoin(
 leftKeys: Seq[Expression],
 rightKeys: Seq[Expression],
+joinType: JoinType,
 left: SparkPlan,
-right: SparkPlan) extends BinaryNode {
+right: SparkPlan,
+condition: Option[Expression] = None) extends BinaryNode {
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match {
+case RightOuter => (right, left, rightKeys, leftKeys)
+case _ => (left, right, leftKeys, rightKeys)
+  }
+
+  override def output: Seq[Attribute] = joinType match {
+case Inner =>
+  left.output ++ right.output
+case LeftOuter =>
+  left.output ++ right.output.map(_.withNullability(true))
+case RightOuter =>
+  left.output.map(_.withNullability(true)) ++ right.output
+case FullOuter =>
+  left.output.map(_.withNullability(true)) ++ 
right.output.map(_.withNullability(true))
+case x =>
+  throw new IllegalStateException(s"SortMergeJoin should not take $x 
as the JoinType")
+  }
 
-  override def outputPartitioning: Partitioning = left.outputPartitioning
+  override def outputPartitioning: Partitioning = joinType match {
+case FullOuter =>
+  // when doing Full Outer join, NULL rows from both sides are not so 
partitioned.
+  UnknownPartitioning(streamed.outputPartitioning.numPartitions)
+case _ => streamed.outputPartitioning
--- End diff --

Both `streamed` and `buffered` have the same output partitioning, since 
we've co-partitioned them in order to perform the join? Just want to 
double-check my understanding.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-111532078
  
  [Test build #34777 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/34777/consoleFull)
 for   PR 5717 at commit 
[`add49a2`](https://github.com/apache/spark/commit/add49a241ef96517b5cdcb02fde72d27185722e9).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32327276
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer
 case class SortMergeJoin(
 leftKeys: Seq[Expression],
 rightKeys: Seq[Expression],
+joinType: JoinType,
 left: SparkPlan,
-right: SparkPlan) extends BinaryNode {
+right: SparkPlan,
+condition: Option[Expression] = None) extends BinaryNode {
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match {
+case RightOuter => (right, left, rightKeys, leftKeys)
+case _ => (left, right, leftKeys, rightKeys)
+  }
+
+  override def output: Seq[Attribute] = joinType match {
+case Inner =>
+  left.output ++ right.output
+case LeftOuter =>
+  left.output ++ right.output.map(_.withNullability(true))
+case RightOuter =>
+  left.output.map(_.withNullability(true)) ++ right.output
+case FullOuter =>
+  left.output.map(_.withNullability(true)) ++ 
right.output.map(_.withNullability(true))
+case x =>
+  throw new IllegalStateException(s"SortMergeJoin should not take $x 
as the JoinType")
+  }
 
-  override def outputPartitioning: Partitioning = left.outputPartitioning
+  override def outputPartitioning: Partitioning = joinType match {
+case FullOuter =>
+  // when doing Full Outer join, NULL rows from both sides are not so 
partitioned.
+  UnknownPartitioning(streamed.outputPartitioning.numPartitions)
+case _ => streamed.outputPartitioning
+  }
 
   override def requiredChildDistribution: Seq[Distribution] =
 ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: 
Nil
 
   // this is to manually construct an ordering that can be used to compare 
keys from both sides
-  private val keyOrdering: RowOrdering = 
RowOrdering.forSchema(leftKeys.map(_.dataType))
+  private val keyOrdering: RowOrdering = 
RowOrdering.forSchema(streamedKeys.map(_.dataType))
 
-  override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
+  override def outputOrdering: Seq[SortOrder] = joinType match {
+case FullOuter => Nil // when doing Full Outer join, NULL rows from 
both sides are not ordered.
+case _ => requiredOrders(streamedKeys)
+  }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
 requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
 
-  @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, 
left.output)
-  @transient protected lazy val rightKeyGenerator = 
newProjection(rightKeys, right.output)
+  @transient protected lazy val streamedKeyGenerator = 
newProjection(streamedKeys, streamed.output)
+  @transient protected lazy val bufferedKeyGenerator = 
newProjection(bufferedKeys, buffered.output)
+
+  // standard null rows
+  @transient private[this] lazy val streamedNullRow = new 
GenericRow(streamed.output.length)
--- End diff --

Can we move `streamedNullRow` and `bufferedNullRow` into the 
`zipPartitions` call so that we don't have to make a `transient lazy val`?  I 
find `transient lazy val` to be a bit confusing and like to avoid it when I can.
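
As a minimal, self-contained sketch of that pattern (plain RDDs and hypothetical values, not the SortMergeJoin operator itself), per-partition state can simply be created inside the zipPartitions closure, so nothing has to live on the serialized operator as a `@transient lazy val`:

    import org.apache.spark.{SparkConf, SparkContext}

    object ZipPartitionsSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
        try {
          val streamed = sc.parallelize(Seq("a", "b", "c", "d"), 2)
          val buffered = sc.parallelize(Seq(1, 2, 3), 2)
          val joined = streamed.zipPartitions(buffered) { (streamedIter, bufferedIter) =>
            // Built here, once per partition, inside the closure.
            val streamedNullValue = "<null>"
            val bufferedNullValue = -1
            streamedIter.zipAll(bufferedIter, streamedNullValue, bufferedNullValue)
          }
          joined.collect().foreach(println)
        } finally {
          sc.stop()
        }
      }
    }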


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-111531521
  
Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/5717#discussion_r32327111
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 ---
@@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer
 case class SortMergeJoin(
 leftKeys: Seq[Expression],
 rightKeys: Seq[Expression],
+joinType: JoinType,
 left: SparkPlan,
-right: SparkPlan) extends BinaryNode {
+right: SparkPlan,
+condition: Option[Expression] = None) extends BinaryNode {
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match {
+case RightOuter => (right, left, rightKeys, leftKeys)
+case _ => (left, right, leftKeys, rightKeys)
+  }
+
+  override def output: Seq[Attribute] = joinType match {
+case Inner =>
+  left.output ++ right.output
+case LeftOuter =>
+  left.output ++ right.output.map(_.withNullability(true))
+case RightOuter =>
+  left.output.map(_.withNullability(true)) ++ right.output
+case FullOuter =>
+  left.output.map(_.withNullability(true)) ++ 
right.output.map(_.withNullability(true))
+case x =>
+  throw new IllegalStateException(s"SortMergeJoin should not take $x 
as the JoinType")
+  }
 
-  override def outputPartitioning: Partitioning = left.outputPartitioning
+  override def outputPartitioning: Partitioning = joinType match {
+case FullOuter =>
+  // when doing Full Outer join, NULL rows from both sides are not so 
partitioned.
+  UnknownPartitioning(streamed.outputPartitioning.numPartitions)
+case _ => streamed.outputPartitioning
+  }
 
   override def requiredChildDistribution: Seq[Distribution] =
 ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: 
Nil
 
   // this is to manually construct an ordering that can be used to compare 
keys from both sides
-  private val keyOrdering: RowOrdering = 
RowOrdering.forSchema(leftKeys.map(_.dataType))
+  private val keyOrdering: RowOrdering = 
RowOrdering.forSchema(streamedKeys.map(_.dataType))
 
-  override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
+  override def outputOrdering: Seq[SortOrder] = joinType match {
+case FullOuter => Nil // when doing Full Outer join, NULL rows from 
both sides are not ordered.
+case _ => requiredOrders(streamedKeys)
+  }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
 requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
 
-  @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, 
left.output)
-  @transient protected lazy val rightKeyGenerator = 
newProjection(rightKeys, right.output)
+  @transient protected lazy val streamedKeyGenerator = 
newProjection(streamedKeys, streamed.output)
+  @transient protected lazy val bufferedKeyGenerator = 
newProjection(bufferedKeys, buffered.output)
+
+  // standard null rows
+  @transient private[this] lazy val streamedNullRow = new 
GenericRow(streamed.output.length)
+  @transient private[this] lazy val bufferedNullRow = new 
GenericRow(buffered.output.length)
+
+  // checks if the joinedRow can meet condition requirements
+  @transient private[this] lazy val boundCondition =
+condition.map(newPredicate(_, streamed.output ++ 
buffered.output)).getOrElse((row: Row) => true)
 
   private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
 keys.map(SortOrder(_, Ascending))
 
   protected override def doExecute(): RDD[Row] = {
-val leftResults = left.execute().map(_.copy())
-val rightResults = right.execute().map(_.copy())
+val streamResults = streamed.execute().map(_.copy())
--- End diff --

Why do we need to copy the streamed rows?  I understand why we need to do 
the copy for the buffered results, since we might be dealing with mutable input 
rows, but that shouldn't be a problem for the stream side, right?
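
For context, a small self-contained sketch of why buffering forces a copy (plain Scala, not Spark's row classes): if the upstream iterator reuses one mutable object per call, a consumer that buffers the values ends up with many references to the same, last-written object, while a copy taken before buffering preserves each value:

    // Sketch: an iterator that reuses a single mutable buffer, the way an
    // upstream operator can reuse a mutable row.
    final class ReusingIterator(values: Seq[Int]) extends Iterator[Array[Int]] {
      private val buf = new Array[Int](1)
      private val it = values.iterator
      override def hasNext: Boolean = it.hasNext
      override def next(): Array[Int] = { buf(0) = it.next(); buf }
    }

    object CopyBeforeBufferingSketch {
      def main(args: Array[String]): Unit = {
        // Buffering without copying: every entry aliases the same array.
        val aliased = new ReusingIterator(Seq(1, 2, 3)).toVector
        println(aliased.map(_(0)))                       // Vector(3, 3, 3)

        // Defensive copy before buffering keeps each value distinct.
        val copied = new ReusingIterator(Seq(1, 2, 3)).map(_.clone()).toVector
        println(copied.map(_(0)))                        // Vector(1, 2, 3)
      }
    }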


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-111531490
  
 Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-06-12 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-111531231
  
Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-05-28 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-106680748
  
Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...

2015-05-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5717#issuecomment-106680745
  
  [Test build #33707 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/33707/consoleFull)
 for   PR 5717 at commit 
[`add49a2`](https://github.com/apache/spark/commit/add49a241ef96517b5cdcb02fde72d27185722e9).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


