[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-10-23 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r227268021
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinUtils.scala ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution, HashPartitioning}
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+
+object JoinUtils {
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression],
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  def requiredChildDistributionForShuffledJoin(
+      partitioningDetection: Boolean,
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      left: SparkPlan,
+      right: SparkPlan): Seq[Distribution] = {
+    if (!partitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
--- End diff --

@cloud-fan In this PR, `requiredChildDistribution` is recalculated every time 
it is invoked. Could that make it more precise than 
`EnsureRequirements.reorderJoinPredicates`?

This kind of scenario is common. Is there a plan to improve the framework 
in 3.0?
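
As a language-level aside on the point above: in Scala a `def` member is re-evaluated on every call, while a `lazy val` is computed once and then cached. The sketch below is a toy, not Spark code; `JoinNode`, `childPartitioning`, and both members are hypothetical names used only to show the difference.

```scala
object RecomputeSketch {
  // Hypothetical stand-in for a plan node; real Spark plans are immutable trees.
  final class JoinNode(var childPartitioning: String) {
    // Re-evaluated on every call, so it always reflects the current child.
    def requiredDistribution: String = s"clustered-on($childPartitioning)"

    // Computed on first access and cached afterwards.
    lazy val cachedDistribution: String = s"clustered-on($childPartitioning)"
  }

  def main(args: Array[String]): Unit = {
    val node = new JoinNode("hash(i)")
    val first = node.cachedDistribution // forces the lazy val
    node.childPartitioning = "hash(i, j)"

    assert(first == "clustered-on(hash(i))")
    assert(node.requiredDistribution == "clustered-on(hash(i, j))")
    assert(node.cachedDistribution == "clustered-on(hash(i))") // stale value
  }
}
```

Whether recomputation actually yields a more precise answer depends on when the planner calls the method relative to inserting shuffles, which this toy does not model.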



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r227253732
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinUtils.scala ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution, HashPartitioning}
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+
+object JoinUtils {
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression],
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  def requiredChildDistributionForShuffledJoin(
+      partitioningDetection: Boolean,
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      left: SparkPlan,
+      right: SparkPlan): Seq[Distribution] = {
+    if (!partitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
--- End diff --

This is my biggest concern. Currently Spark adds shuffles with a rule, so we 
can't always get the children's partitioning precisely. We implemented a 
similar feature in `EnsureRequirements.reorderJoinPredicates`, which is hacky, 
and we should improve the framework before adding more features like this.


---




[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-07-26 Thread yucai
GitHub user yucai reopened a pull request:

https://github.com/apache/spark/pull/21156

[SPARK-24087][SQL] Avoid shuffle when join keys are a super-set of bucket keys

## What changes were proposed in this pull request?

To improve bucketed joins, we should avoid the shuffle when the join keys are 
a super-set of the bucket keys.
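
A minimal sketch of why this is safe, using a toy model rather than Spark itself: if both tables are bucketed by `i` alone, any two rows that agree on the full join key `(i, j)` also agree on `i` and so already sit in the same bucket. The `bucketOf` function, the bucket count, and the sample rows are assumptions made for illustration.

```scala
object BucketSupersetSketch {
  val numBuckets = 4

  // Hypothetical bucketing function: the bucket depends only on the bucket key `i`.
  def bucketOf(i: Int): Int = Math.floorMod(i.hashCode, numBuckets)

  def main(args: Array[String]): Unit = {
    // Rows are (i, j) pairs; both sides are bucketed by `i` only.
    val left = Seq((1, 10), (2, 20), (5, 10), (6, 30))
    val right = Seq((1, 10), (5, 10), (6, 31), (9, 20))

    // Pairs that match on the full join key (i, j).
    val matches = for {
      l <- left
      r <- right
      if l == r
    } yield (l, r)

    // Every matching pair shares `i`, hence shares a bucket, so the join can be
    // evaluated bucket by bucket without moving rows between buckets.
    assert(matches.size == 2)
    assert(matches.forall { case (l, r) => bucketOf(l._1) == bucketOf(r._1) })
  }
}
```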

## How was this patch tested?

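Enabled the previously ignored test "avoid shuffle when join keys are a 
super-set of bucket keys" in `BucketedReadSuite`.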


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yucai/spark SPARK-24087

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21156.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21156


commit b6bfdc21ed8edf98f9a3b9ac1c253c59adb141a2
Author: yucai 
Date:   2018-04-25T00:49:43Z

[SPARK-24087][SQL] Avoid shuffle when join keys are a super-set of bucket keys

commit a59c94f5b655fc034ce8907b98022cacf6bf318e
Author: yucai 
Date:   2018-04-26T04:33:08Z

simplify the codes

commit 4e026e5e437dc7f578434244b55bb1ebe189bace
Author: yucai 
Date:   2018-06-04T02:22:12Z

Add spark.sql.sortMergeJoinExec.childrenPartitioningDetection for user to disable this feature

commit fa76a7823baf4e6eb05f33bc746ade7f65f44372
Author: yucai 
Date:   2018-06-04T05:25:01Z

enable spark.sql.sortMergeJoinExec.childrenPartitioningDetection by default

commit 946688aee3d03d37a57270e654e00bb9236f21c4
Author: yucai 
Date:   2018-06-04T05:28:51Z

should return

commit 981a0fd22d30768ce533982c9fcc701b15d4dc44
Author: yucai 
Date:   2018-07-06T06:51:24Z

skip RangePartition

commit 76e7d5f67017604c29179ce55280e0fc56574fde
Author: yucai 
Date:   2018-07-09T10:14:43Z

Merge remote-tracking branch 'origin/master' into pr21156

commit 371c3a932f4dede4aeb1be2c9db404b457547ecf
Author: yucai 
Date:   2018-07-09T11:33:17Z

improve tests

commit de2bc4de76077f257b85e6a1d58ee17fbc770c8e
Author: yucai 
Date:   2018-07-12T01:43:35Z

support shuffled hash join

commit f40606203da01efe400431ed9d2b8b70c0476fc6
Author: yucai 
Date:   2018-07-26T14:33:40Z

remove bucket table check




---




[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-07-26 Thread yucai
Github user yucai closed the pull request at:

https://github.com/apache/spark/pull/21156


---




[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-07-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200947380
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
--- End diff --

Yes, you're right, thanks.


---




[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-07-09 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200945621
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
--- End diff --

```
      case HashPartitioning(leftExpressions, _)
        if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
        avoidShuffleIfPossible(leftKeys, leftExpressions)
```
The guard `if leftPartitioning.satisfies(ClusteredDistribution(leftKeys))` 
already ensures that `expressions` is a subset of `joinKeys`, so `indexWhere` 
would not return -1, right?
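
The argument above can be sketched with plain strings standing in for Catalyst expressions. This is a toy under stated assumptions: `satisfiesClustered` is a hypothetical stand-in for the `satisfies(ClusteredDistribution(...))` guard, modelled here as "every partitioning expression appears among the keys", and `indicesIfSatisfied` is an illustrative helper that is not part of the PR.

```scala
object GuardedIndicesSketch {
  // Toy stand-in for the guard: every partitioning expression must be one of the keys.
  def satisfiesClustered(partitionExprs: Seq[String], keys: Seq[String]): Boolean =
    partitionExprs.forall(e => keys.contains(e))

  // Computes positions only when the guard holds, so -1 can never be produced.
  def indicesIfSatisfied(
      partitionExprs: Seq[String],
      joinKeys: Seq[String]): Option[Seq[Int]] =
    if (satisfiesClustered(partitionExprs, joinKeys)) {
      Some(partitionExprs.map(e => joinKeys.indexOf(e)))
    } else {
      None
    }

  def main(args: Array[String]): Unit = {
    // Partitioned by a subset of the join keys: every index is valid.
    assert(indicesIfSatisfied(Seq("j"), Seq("i", "j")) == Some(Seq(1)))
    // Partitioned by something else: the guard rejects it before any lookup.
    assert(indicesIfSatisfied(Seq("k"), Seq("i", "j")).isEmpty)
  }
}
```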


---




[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-07-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200941904
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (!conf.sortMergeJoinExecChildrenPartitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
+    leftPartitioning match {
+      case HashPartitioning(leftExpressions, _)
+        if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
+        avoidShuffleIfPossible(leftKeys, leftExpressions)
+
+      case _ => rightPartitioning match {
--- End diff --

Yes, we could do that, but as it stands this `case` is useless.


---




[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-07-09 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200940424
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (!conf.sortMergeJoinExecChildrenPartitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
+    leftPartitioning match {
+      case HashPartitioning(leftExpressions, _)
+        if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
+        avoidShuffleIfPossible(leftKeys, leftExpressions)
+
+      case _ => rightPartitioning match {
--- End diff --

In that case, we could return `OrderedDistribution :: OrderedDistribution :: 
Nil` to avoid the shuffle on the `RangePartitioning` side.


---




[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-07-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200940409
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
--- End diff --

What if we don't find an expression here? I think `indexWhere` would return 
-1, causing an error when the index is used later. Can we also add a test case 
for this situation?
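
The failure mode described above can be reproduced with the Scala standard library alone. In this sketch plain strings stand in for Catalyst expressions and `==` stands in for `semanticEquals`; the key names are made up for illustration.

```scala
import scala.util.Try

object IndexWhereSketch {
  def main(args: Array[String]): Unit = {
    val joinKeys = Seq("i", "j")
    val leftKeys = Seq("l.i", "l.j")
    val partitionExprs = Seq("k") // not one of the join keys

    // indexWhere returns -1 when no element satisfies the predicate.
    val indices = partitionExprs.map(x => joinKeys.indexWhere(_ == x))
    assert(indices == Seq(-1))

    // Using -1 as an index throws at runtime, which Try captures as a Failure.
    val lookup = Try(indices.map(leftKeys(_)))
    assert(lookup.isFailure)
    assert(lookup.failed.get.isInstanceOf[IndexOutOfBoundsException])
  }
}
```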


---




[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-07-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200938314
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (!conf.sortMergeJoinExecChildrenPartitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
+    leftPartitioning match {
+      case HashPartitioning(leftExpressions, _)
+        if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
+        avoidShuffleIfPossible(leftKeys, leftExpressions)
+
+      case _ => rightPartitioning match {
--- End diff --

But that case would not be covered here anyway: we return a requirement of 
`HashClusteredDistribution`, so a `RangePartitioning` would never satisfy it, 
would it?
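
A toy model of that reasoning, not Spark's actual classes: `HashPart`, `RangePart`, and `satisfiesHashClustered` are hypothetical names, and the rule encoded here (a hash-clustered requirement is met only by a hash partitioning on exactly the same expressions) is a simplification assumed for illustration.

```scala
object SatisfiesSketch {
  sealed trait Partitioning
  final case class HashPart(exprs: Seq[String]) extends Partitioning
  final case class RangePart(ordering: Seq[String]) extends Partitioning

  // Simplified rule: only a hash partitioning on the same expressions, in the
  // same order, meets a hash-clustered requirement.
  def satisfiesHashClustered(p: Partitioning, required: Seq[String]): Boolean =
    p match {
      case HashPart(exprs) => exprs == required
      case RangePart(_)    => false
    }

  def main(args: Array[String]): Unit = {
    assert(satisfiesHashClustered(HashPart(Seq("i")), Seq("i")))
    assert(!satisfiesHashClustered(HashPart(Seq("i")), Seq("i", "j")))
    // A range partitioning never satisfies it, whatever its ordering columns are.
    assert(!satisfiesHashClustered(RangePart(Seq("i")), Seq("i")))
  }
}
```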


---




[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-07-09 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200937190
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (!conf.sortMergeJoinExecChildrenPartitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
+    leftPartitioning match {
+      case HashPartitioning(leftExpressions, _)
+        if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
+        avoidShuffleIfPossible(leftKeys, leftExpressions)
+
+      case _ => rightPartitioning match {
--- End diff --

Yes, you are right. The main purpose of this feature is bucketed tables, so 
`HashPartitioning` is enough.
Actually, in a similar way, we could also skip the shuffle for one side when 
it is `RangePartitioning`, but I am not sure that would really be useful.


---




[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-07-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200934272
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
@@ -334,8 +334,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
     )
   }
 
-  // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704
-  ignore("avoid shuffle when join keys are a super-set of bucket keys") {
+  test("avoid shuffle when join keys are a super-set of bucket keys") {
--- End diff --

Can we add more tests with a different `BucketSpec` on each side?
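
One scenario such a test might cover, sketched with plain strings instead of Catalyst expressions: the left side is bucketed by `l.i`, the right side by `r.j`, and the join is on both columns. `requiredKeys` mirrors the index mapping in the diff, but it and the column names are illustrative assumptions, not code from the PR or its test suite.

```scala
object DifferentBucketSpecSketch {
  // Maps one side's partitioning expressions to the matching keys on both sides.
  def requiredKeys(
      partitionExprs: Seq[String],
      joinKeys: Seq[String],
      leftKeys: Seq[String],
      rightKeys: Seq[String]): (Seq[String], Seq[String]) = {
    val indices = partitionExprs.map(e => joinKeys.indexOf(e))
    (indices.map(leftKeys(_)), indices.map(rightKeys(_)))
  }

  def main(args: Array[String]): Unit = {
    val leftKeys = Seq("l.i", "l.j")
    val rightKeys = Seq("r.i", "r.j")

    // Left side is bucketed by l.i, so the requirement is derived from it.
    val (reqLeft, reqRight) = requiredKeys(Seq("l.i"), leftKeys, leftKeys, rightKeys)
    assert(reqLeft == Seq("l.i"))
    assert(reqRight == Seq("r.i"))

    // Right side is bucketed by r.j, which differs from the derived requirement,
    // so in this toy model only the left side's shuffle could be skipped.
    val rightBucketColumns = Seq("r.j")
    assert(rightBucketColumns != reqRight)
  }
}
```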


---




[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-07-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21156#discussion_r200933998
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -76,8 +76,36 @@ case class SortMergeJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  private def avoidShuffleIfPossible(
+      joinKeys: Seq[Expression],
+      expressions: Seq[Expression]): Seq[Distribution] = {
+    val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
+    HashClusteredDistribution(indices.map(leftKeys(_))) ::
+      HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (!conf.sortMergeJoinExecChildrenPartitioningDetection) {
+      return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    }
+
+    val leftPartitioning = left.outputPartitioning
+    val rightPartitioning = right.outputPartitioning
+    leftPartitioning match {
+      case HashPartitioning(leftExpressions, _)
+        if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
+        avoidShuffleIfPossible(leftKeys, leftExpressions)
+
+      case _ => rightPartitioning match {
--- End diff --

IIUC, if either `left` or `right` is not `HashPartitioning`, we are sure we 
won't meet the required distribution, so I guess this is useless, isn't it?


---




[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...

2018-04-25 Thread yucai
GitHub user yucai opened a pull request:

https://github.com/apache/spark/pull/21156

[SPARK-24087][SQL] Avoid shuffle when join keys are a super-set of bucket keys

## What changes were proposed in this pull request?

To improve bucketed joins, we should avoid the shuffle when the join keys are 
a super-set of the bucket keys.

## How was this patch tested?

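Enabled the previously ignored test "avoid shuffle when join keys are a 
super-set of bucket keys" in `BucketedReadSuite`.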








---
