[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/5717
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r36493358 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -62,111 +100,250 @@ case class SortMergeJoin( } protected override def doExecute(): RDD[InternalRow] = { -val leftResults = left.execute().map(_.copy()) -val rightResults = right.execute().map(_.copy()) +val streamResults = streamedPlan.execute().map(_.copy()) +val bufferResults = bufferedPlan.execute().map(_.copy()) -leftResults.zipPartitions(rightResults) { (leftIter, rightIter) => +streamResults.zipPartitions(bufferResults) ( (streamedIter, bufferedIter) => { + // standard null rows + val streamedNullRow = InternalRow.fromSeq(Seq.fill(streamedPlan.output.length)(null)) + val bufferedNullRow = InternalRow.fromSeq(Seq.fill(bufferedPlan.output.length)(null)) new Iterator[InternalRow] { // An ordering that can be used to compare keys from both sides. private[this] val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType)) // Mutable per row objects. private[this] val joinRow = new JoinedRow -private[this] var leftElement: InternalRow = _ -private[this] var rightElement: InternalRow = _ -private[this] var leftKey: InternalRow = _ -private[this] var rightKey: InternalRow = _ -private[this] var rightMatches: CompactBuffer[InternalRow] = _ -private[this] var rightPosition: Int = -1 +private[this] var streamedElement: InternalRow = _ +private[this] var bufferedElement: InternalRow = _ +private[this] var streamedKey: InternalRow = _ +private[this] var bufferedKey: InternalRow = _ +private[this] var bufferedMatches: CompactBuffer[InternalRow] = _ +private[this] var bufferedPosition: Int = -1 private[this] var stop: Boolean = false private[this] var matchKey: InternalRow = _ +// when we do merge algorithm and find some not matched join key, there must be a side +// that do not have a corresponding match. So we need to mark which side it is. True means +// streamed side not have match, and False means the buffered side. Only set when needed. +private[this] var continueStreamed: Boolean = _ +private[this] var streamNullGenerated: Boolean = false +// Tracks if each element in bufferedMatches have a matched streamedElement. +private[this] var bitSet: BitSet = _ +// marks if the found result has been fetched. +private[this] var found: Boolean = false +private[this] var bufferNullGenerated: Boolean = false // initialize iterator initialize() -override final def hasNext: Boolean = nextMatchingPair() +override final def hasNext: Boolean = { + val matching = nextMatchingBlock() + if (matching && !isBufferEmpty(bufferedMatches)) { +// The buffer stores all rows that match key, but condition may not be matched. +// If none of rows in the buffer match condition, we'll fetch next matching block. +findNextInBuffer() || hasNext + } else { +matching + } +} + +/** + * Run down the current `bufferedMatches` to find rows that match conditions. + * If `joinType` is not `Inner`, we will use `bufferNullGenerated` to mark if + * we need to build a bufferedNullRow for result. + * If `joinType` is `FullOuter`, we will use `streamNullGenerated` to mark if + * a buffered element need to join with a streamedNullRow. + * The method can be called multiple times since `found` serves as a guardian. 
+ */ +def findNextInBuffer(): Boolean = { + while (!found && streamedElement != null +&& keyOrdering.compare(streamedKey, matchKey) == 0) { +while (bufferedPosition < bufferedMatches.size && !boundCondition( + joinRow(streamedElement, bufferedMatches(bufferedPosition { + bufferedPosition += 1 +} +if (bufferedPosition == bufferedMatches.size) { + if (joinType == Inner || bufferNullGenerated) { +bufferNullGenerated = false +bufferedPosition = 0 +fetchStreamed() + } else { +found = true + } +} else { + // mark as true so we don't generate null row for streamed row. +
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r36493188 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -62,111 +100,250 @@ case class SortMergeJoin( } protected override def doExecute(): RDD[InternalRow] = { -val leftResults = left.execute().map(_.copy()) -val rightResults = right.execute().map(_.copy()) +val streamResults = streamedPlan.execute().map(_.copy()) +val bufferResults = bufferedPlan.execute().map(_.copy()) -leftResults.zipPartitions(rightResults) { (leftIter, rightIter) => +streamResults.zipPartitions(bufferResults) ( (streamedIter, bufferedIter) => { + // standard null rows + val streamedNullRow = InternalRow.fromSeq(Seq.fill(streamedPlan.output.length)(null)) + val bufferedNullRow = InternalRow.fromSeq(Seq.fill(bufferedPlan.output.length)(null)) new Iterator[InternalRow] { // An ordering that can be used to compare keys from both sides. private[this] val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType)) // Mutable per row objects. private[this] val joinRow = new JoinedRow -private[this] var leftElement: InternalRow = _ -private[this] var rightElement: InternalRow = _ -private[this] var leftKey: InternalRow = _ -private[this] var rightKey: InternalRow = _ -private[this] var rightMatches: CompactBuffer[InternalRow] = _ -private[this] var rightPosition: Int = -1 +private[this] var streamedElement: InternalRow = _ +private[this] var bufferedElement: InternalRow = _ +private[this] var streamedKey: InternalRow = _ +private[this] var bufferedKey: InternalRow = _ +private[this] var bufferedMatches: CompactBuffer[InternalRow] = _ +private[this] var bufferedPosition: Int = -1 private[this] var stop: Boolean = false private[this] var matchKey: InternalRow = _ +// when we do merge algorithm and find some not matched join key, there must be a side +// that do not have a corresponding match. So we need to mark which side it is. True means +// streamed side not have match, and False means the buffered side. Only set when needed. +private[this] var continueStreamed: Boolean = _ +private[this] var streamNullGenerated: Boolean = false +// Tracks if each element in bufferedMatches have a matched streamedElement. +private[this] var bitSet: BitSet = _ +// marks if the found result has been fetched. +private[this] var found: Boolean = false +private[this] var bufferNullGenerated: Boolean = false // initialize iterator initialize() -override final def hasNext: Boolean = nextMatchingPair() +override final def hasNext: Boolean = { + val matching = nextMatchingBlock() + if (matching && !isBufferEmpty(bufferedMatches)) { +// The buffer stores all rows that match key, but condition may not be matched. +// If none of rows in the buffer match condition, we'll fetch next matching block. +findNextInBuffer() || hasNext + } else { +matching + } +} + +/** + * Run down the current `bufferedMatches` to find rows that match conditions. + * If `joinType` is not `Inner`, we will use `bufferNullGenerated` to mark if + * we need to build a bufferedNullRow for result. + * If `joinType` is `FullOuter`, we will use `streamNullGenerated` to mark if + * a buffered element need to join with a streamedNullRow. + * The method can be called multiple times since `found` serves as a guardian. 
+ */ +def findNextInBuffer(): Boolean = { + while (!found && streamedElement != null +&& keyOrdering.compare(streamedKey, matchKey) == 0) { +while (bufferedPosition < bufferedMatches.size && !boundCondition( + joinRow(streamedElement, bufferedMatches(bufferedPosition { + bufferedPosition += 1 +} +if (bufferedPosition == bufferedMatches.size) { + if (joinType == Inner || bufferNullGenerated) { +bufferNullGenerated = false +bufferedPosition = 0 +fetchStreamed() + } else { +found = true + } +} else { + // mark as true so we don't generate null row for streamed row. +
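The flattened diff quoted in the two review comments above is hard to read in this archive. The core idea it implements is a single merge pass over two key-sorted inputs that also emits null-padded rows for keys with no match. Below is a minimal, self-contained sketch of that idea for a LEFT OUTER join over plain Scala sequences. It is an illustration only, not the PR's code: it ignores the extra join condition (`boundCondition`), full-outer handling, and the `InternalRow`/`CompactBuffer` machinery, and every name in it is made up for the example.

```scala
// Sort-merge LEFT OUTER join over two sequences already sorted by key.
def sortMergeLeftOuter[K: Ordering, L, R](
    left: Seq[(K, L)],
    right: Seq[(K, R)]): Seq[(L, Option[R])] = {
  val ord = implicitly[Ordering[K]]
  val out = scala.collection.mutable.ArrayBuffer.empty[(L, Option[R])]
  var i = 0
  var j = 0
  while (i < left.length) {
    val (lk, lv) = left(i)
    // advance the buffered (right) side until its key is no longer smaller
    while (j < right.length && ord.lt(right(j)._1, lk)) j += 1
    // emit one output row per buffered row sharing the key; scan with a separate index k
    // so the next streamed row with the same key can reuse the same block
    var k = j
    var matched = false
    while (k < right.length && ord.equiv(right(k)._1, lk)) {
      out += ((lv, Some(right(k)._2)))
      matched = true
      k += 1
    }
    // LEFT OUTER: an unmatched streamed row still produces a row, padded with None
    if (!matched) out += ((lv, None))
    i += 1
  }
  out.toList
}

// e.g. sortMergeLeftOuter(Seq(1 -> "a", 2 -> "b"), Seq(1 -> "x"))
//   == List(("a", Some("x")), ("b", None))
```

The PR extends this same walk to also emit null rows for unmatched buffered rows (for full outer joins) and to re-check the extra join condition against each buffered match, which is what the `findNextInBuffer` logic above is doing.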
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-128239391 [Test build #39983 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39983/console) for PR 5717 at commit [`d02f6bb`](https://github.com/apache/spark/commit/d02f6bbab969a20e8a3cd9d6b065db39462d6ff5). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-128239606 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-128224368 [Test build #39983 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39983/consoleFull) for PR 5717 at commit [`d02f6bb`](https://github.com/apache/spark/commit/d02f6bbab969a20e8a3cd9d6b065db39462d6ff5).
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-128224165 Merged build triggered.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-128224169 Merged build started.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127908732 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127908730 [Test build #39844 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39844/console) for PR 5717 at commit [`549796e`](https://github.com/apache/spark/commit/549796eba511316fd43d3cecdfb289665c0027bc). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127907896 [Test build #227 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SlowSparkPullRequestBuilder/227/console) for PR 5717 at commit [`549796e`](https://github.com/apache/spark/commit/549796eba511316fd43d3cecdfb289665c0027bc). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127907901 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127906716 [Test build #39844 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39844/consoleFull) for PR 5717 at commit [`549796e`](https://github.com/apache/spark/commit/549796eba511316fd43d3cecdfb289665c0027bc).
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127906326 [Test build #227 has started](https://amplab.cs.berkeley.edu/jenkins/job/SlowSparkPullRequestBuilder/227/consoleFull) for PR 5717 at commit [`549796e`](https://github.com/apache/spark/commit/549796eba511316fd43d3cecdfb289665c0027bc).
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127906214 Merged build started.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127906184 Merged build triggered.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127905222 Merged build triggered.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127905246 Merged build started.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user adrian-wang commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127905132 retest this please.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127900058 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127899918 Merged build started.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127899891 Merged build triggered.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r36238964

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -152,16 +149,14 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
     ctx.cacheManager.clearCache()
     ctx.sql("CACHE TABLE testData")
-    val SORTMERGEJOIN_ENABLED: Boolean = ctx.conf.sortMergeJoinEnabled
     Seq(
-      ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[ShuffledHashOuterJoin]),
+      ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeJoin]),
       ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
-        classOf[BroadcastHashOuterJoin]),
--- End diff --

It looks like this patch causes us to plan SortMergeJoin for outer joins that are capable of using BroadcastHashOuterJoin, which seems like it could lead to performance issues by triggering unnecessary shuffling of the large table. As a result, I think we should not change the broadcast-enabled half of the test, but rather should update the broadcast-disabled half to test both the sort-merge-join-enabled and sort-merge-join-disabled configurations.
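A sketch of how the broadcast-disabled half of the test could cover both settings. This is illustrative only; the conf key `spark.sql.planner.sortMergeJoin`, the use of `queryExecution.executedPlan`, and the plain assertion in place of the suite's existing helpers are assumptions based on the 1.5-era suite, not the final test code.

```scala
// Inside the block of JoinSuite that already disables broadcast joins
// (spark.sql.autoBroadcastJoinThreshold = -1), exercise both configurations.
Seq(true, false).foreach { smjEnabled =>
  ctx.sql(s"SET spark.sql.planner.sortMergeJoin=$smjEnabled")
  val physical = ctx.sql("SELECT * FROM testData LEFT JOIN testData2 ON key = a")
    .queryExecution.executedPlan
  val expected: Class[_] =
    if (smjEnabled) classOf[SortMergeJoin] else classOf[ShuffledHashOuterJoin]
  // Once broadcasting is ruled out, the planned join operator should track the
  // sortMergeJoin setting.
  assert(physical.collect { case p if expected.isInstance(p) => p }.nonEmpty,
    s"expected ${expected.getSimpleName} when sortMergeJoin=$smjEnabled")
}
```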
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127723655 Also, I think that it might be a little clearer to introduce a separate `SortMergeOuterJoin` operator rather than trying to combine the inner and outer joins into the same operator. This would be consistent with what we've done for other joins.
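For readers following the suggestion, here is a rough skeleton of what a separate operator's signature could look like, mirroring the `SortMergeJoin` operator under review. The class name, imports, and `output` handling are assumptions for illustration, and the body is intentionally left unimplemented; this is not the design that was ultimately committed.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}

case class SortMergeOuterJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode {

  // Outer joins make the non-preserved side(s) nullable in the output schema.
  override def output: Seq[Attribute] = joinType match {
    case LeftOuter  => left.output ++ right.output.map(_.withNullability(true))
    case RightOuter => left.output.map(_.withNullability(true)) ++ right.output
    case FullOuter  => (left.output ++ right.output).map(_.withNullability(true))
    case _          => left.output ++ right.output
  }

  protected override def doExecute(): RDD[InternalRow] = {
    // The outer-join merge logic would live here, kept apart from the inner-join operator.
    ???
  }
}
```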
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127713743 We should have a test to guard against reintroduction of the bug that @jeanlyn mentioned. I find the code here to be really dense and hard to understand, so I'd like to try to pursue my design first. There's another 1.5 blocker / critical issue related to eliminating JoinedRow in favor of Tungsten's RowJoiner when UnsafeRows are used, and I think that the code re-use enabled by my design will make this significantly easier to accomplish.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user adrian-wang commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127560250 @JoshRosen I've fixed the bug that @jeanlyn mentioned. Can you merge this first and then do the follow-up steps in #7904?
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127558998 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127558897 [Test build #39689 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39689/console) for PR 5717 at commit [`bff834a`](https://github.com/apache/spark/commit/bff834aa11d074f387c33ffe6bee3acf670be2ac). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127513516 [Test build #39689 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39689/consoleFull) for PR 5717 at commit [`bff834a`](https://github.com/apache/spark/commit/bff834aa11d074f387c33ffe6bee3acf670be2ac).
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127513075 Merged build started.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127513050 Merged build triggered.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127508758 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127508742 [Test build #39678 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39678/console) for PR 5717 at commit [`d0e65c5`](https://github.com/apache/spark/commit/d0e65c59019e80440ad90532d074863697ec46d4). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127505435 [Test build #39678 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39678/consoleFull) for PR 5717 at commit [`d0e65c5`](https://github.com/apache/spark/commit/d0e65c59019e80440ad90532d074863697ec46d4).
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127504700 Merged build started.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-127504682 Merged build triggered.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126581760 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126581734 [Test build #39159 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39159/console) for PR 5717 at commit [`f520079`](https://github.com/apache/spark/commit/f5200790e1f5ddf4d3cb2cb68dfdfff45473aaf4). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126569073 [Test build #39159 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39159/consoleFull) for PR 5717 at commit [`f520079`](https://github.com/apache/spark/commit/f5200790e1f5ddf4d3cb2cb68dfdfff45473aaf4).
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126568863 Merged build triggered.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126568916 Merged build started.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126219174 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126219170 [Test build #39030 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39030/console) for PR 5717 at commit [`fd73084`](https://github.com/apache/spark/commit/fd73084f40a6eb74fab98d062c13c54e049ef055). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126218775 [Test build #39030 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39030/consoleFull) for PR 5717 at commit [`fd73084`](https://github.com/apache/spark/commit/fd73084f40a6eb74fab98d062c13c54e049ef055).
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126218039 Merged build triggered.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126218132 Merged build started.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126213835 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126213811 [Test build #39024 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39024/console) for PR 5717 at commit [`4152e86`](https://github.com/apache/spark/commit/4152e864a19490ee6c5db7f9d3589fed7693efde). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126209436 [Test build #39024 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39024/consoleFull) for PR 5717 at commit [`4152e86`](https://github.com/apache/spark/commit/4152e864a19490ee6c5db7f9d3589fed7693efde).
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126208727 Merged build started.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126208711 Merged build triggered.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126005227 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126005211 [Test build #38839 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38839/console) for PR 5717 at commit [`07f96b5`](https://github.com/apache/spark/commit/07f96b5e9a0d2a79498589976f287f38eacbd11e). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-126002674 [Test build #38839 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38839/consoleFull) for PR 5717 at commit [`07f96b5`](https://github.com/apache/spark/commit/07f96b5e9a0d2a79498589976f287f38eacbd11e).
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-12555 Merged build started.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-125921405 Merged build triggered.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-125860123 Yup I will work on this myself tomorrow based on this PR.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-125859852 /bump, do you think that we'll have time to finish this for 1.5.0? /cc @rxin
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user adrian-wang commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r33010716

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala ---
@@ -82,86 +130,169 @@ case class SortMergeJoin(
     override final def next(): InternalRow = {
       if (hasNext) {
-        // we are using the buffered right rows and run down left iterator
-        val joinedRow = joinRow(leftElement, rightMatches(rightPosition))
-        rightPosition += 1
-        if (rightPosition >= rightMatches.size) {
-          rightPosition = 0
-          fetchLeft()
-          if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) {
-            stop = false
-            rightMatches = null
+        if (bufferedMatches == null || bufferedMatches.size == 0) {
+          // we just found a row with no join match and we are here to produce a row
+          // with this row and a standard null row from the other side.
+          if (continueStreamed) {
+            val joinedRow = smartJoinRow(streamedElement, bufferedNullRow)
+            fetchStreamed()
+            joinedRow
+          } else {
+            val joinedRow = smartJoinRow(streamedNullRow, bufferedElement)
+            fetchBuffered()
+            joinedRow
+          }
+        } else {
+          // we are using the buffered right rows and run down left iterator
+          val joinedRow = smartJoinRow(streamedElement, bufferedMatches(bufferedPosition))
+          bufferedPosition += 1
+          if (bufferedPosition >= bufferedMatches.size) {
+            bufferedPosition = 0
+            if (joinType != FullOuter || secondStreamedElement == null) {
+              fetchStreamed()
--- End diff --

Good catch, I'll rewrite this part.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user jeanlyn commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32891298 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -82,86 +130,169 @@ case class SortMergeJoin( override final def next(): InternalRow = { if (hasNext) { -// we are using the buffered right rows and run down left iterator -val joinedRow = joinRow(leftElement, rightMatches(rightPosition)) -rightPosition += 1 -if (rightPosition >= rightMatches.size) { - rightPosition = 0 - fetchLeft() - if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) { -stop = false -rightMatches = null +if (bufferedMatches == null || bufferedMatches.size == 0) { + // we just found a row with no join match and we are here to produce a row + // with this row and a standard null row from the other side. + if (continueStreamed) { +val joinedRow = smartJoinRow(streamedElement, bufferedNullRow) +fetchStreamed() +joinedRow + } else { +val joinedRow = smartJoinRow(streamedNullRow, bufferedElement) +fetchBuffered() +joinedRow + } +} else { + // we are using the buffered right rows and run down left iterator + val joinedRow = smartJoinRow(streamedElement, bufferedMatches(bufferedPosition)) + bufferedPosition += 1 + if (bufferedPosition >= bufferedMatches.size) { +bufferedPosition = 0 +if (joinType != FullOuter || secondStreamedElement == null) { + fetchStreamed() + if (streamedElement == null || keyOrdering.compare(streamedKey, matchKey) != 0) { +stop = false +bufferedMatches = null + } +} else { + // in FullOuter join and the first time we finish the match buffer, + // we still want to generate all rows with streamed null row and buffered + // rows that match the join key but not the conditions. + streamedElement = secondStreamedElement + bufferedMatches = secondBufferedMatches + secondStreamedElement = null + secondBufferedMatches = null +} } + joinedRow } -joinedRow } else { // no more result throw new NoSuchElementException } } -private def fetchLeft() = { - if (leftIter.hasNext) { -leftElement = leftIter.next() -leftKey = leftKeyGenerator(leftElement) +private def smartJoinRow(streamedRow: InternalRow, bufferedRow: InternalRow): InternalRow = + joinType match { +case RightOuter => joinRow(bufferedRow, streamedRow) +case _ => joinRow(streamedRow, bufferedRow) + } + +private def fetchStreamed(): Unit = { + if (streamedIter.hasNext) { +streamedElement = streamedIter.next() +streamedKey = streamedKeyGenerator(streamedElement) } else { -leftElement = null +streamedElement = null } } -private def fetchRight() = { - if (rightIter.hasNext) { -rightElement = rightIter.next() -rightKey = rightKeyGenerator(rightElement) +private def fetchBuffered(): Unit = { + if (bufferedIter.hasNext) { +bufferedElement = bufferedIter.next() +bufferedKey = bufferedKeyGenerator(bufferedElement) } else { -rightElement = null +bufferedElement = null } } private def initialize() = { - fetchLeft() - fetchRight() + fetchStreamed() + fetchBuffered() } /** * Searches the right iterator for the next rows that have matches in left side, and store * them in a buffer. + * When this is not a Inner join, we will also return true when we get a row with no match + * on the other side. This search will jump out every time from the same position until + * `next()` is called. +
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user jeanlyn commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32891269 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -82,86 +130,169 @@ case class SortMergeJoin( override final def next(): InternalRow = { if (hasNext) { -// we are using the buffered right rows and run down left iterator -val joinedRow = joinRow(leftElement, rightMatches(rightPosition)) -rightPosition += 1 -if (rightPosition >= rightMatches.size) { - rightPosition = 0 - fetchLeft() - if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) { -stop = false -rightMatches = null +if (bufferedMatches == null || bufferedMatches.size == 0) { + // we just found a row with no join match and we are here to produce a row + // with this row and a standard null row from the other side. + if (continueStreamed) { +val joinedRow = smartJoinRow(streamedElement, bufferedNullRow) +fetchStreamed() +joinedRow + } else { +val joinedRow = smartJoinRow(streamedNullRow, bufferedElement) +fetchBuffered() +joinedRow + } +} else { + // we are using the buffered right rows and run down left iterator + val joinedRow = smartJoinRow(streamedElement, bufferedMatches(bufferedPosition)) + bufferedPosition += 1 + if (bufferedPosition >= bufferedMatches.size) { +bufferedPosition = 0 +if (joinType != FullOuter || secondStreamedElement == null) { + fetchStreamed() --- End diff -- I think we should use `boundCondition ` to update `bufferedMatches ` after we `fetchStreamed ()` .Otherwise we may get wrong answer.For example ```sql table a(key int,value int);table b(key int,value int) data of a 1 3 1 1 2 1 2 3 data of b 1 1 2 1 select a.key,b.key,a.value-b.value from a left outer join b on a.key=b.key and a.value - b.value > 1 ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
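To make jeanlyn's example above easier to follow, here is a minimal pure-Scala restatement of the expected LEFT OUTER JOIN semantics. It is not Spark code; the table contents come from the comment, `None` stands in for SQL NULL, and all names are illustrative. The point is that a row of `a` whose key matches `b` but whose non-equi condition `a.value - b.value > 1` never holds must still appear once, padded with NULLs, which is why `bufferedMatches` needs to be re-checked against `boundCondition` after each `fetchStreamed()`.

```scala
// Minimal sketch (plain Scala collections, not Spark) of the expected result.
object LeftOuterJoinExample {
  def main(args: Array[String]): Unit = {
    val a = Seq((1, 3), (1, 1), (2, 1), (2, 3)) // table a(key, value)
    val b = Seq((1, 1), (2, 1))                 // table b(key, value)

    // LEFT OUTER JOIN b ON a.key = b.key AND a.value - b.value > 1
    val result: Seq[(Int, Option[Int], Option[Int])] = a.flatMap { case (ak, av) =>
      val matches = b.collect {
        case (bk, bv) if ak == bk && av - bv > 1 => (ak, Some(bk), Some(av - bv))
      }
      // preserve unmatched left rows, padded with NULLs
      if (matches.nonEmpty) matches else Seq((ak, None, None))
    }
    result.foreach(println)
    // (1,Some(1),Some(2))
    // (1,None,None)   <- key matches b, but the extra condition fails, so NULL-padded
    // (2,None,None)
    // (2,Some(2),Some(2))
  }
}
```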
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user jeanlyn commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32891271 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -90,13 +90,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft) - // If the sort merge join option is set, we want to use sort merge join prior to hashjoin - // for now let's support inner join first, then add outer join - case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) + // If the sort merge join option is set, we want to use sort merge join prior to hashjoin. + // And for outer join, we can not put conditions outside of the join + case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if sqlContext.conf.sortMergeJoinEnabled => -val mergeJoin = - joins.SortMergeJoin(leftKeys, rightKeys, planLater(left), planLater(right)) -condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil +joins.SortMergeJoin( + leftKeys, rightKeys, joinType, planLater(left), planLater(right), condition) :: Nil --- End diff -- Shall we move this code to a new `Strategy` (like SortMergeJoin) instead of mixing it into `HashJoin`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
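To make the suggestion concrete, below is a self-contained, language-level sketch of what a separate strategy could look like. It deliberately uses toy plan and strategy types (every name here is hypothetical) rather than Spark's actual planner API; it only illustrates the idea of giving sort merge join its own small pattern match that the planner tries before the hash join one.

```scala
// Toy planner sketch (hypothetical types, not Spark's QueryPlanner API).
sealed trait LogicalOp
case class EquiJoin(sortMergeEnabled: Boolean) extends LogicalOp

sealed trait PhysicalOp
case object SortMergeJoinOp extends PhysicalOp
case object HashJoinOp extends PhysicalOp

trait Strategy { def apply(plan: LogicalOp): Seq[PhysicalOp] }

object SortMergeJoinStrategy extends Strategy {
  def apply(plan: LogicalOp): Seq[PhysicalOp] = plan match {
    case EquiJoin(true) => SortMergeJoinOp :: Nil // only when the flag is on
    case _              => Nil                    // otherwise let other strategies try
  }
}

object HashJoinStrategy extends Strategy {
  def apply(plan: LogicalOp): Seq[PhysicalOp] = plan match {
    case EquiJoin(_) => HashJoinOp :: Nil
    case _           => Nil
  }
}

object PlannerSketch {
  def main(args: Array[String]): Unit = {
    val strategies: Seq[Strategy] = Seq(SortMergeJoinStrategy, HashJoinStrategy)
    val plan = EquiJoin(sortMergeEnabled = true)
    // take the first strategy that produces a candidate, in declaration order
    println(strategies.iterator.map(_(plan)).find(_.nonEmpty).map(_.head))
  }
}
```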
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113717701 Thanks for updating this; I'll try to take another review pass tomorrow. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113690424 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113690410 [Test build #35337 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35337/console) for PR 5717 at commit [`211e101`](https://github.com/apache/spark/commit/211e1012dc28ed610d294d0678b1d5621a901e53). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class SerializableConfiguration(@transient var value: Configuration) extends Serializable ` * `class SerializableJobConf(@transient var value: JobConf) extends Serializable ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113670244 [Test build #35337 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35337/consoleFull) for PR 5717 at commit [`211e101`](https://github.com/apache/spark/commit/211e1012dc28ed610d294d0678b1d5621a901e53). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113670190 Merged build triggered. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113670198 Merged build started. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user adrian-wang commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113669761 retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113408338 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113408241 [Test build #35233 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35233/console) for PR 5717 at commit [`211e101`](https://github.com/apache/spark/commit/211e1012dc28ed610d294d0678b1d5621a901e53). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class SerializableConfiguration(@transient var value: Configuration) extends Serializable ` * `class SerializableJobConf(@transient var value: JobConf) extends Serializable ` * `class ElementwiseProduct(VectorTransformer):` * `case class CreateStruct(children: Seq[Expression]) extends Expression ` * `case class Sqrt(child: Expression) extends UnaryMathExpression(math.sqrt, "SQRT")` * `case class Logarithm(left: Expression, right: Expression)` * `case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand with Logging ` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113390868 [Test build #35233 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35233/consoleFull) for PR 5717 at commit [`211e101`](https://github.com/apache/spark/commit/211e1012dc28ed610d294d0678b1d5621a901e53). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113390452 Merged build started. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113390374 Merged build triggered. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32803889 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer case class SortMergeJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], +joinType: JoinType, left: SparkPlan, -right: SparkPlan) extends BinaryNode { +right: SparkPlan, +condition: Option[Expression] = None) extends BinaryNode { - override def output: Seq[Attribute] = left.output ++ right.output + val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match { +case RightOuter => (right, left, rightKeys, leftKeys) +case _ => (left, right, leftKeys, rightKeys) + } + + override def output: Seq[Attribute] = joinType match { +case Inner => + left.output ++ right.output +case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) +case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output +case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) +case x => + throw new IllegalStateException(s"SortMergeJoin should not take $x as the JoinType") + } - override def outputPartitioning: Partitioning = left.outputPartitioning + override def outputPartitioning: Partitioning = joinType match { +case FullOuter => + // when doing Full Outer join, NULL rows from both sides are not so partitioned. + UnknownPartitioning(streamed.outputPartitioning.numPartitions) +case _ => streamed.outputPartitioning + } override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil // this is to manually construct an ordering that can be used to compare keys from both sides - private val keyOrdering: RowOrdering = RowOrdering.forSchema(leftKeys.map(_.dataType)) + private val keyOrdering: RowOrdering = RowOrdering.forSchema(streamedKeys.map(_.dataType)) - override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys) + override def outputOrdering: Seq[SortOrder] = joinType match { +case FullOuter => Nil // when doing Full Outer join, NULL rows from both sides are not ordered. 
+case _ => requiredOrders(streamedKeys) + } override def requiredChildOrdering: Seq[Seq[SortOrder]] = requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil - @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, left.output) - @transient protected lazy val rightKeyGenerator = newProjection(rightKeys, right.output) + @transient protected lazy val streamedKeyGenerator = newProjection(streamedKeys, streamed.output) + @transient protected lazy val bufferedKeyGenerator = newProjection(bufferedKeys, buffered.output) + + // standard null rows + @transient private[this] lazy val streamedNullRow = new GenericRow(streamed.output.length) + @transient private[this] lazy val bufferedNullRow = new GenericRow(buffered.output.length) + + // checks if the joinedRow can meet condition requirements + @transient private[this] lazy val boundCondition = +condition.map(newPredicate(_, streamed.output ++ buffered.output)).getOrElse((row: Row) => true) private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = keys.map(SortOrder(_, Ascending)) protected override def doExecute(): RDD[Row] = { -val leftResults = left.execute().map(_.copy()) -val rightResults = right.execute().map(_.copy()) +val streamResults = streamed.execute().map(_.copy()) --- End diff -- That's true for Left/Right Outer and even inner join, however, in full outer join, we probably need to cache the streamed row once, but you're right, we can do the copy whenever necessary during the iterating, not here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
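The copy()-or-not question in this exchange comes down to whether anything downstream buffers row objects that an upstream operator reuses and mutates in place. Here is a small, Spark-free sketch of that failure mode; all names are illustrative and none of this is the patch's code.

```scala
// Sketch of why buffering rows from an iterator that reuses one mutable
// object requires copying first; hypothetical names throughout.
object WhyBufferedRowsNeedCopies {
  def main(args: Array[String]): Unit = {
    def reusingIterator(): Iterator[Array[Int]] = {
      val reused = Array(0)                              // one mutable "row" handed out repeatedly
      Iterator(1, 2, 3).map { v => reused(0) = v; reused }
    }

    val bufferedWithoutCopy = reusingIterator().toList
    println(bufferedWithoutCopy.map(_.toList)) // List(List(3), List(3), List(3)) -- wrong

    val bufferedWithCopy = reusingIterator().map(_.clone()).toList
    println(bufferedWithCopy.map(_.toList))    // List(List(1), List(2), List(3)) -- as intended
  }
}
```

Which stage pays for the copy (the source map, the external sort, or the join's own match buffer) is exactly what the comments above are weighing.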
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32802991 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer case class SortMergeJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], +joinType: JoinType, left: SparkPlan, -right: SparkPlan) extends BinaryNode { +right: SparkPlan, +condition: Option[Expression] = None) extends BinaryNode { - override def output: Seq[Attribute] = left.output ++ right.output + val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match { +case RightOuter => (right, left, rightKeys, leftKeys) +case _ => (left, right, leftKeys, rightKeys) + } + + override def output: Seq[Attribute] = joinType match { +case Inner => + left.output ++ right.output +case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) +case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output +case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) +case x => + throw new IllegalStateException(s"SortMergeJoin should not take $x as the JoinType") + } - override def outputPartitioning: Partitioning = left.outputPartitioning + override def outputPartitioning: Partitioning = joinType match { +case FullOuter => + // when doing Full Outer join, NULL rows from both sides are not so partitioned. + UnknownPartitioning(streamed.outputPartitioning.numPartitions) +case _ => streamed.outputPartitioning + } override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil // this is to manually construct an ordering that can be used to compare keys from both sides - private val keyOrdering: RowOrdering = RowOrdering.forSchema(leftKeys.map(_.dataType)) + private val keyOrdering: RowOrdering = RowOrdering.forSchema(streamedKeys.map(_.dataType)) - override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys) + override def outputOrdering: Seq[SortOrder] = joinType match { +case FullOuter => Nil // when doing Full Outer join, NULL rows from both sides are not ordered. 
+case _ => requiredOrders(streamedKeys) + } override def requiredChildOrdering: Seq[Seq[SortOrder]] = requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil - @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, left.output) - @transient protected lazy val rightKeyGenerator = newProjection(rightKeys, right.output) + @transient protected lazy val streamedKeyGenerator = newProjection(streamedKeys, streamed.output) + @transient protected lazy val bufferedKeyGenerator = newProjection(bufferedKeys, buffered.output) + + // standard null rows + @transient private[this] lazy val streamedNullRow = new GenericRow(streamed.output.length) + @transient private[this] lazy val bufferedNullRow = new GenericRow(buffered.output.length) + + // checks if the joinedRow can meet condition requirements + @transient private[this] lazy val boundCondition = +condition.map(newPredicate(_, streamed.output ++ buffered.output)).getOrElse((row: Row) => true) private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = keys.map(SortOrder(_, Ascending)) protected override def doExecute(): RDD[Row] = { -val leftResults = left.execute().map(_.copy()) -val rightResults = right.execute().map(_.copy()) +val streamResults = streamed.execute().map(_.copy()) --- End diff -- We certainly need to copy the inputs that are passed to external sort, but the `ExternalSort` operator itself should take care of that. Here, I think we're consuming the result of a sort operator and are not buffering rows from `streamResults` (unless I've overlooked other buffering inside of `zipPartitions` somehow). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user adrian-wang commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32802825 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer case class SortMergeJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], +joinType: JoinType, left: SparkPlan, -right: SparkPlan) extends BinaryNode { +right: SparkPlan, +condition: Option[Expression] = None) extends BinaryNode { - override def output: Seq[Attribute] = left.output ++ right.output + val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match { +case RightOuter => (right, left, rightKeys, leftKeys) +case _ => (left, right, leftKeys, rightKeys) + } + + override def output: Seq[Attribute] = joinType match { +case Inner => + left.output ++ right.output +case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) +case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output +case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) +case x => + throw new IllegalStateException(s"SortMergeJoin should not take $x as the JoinType") + } - override def outputPartitioning: Partitioning = left.outputPartitioning + override def outputPartitioning: Partitioning = joinType match { +case FullOuter => + // when doing Full Outer join, NULL rows from both sides are not so partitioned. + UnknownPartitioning(streamed.outputPartitioning.numPartitions) +case _ => streamed.outputPartitioning + } override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil // this is to manually construct an ordering that can be used to compare keys from both sides - private val keyOrdering: RowOrdering = RowOrdering.forSchema(leftKeys.map(_.dataType)) + private val keyOrdering: RowOrdering = RowOrdering.forSchema(streamedKeys.map(_.dataType)) - override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys) + override def outputOrdering: Seq[SortOrder] = joinType match { +case FullOuter => Nil // when doing Full Outer join, NULL rows from both sides are not ordered. 
+case _ => requiredOrders(streamedKeys) + } override def requiredChildOrdering: Seq[Seq[SortOrder]] = requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil - @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, left.output) - @transient protected lazy val rightKeyGenerator = newProjection(rightKeys, right.output) + @transient protected lazy val streamedKeyGenerator = newProjection(streamedKeys, streamed.output) + @transient protected lazy val bufferedKeyGenerator = newProjection(bufferedKeys, buffered.output) + + // standard null rows + @transient private[this] lazy val streamedNullRow = new GenericRow(streamed.output.length) + @transient private[this] lazy val bufferedNullRow = new GenericRow(buffered.output.length) + + // checks if the joinedRow can meet condition requirements + @transient private[this] lazy val boundCondition = +condition.map(newPredicate(_, streamed.output ++ buffered.output)).getOrElse((row: Row) => true) private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = keys.map(SortOrder(_, Ascending)) protected override def doExecute(): RDD[Row] = { -val leftResults = left.execute().map(_.copy()) -val rightResults = right.execute().map(_.copy()) +val streamResults = streamed.execute().map(_.copy()) --- End diff -- I think we need to copy this, it has something to do with the external sort. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32802697 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer case class SortMergeJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], +joinType: JoinType, left: SparkPlan, -right: SparkPlan) extends BinaryNode { +right: SparkPlan, +condition: Option[Expression] = None) extends BinaryNode { - override def output: Seq[Attribute] = left.output ++ right.output + val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match { +case RightOuter => (right, left, rightKeys, leftKeys) +case _ => (left, right, leftKeys, rightKeys) + } + + override def output: Seq[Attribute] = joinType match { +case Inner => + left.output ++ right.output +case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) +case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output +case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) +case x => + throw new IllegalStateException(s"SortMergeJoin should not take $x as the JoinType") + } - override def outputPartitioning: Partitioning = left.outputPartitioning + override def outputPartitioning: Partitioning = joinType match { +case FullOuter => + // when doing Full Outer join, NULL rows from both sides are not so partitioned. + UnknownPartitioning(streamed.outputPartitioning.numPartitions) +case _ => streamed.outputPartitioning --- End diff -- I think @JoshRosen is right, but currently all of the existed equi-join has the same bug which described at https://issues.apache.org/jira/browse/SPARK-2205 , so for now, we'd better keep it the same as it was to be (just take the streaming output partitioning). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-113348731 @adrian-wang, I'm planning to take another pass on this pretty soon. At a high level, this patch is in very good shape since most of its code is modeled after other existing join implementations in Spark SQL. If you update this in the next couple of days, I'll try my best to be responsive with my reviews so we can get this in soon and not have too many merge conflicts. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-111588668 By the way, to provide a bit of context for why I'm reviewing this PR: I'm working on some optimizations to sorting in Spark SQL which should benefit sort-merge-join, so I've looked over all of this code pretty recently. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32338245 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -84,84 +129,164 @@ case class SortMergeJoin( override final def next(): Row = { if (hasNext) { -// we are using the buffered right rows and run down left iterator -val joinedRow = joinRow(leftElement, rightMatches(rightPosition)) -rightPosition += 1 -if (rightPosition >= rightMatches.size) { - rightPosition = 0 - fetchLeft() - if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) { -stop = false -rightMatches = null +if (bufferedMatches == null || bufferedMatches.size == 0) { + // we just found a row with no join match and we are here to produce a row + // with this row with a standard null row from the other side. + if (continueStreamed) { +val joinedRow = smartJoinRow(streamedElement, bufferedNullRow.copy()) +fetchStreamed() +joinedRow + } else { +val joinedRow = smartJoinRow(streamedNullRow.copy(), bufferedElement) +fetchBuffered() +joinedRow + } +} else { + // we are using the buffered right rows and run down left iterator + val joinedRow = smartJoinRow(streamedElement, bufferedMatches(bufferedPosition)) + bufferedPosition += 1 + if (bufferedPosition >= bufferedMatches.size) { +bufferedPosition = 0 +if (joinType != FullOuter || secondStreamedElement == null) { + fetchStreamed() + if (streamedElement == null || keyOrdering.compare(streamedKey, matchKey) != 0) { +stop = false +bufferedMatches = null + } +} else { + // in FullOuter join and the first time we finish the match buffer, + // we still want to generate all rows with streamed null row and buffered + // rows that match the join key but not the conditions. + streamedElement = secondStreamedElement + bufferedMatches = secondBufferedMatches + secondStreamedElement = null + secondBufferedMatches = null +} } + joinedRow } -joinedRow } else { // no more result throw new NoSuchElementException } } -private def fetchLeft() = { - if (leftIter.hasNext) { -leftElement = leftIter.next() -leftKey = leftKeyGenerator(leftElement) +private def smartJoinRow(streamedRow: Row, bufferedRow: Row): Row = joinType match { + case RightOuter => joinRow(bufferedRow, streamedRow) + case _ => joinRow(streamedRow, bufferedRow) +} + +private def fetchStreamed() = { + if (streamedIter.hasNext) { +streamedElement = streamedIter.next() +streamedKey = streamedKeyGenerator(streamedElement) } else { -leftElement = null +streamedElement = null } } -private def fetchRight() = { - if (rightIter.hasNext) { -rightElement = rightIter.next() -rightKey = rightKeyGenerator(rightElement) +private def fetchBuffered() = { + if (bufferedIter.hasNext) { +bufferedElement = bufferedIter.next() +bufferedKey = bufferedKeyGenerator(bufferedElement) } else { -rightElement = null +bufferedElement = null } } private def initialize() = { - fetchLeft() - fetchRight() + fetchStreamed() + fetchBuffered() } /** * Searches the right iterator for the next rows that have matches in left side, and store * them in a buffer. + * When this is not a Inner join, we will also return true when we get a row with no match + * on the other side. This search will jump out every time from the same position until + * `next()` is called. * * @return true if the search
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user adrian-wang commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-111570564 @JoshRosen Thanks for your comments, I will refine the code accordingly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user adrian-wang commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32337036 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer case class SortMergeJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], +joinType: JoinType, left: SparkPlan, -right: SparkPlan) extends BinaryNode { +right: SparkPlan, +condition: Option[Expression] = None) extends BinaryNode { - override def output: Seq[Attribute] = left.output ++ right.output + val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match { +case RightOuter => (right, left, rightKeys, leftKeys) +case _ => (left, right, leftKeys, rightKeys) + } + + override def output: Seq[Attribute] = joinType match { +case Inner => + left.output ++ right.output +case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) +case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output +case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) +case x => + throw new IllegalStateException(s"SortMergeJoin should not take $x as the JoinType") + } - override def outputPartitioning: Partitioning = left.outputPartitioning + override def outputPartitioning: Partitioning = joinType match { +case FullOuter => + // when doing Full Outer join, NULL rows from both sides are not so partitioned. + UnknownPartitioning(streamed.outputPartitioning.numPartitions) +case _ => streamed.outputPartitioning --- End diff -- I doubt it... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-111569760 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-111569749 [Test build #34777 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/34777/console) for PR 5717 at commit [`add49a2`](https://github.com/apache/spark/commit/add49a241ef96517b5cdcb02fde72d27185722e9). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32330738 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -84,84 +129,164 @@ case class SortMergeJoin( override final def next(): Row = { if (hasNext) { -// we are using the buffered right rows and run down left iterator -val joinedRow = joinRow(leftElement, rightMatches(rightPosition)) -rightPosition += 1 -if (rightPosition >= rightMatches.size) { - rightPosition = 0 - fetchLeft() - if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) { -stop = false -rightMatches = null +if (bufferedMatches == null || bufferedMatches.size == 0) { + // we just found a row with no join match and we are here to produce a row + // with this row with a standard null row from the other side. + if (continueStreamed) { +val joinedRow = smartJoinRow(streamedElement, bufferedNullRow.copy()) +fetchStreamed() +joinedRow + } else { +val joinedRow = smartJoinRow(streamedNullRow.copy(), bufferedElement) +fetchBuffered() +joinedRow + } +} else { + // we are using the buffered right rows and run down left iterator + val joinedRow = smartJoinRow(streamedElement, bufferedMatches(bufferedPosition)) + bufferedPosition += 1 + if (bufferedPosition >= bufferedMatches.size) { +bufferedPosition = 0 +if (joinType != FullOuter || secondStreamedElement == null) { + fetchStreamed() + if (streamedElement == null || keyOrdering.compare(streamedKey, matchKey) != 0) { +stop = false +bufferedMatches = null + } +} else { + // in FullOuter join and the first time we finish the match buffer, + // we still want to generate all rows with streamed null row and buffered + // rows that match the join key but not the conditions. + streamedElement = secondStreamedElement + bufferedMatches = secondBufferedMatches + secondStreamedElement = null + secondBufferedMatches = null +} } + joinedRow } -joinedRow } else { // no more result throw new NoSuchElementException } } -private def fetchLeft() = { - if (leftIter.hasNext) { -leftElement = leftIter.next() -leftKey = leftKeyGenerator(leftElement) +private def smartJoinRow(streamedRow: Row, bufferedRow: Row): Row = joinType match { + case RightOuter => joinRow(bufferedRow, streamedRow) + case _ => joinRow(streamedRow, bufferedRow) +} + +private def fetchStreamed() = { + if (streamedIter.hasNext) { +streamedElement = streamedIter.next() +streamedKey = streamedKeyGenerator(streamedElement) } else { -leftElement = null +streamedElement = null } } -private def fetchRight() = { - if (rightIter.hasNext) { -rightElement = rightIter.next() -rightKey = rightKeyGenerator(rightElement) +private def fetchBuffered() = { + if (bufferedIter.hasNext) { +bufferedElement = bufferedIter.next() +bufferedKey = bufferedKeyGenerator(bufferedElement) } else { -rightElement = null +bufferedElement = null } } private def initialize() = { - fetchLeft() - fetchRight() + fetchStreamed() + fetchBuffered() } /** * Searches the right iterator for the next rows that have matches in left side, and store * them in a buffer. + * When this is not a Inner join, we will also return true when we get a row with no match + * on the other side. This search will jump out every time from the same position until + * `next()` is called. * * @return true if the search
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32330289 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -84,84 +129,164 @@ case class SortMergeJoin( override final def next(): Row = { if (hasNext) { -// we are using the buffered right rows and run down left iterator -val joinedRow = joinRow(leftElement, rightMatches(rightPosition)) -rightPosition += 1 -if (rightPosition >= rightMatches.size) { - rightPosition = 0 - fetchLeft() - if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) { -stop = false -rightMatches = null +if (bufferedMatches == null || bufferedMatches.size == 0) { + // we just found a row with no join match and we are here to produce a row + // with this row with a standard null row from the other side. + if (continueStreamed) { +val joinedRow = smartJoinRow(streamedElement, bufferedNullRow.copy()) +fetchStreamed() +joinedRow + } else { +val joinedRow = smartJoinRow(streamedNullRow.copy(), bufferedElement) +fetchBuffered() +joinedRow + } +} else { + // we are using the buffered right rows and run down left iterator + val joinedRow = smartJoinRow(streamedElement, bufferedMatches(bufferedPosition)) + bufferedPosition += 1 + if (bufferedPosition >= bufferedMatches.size) { +bufferedPosition = 0 +if (joinType != FullOuter || secondStreamedElement == null) { + fetchStreamed() + if (streamedElement == null || keyOrdering.compare(streamedKey, matchKey) != 0) { +stop = false +bufferedMatches = null + } +} else { + // in FullOuter join and the first time we finish the match buffer, + // we still want to generate all rows with streamed null row and buffered + // rows that match the join key but not the conditions. + streamedElement = secondStreamedElement + bufferedMatches = secondBufferedMatches + secondStreamedElement = null + secondBufferedMatches = null +} } + joinedRow } -joinedRow } else { // no more result throw new NoSuchElementException } } -private def fetchLeft() = { - if (leftIter.hasNext) { -leftElement = leftIter.next() -leftKey = leftKeyGenerator(leftElement) +private def smartJoinRow(streamedRow: Row, bufferedRow: Row): Row = joinType match { + case RightOuter => joinRow(bufferedRow, streamedRow) + case _ => joinRow(streamedRow, bufferedRow) +} + +private def fetchStreamed() = { + if (streamedIter.hasNext) { +streamedElement = streamedIter.next() +streamedKey = streamedKeyGenerator(streamedElement) } else { -leftElement = null +streamedElement = null } } -private def fetchRight() = { - if (rightIter.hasNext) { -rightElement = rightIter.next() -rightKey = rightKeyGenerator(rightElement) +private def fetchBuffered() = { + if (bufferedIter.hasNext) { +bufferedElement = bufferedIter.next() +bufferedKey = bufferedKeyGenerator(bufferedElement) } else { -rightElement = null +bufferedElement = null } } private def initialize() = { - fetchLeft() - fetchRight() + fetchStreamed() + fetchBuffered() } /** * Searches the right iterator for the next rows that have matches in left side, and store * them in a buffer. + * When this is not a Inner join, we will also return true when we get a row with no match + * on the other side. This search will jump out every time from the same position until + * `next()` is called. * * @return true if the search
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32330049 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -84,84 +129,164 @@ case class SortMergeJoin( override final def next(): Row = { if (hasNext) { -// we are using the buffered right rows and run down left iterator -val joinedRow = joinRow(leftElement, rightMatches(rightPosition)) -rightPosition += 1 -if (rightPosition >= rightMatches.size) { - rightPosition = 0 - fetchLeft() - if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) { -stop = false -rightMatches = null +if (bufferedMatches == null || bufferedMatches.size == 0) { + // we just found a row with no join match and we are here to produce a row + // with this row with a standard null row from the other side. + if (continueStreamed) { +val joinedRow = smartJoinRow(streamedElement, bufferedNullRow.copy()) +fetchStreamed() +joinedRow + } else { +val joinedRow = smartJoinRow(streamedNullRow.copy(), bufferedElement) +fetchBuffered() +joinedRow + } +} else { + // we are using the buffered right rows and run down left iterator + val joinedRow = smartJoinRow(streamedElement, bufferedMatches(bufferedPosition)) + bufferedPosition += 1 + if (bufferedPosition >= bufferedMatches.size) { +bufferedPosition = 0 +if (joinType != FullOuter || secondStreamedElement == null) { + fetchStreamed() + if (streamedElement == null || keyOrdering.compare(streamedKey, matchKey) != 0) { +stop = false +bufferedMatches = null + } +} else { + // in FullOuter join and the first time we finish the match buffer, + // we still want to generate all rows with streamed null row and buffered + // rows that match the join key but not the conditions. + streamedElement = secondStreamedElement + bufferedMatches = secondBufferedMatches + secondStreamedElement = null + secondBufferedMatches = null +} } + joinedRow } -joinedRow } else { // no more result throw new NoSuchElementException } } -private def fetchLeft() = { - if (leftIter.hasNext) { -leftElement = leftIter.next() -leftKey = leftKeyGenerator(leftElement) +private def smartJoinRow(streamedRow: Row, bufferedRow: Row): Row = joinType match { + case RightOuter => joinRow(bufferedRow, streamedRow) + case _ => joinRow(streamedRow, bufferedRow) +} + +private def fetchStreamed() = { + if (streamedIter.hasNext) { +streamedElement = streamedIter.next() +streamedKey = streamedKeyGenerator(streamedElement) } else { -leftElement = null +streamedElement = null } } -private def fetchRight() = { - if (rightIter.hasNext) { -rightElement = rightIter.next() -rightKey = rightKeyGenerator(rightElement) +private def fetchBuffered() = { --- End diff -- Same here: we should add `: Unit = ` if we always ignore the return value. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32330019 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -84,84 +129,164 @@ case class SortMergeJoin( override final def next(): Row = { if (hasNext) { -// we are using the buffered right rows and run down left iterator -val joinedRow = joinRow(leftElement, rightMatches(rightPosition)) -rightPosition += 1 -if (rightPosition >= rightMatches.size) { - rightPosition = 0 - fetchLeft() - if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) { -stop = false -rightMatches = null +if (bufferedMatches == null || bufferedMatches.size == 0) { + // we just found a row with no join match and we are here to produce a row + // with this row with a standard null row from the other side. + if (continueStreamed) { +val joinedRow = smartJoinRow(streamedElement, bufferedNullRow.copy()) +fetchStreamed() +joinedRow + } else { +val joinedRow = smartJoinRow(streamedNullRow.copy(), bufferedElement) +fetchBuffered() +joinedRow + } +} else { + // we are using the buffered right rows and run down left iterator + val joinedRow = smartJoinRow(streamedElement, bufferedMatches(bufferedPosition)) + bufferedPosition += 1 + if (bufferedPosition >= bufferedMatches.size) { +bufferedPosition = 0 +if (joinType != FullOuter || secondStreamedElement == null) { + fetchStreamed() + if (streamedElement == null || keyOrdering.compare(streamedKey, matchKey) != 0) { +stop = false +bufferedMatches = null + } +} else { + // in FullOuter join and the first time we finish the match buffer, + // we still want to generate all rows with streamed null row and buffered + // rows that match the join key but not the conditions. + streamedElement = secondStreamedElement + bufferedMatches = secondBufferedMatches + secondStreamedElement = null + secondBufferedMatches = null +} } + joinedRow } -joinedRow } else { // no more result throw new NoSuchElementException } } -private def fetchLeft() = { - if (leftIter.hasNext) { -leftElement = leftIter.next() -leftKey = leftKeyGenerator(leftElement) +private def smartJoinRow(streamedRow: Row, bufferedRow: Row): Row = joinType match { + case RightOuter => joinRow(bufferedRow, streamedRow) + case _ => joinRow(streamedRow, bufferedRow) +} + +private def fetchStreamed() = { --- End diff -- It looks like the return value of `fetchStreamed` isn't used anywhere, so can you add a `: Unit = ` to this to make that clear? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
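The `: Unit =` point is purely about readability; here is a tiny, self-contained illustration with hypothetical names, not the PR's code.

```scala
// The body ends in an assignment, so the inferred result type would be Unit
// anyway; the explicit `: Unit` documents that callers never use the result.
class StreamCursor(it: Iterator[Int]) {
  private var current: Option[Int] = None

  // explicit Unit return type: called purely for its side effect
  def fetch(): Unit = {
    current = if (it.hasNext) Some(it.next()) else None
  }

  def value: Option[Int] = current
}

object StreamCursorDemo {
  def main(args: Array[String]): Unit = {
    val cursor = new StreamCursor(Iterator(1, 2, 3))
    cursor.fetch()
    println(cursor.value) // Some(1)
  }
}
```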
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32329757 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -84,84 +129,164 @@ case class SortMergeJoin( override final def next(): Row = { if (hasNext) { -// we are using the buffered right rows and run down left iterator -val joinedRow = joinRow(leftElement, rightMatches(rightPosition)) -rightPosition += 1 -if (rightPosition >= rightMatches.size) { - rightPosition = 0 - fetchLeft() - if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) { -stop = false -rightMatches = null +if (bufferedMatches == null || bufferedMatches.size == 0) { + // we just found a row with no join match and we are here to produce a row + // with this row with a standard null row from the other side. --- End diff -- Typo? Maybe this meant to read "with this row _and_ a standard null row"? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32327932 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -84,84 +129,164 @@ case class SortMergeJoin( override final def next(): Row = { if (hasNext) { -// we are using the buffered right rows and run down left iterator -val joinedRow = joinRow(leftElement, rightMatches(rightPosition)) -rightPosition += 1 -if (rightPosition >= rightMatches.size) { - rightPosition = 0 - fetchLeft() - if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) { -stop = false -rightMatches = null +if (bufferedMatches == null || bufferedMatches.size == 0) { + // we just found a row with no join match and we are here to produce a row + // with this row with a standard null row from the other side. + if (continueStreamed) { +val joinedRow = smartJoinRow(streamedElement, bufferedNullRow.copy()) --- End diff -- Why do we have to copy the standard null rows? Are we worried about downstream operators mutating them? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32327674 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala --- @@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer case class SortMergeJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], +joinType: JoinType, left: SparkPlan, -right: SparkPlan) extends BinaryNode { +right: SparkPlan, +condition: Option[Expression] = None) extends BinaryNode { - override def output: Seq[Attribute] = left.output ++ right.output + val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match { +case RightOuter => (right, left, rightKeys, leftKeys) +case _ => (left, right, leftKeys, rightKeys) + } + + override def output: Seq[Attribute] = joinType match { +case Inner => + left.output ++ right.output +case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) +case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output +case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) +case x => + throw new IllegalStateException(s"SortMergeJoin should not take $x as the JoinType") + } - override def outputPartitioning: Partitioning = left.outputPartitioning + override def outputPartitioning: Partitioning = joinType match { +case FullOuter => + // when doing Full Outer join, NULL rows from both sides are not so partitioned. + UnknownPartitioning(streamed.outputPartitioning.numPartitions) +case _ => streamed.outputPartitioning --- End diff -- Both `streamed` and `buffered` have the same output partitioning, since we've co-partitioned them in order to perform the join? Just want to double-check my understanding. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
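The intuition behind the question: `requiredChildDistribution` clusters both children on their join keys, so after the exchange both sides should place equal keys in the same partition index. A toy, Spark-free sketch of that invariant follows; the key values and partition count are made up.

```scala
// Toy illustration of co-partitioning: hash-partition both sides on the join
// key with the same partition count and equal keys land in the same index.
object CoPartitioningSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    def partitionOf(key: Int): Int =
      ((key.hashCode % numPartitions) + numPartitions) % numPartitions

    val leftKeys  = Seq(1, 7, 42, 1001)
    val rightKeys = Seq(7, 42, 1001, 9999)
    for (k <- leftKeys.intersect(rightKeys)) {
      println(s"key $k -> partition ${partitionOf(k)} on both the streamed and the buffered side")
    }
  }
}
```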
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-111532078

[Test build #34777 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/34777/consoleFull) for PR 5717 at commit [`add49a2`](https://github.com/apache/spark/commit/add49a241ef96517b5cdcb02fde72d27185722e9).
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32327276

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala ---
@@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer
 case class SortMergeJoin(
     leftKeys: Seq[Expression],
     rightKeys: Seq[Expression],
+    joinType: JoinType,
     left: SparkPlan,
-    right: SparkPlan) extends BinaryNode {
+    right: SparkPlan,
+    condition: Option[Expression] = None) extends BinaryNode {

-  override def output: Seq[Attribute] = left.output ++ right.output
+  val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match {
+    case RightOuter => (right, left, rightKeys, leftKeys)
+    case _ => (left, right, leftKeys, rightKeys)
+  }
+
+  override def output: Seq[Attribute] = joinType match {
+    case Inner =>
+      left.output ++ right.output
+    case LeftOuter =>
+      left.output ++ right.output.map(_.withNullability(true))
+    case RightOuter =>
+      left.output.map(_.withNullability(true)) ++ right.output
+    case FullOuter =>
+      left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+    case x =>
+      throw new IllegalStateException(s"SortMergeJoin should not take $x as the JoinType")
+  }

-  override def outputPartitioning: Partitioning = left.outputPartitioning
+  override def outputPartitioning: Partitioning = joinType match {
+    case FullOuter =>
+      // when doing Full Outer join, NULL rows from both sides are not so partitioned.
+      UnknownPartitioning(streamed.outputPartitioning.numPartitions)
+    case _ => streamed.outputPartitioning
+  }

   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

   // this is to manually construct an ordering that can be used to compare keys from both sides
-  private val keyOrdering: RowOrdering = RowOrdering.forSchema(leftKeys.map(_.dataType))
+  private val keyOrdering: RowOrdering = RowOrdering.forSchema(streamedKeys.map(_.dataType))

-  override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
+  override def outputOrdering: Seq[SortOrder] = joinType match {
+    case FullOuter => Nil // when doing Full Outer join, NULL rows from both sides are not ordered.
+    case _ => requiredOrders(streamedKeys)
+  }

   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
     requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil

-  @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, left.output)
-  @transient protected lazy val rightKeyGenerator = newProjection(rightKeys, right.output)
+  @transient protected lazy val streamedKeyGenerator = newProjection(streamedKeys, streamed.output)
+  @transient protected lazy val bufferedKeyGenerator = newProjection(bufferedKeys, buffered.output)
+
+  // standard null rows
+  @transient private[this] lazy val streamedNullRow = new GenericRow(streamed.output.length)
--- End diff --

Can we move `streamedNullRow` and `bufferedNullRow` into the `zipPartitions` call so that we don't have to make a `transient lazy val`? I find `transient lazy val` to be a bit confusing and like to avoid it when I can.
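One way to read the suggestion: anything needed only inside the per-partition join loop can be created inside the `zipPartitions` closure, so the plan node never carries it as driver-side state that has to be marked `@transient lazy val`. The following is a minimal standalone sketch of that pattern (a local Spark job with made-up data and a made-up null marker, not the patch itself):

    import org.apache.spark.{SparkConf, SparkContext}

    object ZipPartitionsClosureSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
        val streamed = sc.parallelize(Seq(1, 2, 3, 4), 2)
        val buffered = sc.parallelize(Seq(10, 20, 30, 40), 2)
        val zipped = streamed.zipPartitions(buffered) { (sIter, bIter) =>
          // Created per partition on the executor; nothing to serialize from the driver.
          val nullMarker = -1
          sIter.zipAll(bIter, nullMarker, nullMarker)
        }
        zipped.collect().foreach(println)
        sc.stop()
      }
    }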
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-111531521

Merged build started.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/5717#discussion_r32327111

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala ---
@@ -36,46 +36,91 @@ import org.apache.spark.util.collection.CompactBuffer
 case class SortMergeJoin(
     leftKeys: Seq[Expression],
     rightKeys: Seq[Expression],
+    joinType: JoinType,
     left: SparkPlan,
-    right: SparkPlan) extends BinaryNode {
+    right: SparkPlan,
+    condition: Option[Expression] = None) extends BinaryNode {

-  override def output: Seq[Attribute] = left.output ++ right.output
+  val (streamed, buffered, streamedKeys, bufferedKeys) = joinType match {
+    case RightOuter => (right, left, rightKeys, leftKeys)
+    case _ => (left, right, leftKeys, rightKeys)
+  }
+
+  override def output: Seq[Attribute] = joinType match {
+    case Inner =>
+      left.output ++ right.output
+    case LeftOuter =>
+      left.output ++ right.output.map(_.withNullability(true))
+    case RightOuter =>
+      left.output.map(_.withNullability(true)) ++ right.output
+    case FullOuter =>
+      left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+    case x =>
+      throw new IllegalStateException(s"SortMergeJoin should not take $x as the JoinType")
+  }

-  override def outputPartitioning: Partitioning = left.outputPartitioning
+  override def outputPartitioning: Partitioning = joinType match {
+    case FullOuter =>
+      // when doing Full Outer join, NULL rows from both sides are not so partitioned.
+      UnknownPartitioning(streamed.outputPartitioning.numPartitions)
+    case _ => streamed.outputPartitioning
+  }

   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

   // this is to manually construct an ordering that can be used to compare keys from both sides
-  private val keyOrdering: RowOrdering = RowOrdering.forSchema(leftKeys.map(_.dataType))
+  private val keyOrdering: RowOrdering = RowOrdering.forSchema(streamedKeys.map(_.dataType))

-  override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
+  override def outputOrdering: Seq[SortOrder] = joinType match {
+    case FullOuter => Nil // when doing Full Outer join, NULL rows from both sides are not ordered.
+    case _ => requiredOrders(streamedKeys)
+  }

   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
     requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil

-  @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, left.output)
-  @transient protected lazy val rightKeyGenerator = newProjection(rightKeys, right.output)
+  @transient protected lazy val streamedKeyGenerator = newProjection(streamedKeys, streamed.output)
+  @transient protected lazy val bufferedKeyGenerator = newProjection(bufferedKeys, buffered.output)
+
+  // standard null rows
+  @transient private[this] lazy val streamedNullRow = new GenericRow(streamed.output.length)
+  @transient private[this] lazy val bufferedNullRow = new GenericRow(buffered.output.length)
+
+  // checks if the joinedRow can meet condition requirements
+  @transient private[this] lazy val boundCondition =
+    condition.map(newPredicate(_, streamed.output ++ buffered.output)).getOrElse((row: Row) => true)

   private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
     keys.map(SortOrder(_, Ascending))

   protected override def doExecute(): RDD[Row] = {
-    val leftResults = left.execute().map(_.copy())
-    val rightResults = right.execute().map(_.copy())
+    val streamResults = streamed.execute().map(_.copy())
--- End diff --

Why do we need to copy the streamed rows? I understand why we need to do the copy for the buffered results, since we might be dealing with mutable input rows, but that shouldn't be a problem for the stream side, right?
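The distinction being drawn: when an upstream iterator reuses a single mutable row object, anything that buffers rows must copy them first, while a side that is only ever read one row at a time may be able to skip the copy. The following is a standalone toy sketch of the buffering hazard, using plain arrays rather than Spark's rows:

    object CopyBeforeBufferingSketch {
      def main(args: Array[String]): Unit = {
        val reused = Array[Any](null)                           // upstream reuses one mutable object
        def source: Iterator[Array[Any]] =
          Iterator(1, 2, 3).map { v => reused(0) = v; reused }

        val bufferedWithoutCopy = source.toVector               // every entry is the same object
        val bufferedWithCopy    = source.map(_.clone).toVector  // copy before buffering

        println(bufferedWithoutCopy.map(_(0)))                  // Vector(3, 3, 3) -- corrupted
        println(bufferedWithCopy.map(_(0)))                     // Vector(1, 2, 3) -- correct
      }
    }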
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-111531490

Merged build triggered.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-111531231

Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-106680748

Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-7165] [SQL] use sort merge join for out...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5717#issuecomment-106680745

[Test build #33707 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/33707/consoleFull) for PR 5717 at commit [`add49a2`](https://github.com/apache/spark/commit/add49a241ef96517b5cdcb02fde72d27185722e9).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.