[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r227268021

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinUtils.scala ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution, HashPartitioning}
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+
+object JoinUtils {
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression],
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  def requiredChildDistributionForShuffledJoin(
+      partitioningDetection: Boolean,
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      left: SparkPlan,
+      right: SparkPlan): Seq[Distribution] = {
+    if (!partitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
--- End diff --

@cloud-fan in this PR, `requiredChildDistribution` is re-calculated each time it is invoked. Could that make it more precise than `EnsureRequirements.reorderJoinPredicates`?

This kind of scenario is common. Do we have a plan to improve the framework in 3.0?

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r227253732

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinUtils.scala ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution, HashPartitioning}
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+
+object JoinUtils {
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression],
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  def requiredChildDistributionForShuffledJoin(
+      partitioningDetection: Boolean,
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      left: SparkPlan,
+      right: SparkPlan): Seq[Distribution] = {
+    if (!partitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
--- End diff --

This is my biggest concern. Currently Spark adds shuffles with a rule, so we can't always get the children's partitioning precisely. We implemented a similar feature in `EnsureRequirements.reorderJoinPredicates`, which is hacky, and we should improve the framework before adding more features like this.
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
GitHub user yucai reopened a pull request:

https://github.com/apache/spark/pull/21156

[SPARK-24087][SQL] Avoid shuffle when join keys are a super-set of bucket keys

## What changes were proposed in this pull request?

To improve the bucket join, when join keys are a super-set of bucket keys, we should avoid shuffle.

## How was this patch tested?

Enable ignored test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yucai/spark SPARK-24087

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21156.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21156

commit b6bfdc21ed8edf98f9a3b9ac1c253c59adb141a2
Author: yucai
Date:   2018-04-25T00:49:43Z

    [SPARK-24087][SQL] Avoid shuffle when join keys are a super-set of bucket keys

commit a59c94f5b655fc034ce8907b98022cacf6bf318e
Author: yucai
Date:   2018-04-26T04:33:08Z

    simplify the codes

commit 4e026e5e437dc7f578434244b55bb1ebe189bace
Author: yucai
Date:   2018-06-04T02:22:12Z

    Add spark.sql.sortMergeJoinExec.childrenPartitioningDetection for user to disable this feature

commit fa76a7823baf4e6eb05f33bc746ade7f65f44372
Author: yucai
Date:   2018-06-04T05:25:01Z

    enable spark.sql.sortMergeJoinExec.childrenPartitioningDetection by default

commit 946688aee3d03d37a57270e654e00bb9236f21c4
Author: yucai
Date:   2018-06-04T05:28:51Z

    should return

commit 981a0fd22d30768ce533982c9fcc701b15d4dc44
Author: yucai
Date:   2018-07-06T06:51:24Z

    skip RangePartition

commit 76e7d5f67017604c29179ce55280e0fc56574fde
Author: yucai
Date:   2018-07-09T10:14:43Z

    Merge remote-tracking branch 'origin/master' into pr21156

commit 371c3a932f4dede4aeb1be2c9db404b457547ecf
Author: yucai
Date:   2018-07-09T11:33:17Z

    improve tests

commit de2bc4de76077f257b85e6a1d58ee17fbc770c8e
Author: yucai
Date:   2018-07-12T01:43:35Z

    support shuffled hash join

commit f40606203da01efe400431ed9d2b8b70c0476fc6
Author: yucai
Date:   2018-07-26T14:33:40Z

    remove bucket table check
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user yucai closed the pull request at:

https://github.com/apache/spark/pull/21156
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200947380

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
--- End diff --

Yes, you're right, thanks.
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200945621

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
--- End diff --

```
case HashPartitioning(leftExpressions, _)
  if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
  avoidShuffleIfPossible(leftKeys, leftExpressions)
```

The guard `if leftPartitioning.satisfies(ClusteredDistribution(leftKeys))` has already ensured that `expressions` is a subset of `joinKeys`, so `indexWhere` would not return -1, right?
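The index mapping discussed in this comment can be tried out without Spark. What follows is only a minimal sketch under stated assumptions: plain strings stand in for Catalyst `Expression`s, `==` stands in for `semanticEquals`, and the names `KeyAlignment` and `alignKeys` are hypothetical and not part of the PR. It returns `None` when a partitioning expression is not among the join keys, which is the case where `indexWhere` yields -1.

```scala
// Hypothetical stand-alone sketch: strings replace Catalyst expressions,
// and == replaces Expression.semanticEquals.
object KeyAlignment {
  /** For each partitioning expression, picks the left and right join keys
    * found at the same position in the join-key list.
    * Returns None if any expression is not a join key (indexWhere == -1).
    */
  def alignKeys(
      joinKeys: Seq[String],
      partitionExprs: Seq[String],
      leftKeys: Seq[String],
      rightKeys: Seq[String]): Option[(Seq[String], Seq[String])] = {
    val indices = partitionExprs.map(e => joinKeys.indexWhere(_ == e))
    if (indices.contains(-1)) None
    else Some((indices.map(leftKeys(_)), indices.map(rightKeys(_))))
  }
}
```

For example, with left keys `a.i, a.j`, right keys `b.i, b.j` and a left side hashed on `a.i`, the sketch selects `a.i` and `b.i`. Whether the `satisfies` guard makes the -1 case unreachable in the real code is the point under discussion in the thread; the sketch just makes that case explicit.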
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200941904

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (!conf.sortMergeJoinExecChildrenPartitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
+    leftPartitioning match {
+      case HashPartitioning(leftExpressions, _)
+        if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
+        avoidShuffleIfPossible(leftKeys, leftExpressions)
+
+      case _ => rightPartitioning match {
--- End diff --

Yes, we can do that, but this `case` is useless anyway...
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200940424

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (!conf.sortMergeJoinExecChildrenPartitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
+    leftPartitioning match {
+      case HashPartitioning(leftExpressions, _)
+        if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
+        avoidShuffleIfPossible(leftKeys, leftExpressions)
+
+      case _ => rightPartitioning match {
--- End diff --

In that case, we can return `OrderedDistribution :: OrderedDistribution :: Nil` to avoid shuffle for the `RangePartitioning` side.
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200940409

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
--- End diff --

What if we don't find an expression here? I think `indexWhere` would return -1, causing an error when the index is used later. Can we also add a test case for this situation?
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200938314

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (!conf.sortMergeJoinExecChildrenPartitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
+    leftPartitioning match {
+      case HashPartitioning(leftExpressions, _)
+        if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
+        avoidShuffleIfPossible(leftKeys, leftExpressions)
+
+      case _ => rightPartitioning match {
--- End diff --

But that case would not be covered here anyway: we return that we require a `HashClusteredDistribution`, so a `RangePartitioning` would never match, would it?
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200937190

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (!conf.sortMergeJoinExecChildrenPartitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
+    leftPartitioning match {
+      case HashPartitioning(leftExpressions, _)
+        if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
+        avoidShuffleIfPossible(leftKeys, leftExpressions)
+
+      case _ => rightPartitioning match {
--- End diff --

Yes, you are right. This feature is mainly for bucketed tables, so handling `HashPartitioning` is enough.

Actually, in a similar way, we could also skip the shuffle for one side if it is `RangePartitioning`, but I am not sure that is really useful.
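The match logic debated in this comment can be sketched as a toy model. This is an illustration only, under loud assumptions: strings stand in for Catalyst expressions, the case classes below are simplified stand-ins for Spark's `Partitioning` classes, and `ToyJoinPlanning`, `satisfiesClustered` and `requiredLeftKeys` are hypothetical names. Range partitioning is deliberately treated as "not usable", mirroring the PR's "skip RangePartition" commit rather than Spark's full `satisfies` rules.

```scala
// Toy model only: strings stand in for Catalyst expressions, and these case
// classes are simplified stand-ins for Spark's Partitioning classes.
object ToyJoinPlanning {
  sealed trait Partitioning
  final case class HashPartitioning(exprs: Seq[String], numPartitions: Int) extends Partitioning
  final case class RangePartitioning(ordering: Seq[String], numPartitions: Int) extends Partitioning

  // A hash partitioning satisfies a clustered distribution on `keys` when every
  // partitioning expression is one of the keys. Every other partitioning is
  // rejected here, which is a simplification made by this sketch.
  def satisfiesClustered(p: Partitioning, keys: Seq[String]): Boolean = p match {
    case HashPartitioning(exprs, _) => exprs.forall(e => keys.contains(e))
    case _ => false
  }

  // Keys the join should require on the left side: the existing hash
  // expressions when they are usable, otherwise the full join keys
  // (which means a shuffle on the full keys).
  def requiredLeftKeys(left: Partitioning, leftKeys: Seq[String]): Seq[String] = left match {
    case h @ HashPartitioning(exprs, _) if satisfiesClustered(h, leftKeys) => exprs
    case _ => leftKeys
  }
}
```

With join keys `i, j`, a left side hashed on `i` keeps `i` as the required key, while a range-partitioned side or a side hashed on an unrelated column falls back to the full key list.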
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200934272

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
@@ -334,8 +334,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
     )
   }
 
-  // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704
-  ignore("avoid shuffle when join keys are a super-set of bucket keys") {
+  test("avoid shuffle when join keys are a super-set of bucket keys") {
--- End diff --

Can we add more tests with a different `BucketSpec` on each side?
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200933998

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (!conf.sortMergeJoinExecChildrenPartitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
+    leftPartitioning match {
+      case HashPartitioning(leftExpressions, _)
+        if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
+        avoidShuffleIfPossible(leftKeys, leftExpressions)
+
+      case _ => rightPartitioning match {
--- End diff --

IIUC, if either `left` or `right` is not `HashPartitioning`, we are sure we won't meet the required distribution, so I guess this is useless, isn't it?
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
GitHub user yucai opened a pull request:

https://github.com/apache/spark/pull/21156

[SPARK-24087][SQL] Avoid shuffle when join keys are a super-set of bucket keys

## What changes were proposed in this pull request?

To improve the bucket join, when join keys are a super-set of bucket keys, we should avoid shuffle.

## How was this patch tested?

Enable ignored test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yucai/spark SPARK-24087

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21156.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21156
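The idea in the PR description can be illustrated with a small stand-alone sketch: if a table is bucketed on column `i` and the join keys are `(i, j)`, then rows that agree on the full join key also agree on `i`, so they already sit in the same bucket. The code below is a hedged illustration, not Spark's bucketing implementation: `BucketSubsetDemo`, `Row` and `bucketOf` are hypothetical names, and the hash function is a plain `hashCode` modulo, chosen only for the demonstration.

```scala
// Hypothetical demonstration; not Spark's bucketing code or hash function.
object BucketSubsetDemo {
  final case class Row(i: Int, j: Int)

  // Assigns a bucket from the bucket key only (column i), as a table
  // bucketed by i would.
  def bucketOf(row: Row, numBuckets: Int): Int =
    java.lang.Math.floorMod(row.i.hashCode, numBuckets)

  // True if every group of rows sharing the full join key (i, j)
  // lands in exactly one bucket.
  def joinKeyGroupsAreCoLocated(rows: Seq[Row], numBuckets: Int): Boolean =
    rows.groupBy(r => (r.i, r.j)).values.forall { group =>
      group.map(bucketOf(_, numBuckets)).distinct.size == 1
    }
}
```

For the two sides of a join to line up bucket by bucket, both tables would also need compatible bucketing (same hash function and bucket count on corresponding keys); the sketch covers only the single-table co-location property.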