This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new eadb591f37a [SPARK-45203][SQL][TEST] Join tests in KeyGroupedPartitioningSuite should use merge join hint
eadb591f37a is described below

commit eadb591f37a118096bab637e4b6ca913c2753a6b
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Mon Sep 18 11:50:16 2023 -0700

    [SPARK-45203][SQL][TEST] Join tests in KeyGroupedPartitioningSuite should use merge join hint
    
    ### What changes were proposed in this pull request?
    
    It's more robust to use join hints to enforce sort-merge join in the
    tests, instead of setting configs that may become ineffective as more
    advanced optimizations are added in the future.
    
    ### Why are the changes needed?
    
    Make the tests more future-proof.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    N/A
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #42983 from cloud-fan/minor.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../connector/KeyGroupedPartitioningSuite.scala    | 246 ++++++++-------------
 1 file changed, 93 insertions(+), 153 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index ffd1c8e31e9..4cb5457b66b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.connector
 
 import java.util.Collections
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression}
@@ -44,25 +45,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     UnboundBucketFunction,
     UnboundTruncateFunction)
 
-  private var originalV2BucketingEnabled: Boolean = false
-  private var originalAutoBroadcastJoinThreshold: Long = -1
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    originalV2BucketingEnabled = conf.getConf(V2_BUCKETING_ENABLED)
-    conf.setConf(V2_BUCKETING_ENABLED, true)
-    originalAutoBroadcastJoinThreshold = conf.getConf(AUTO_BROADCASTJOIN_THRESHOLD)
-    conf.setConf(AUTO_BROADCASTJOIN_THRESHOLD, -1L)
-  }
-
-  override def afterAll(): Unit = {
-    try {
-      super.afterAll()
-    } finally {
-      conf.setConf(V2_BUCKETING_ENABLED, originalV2BucketingEnabled)
-    conf.setConf(AUTO_BROADCASTJOIN_THRESHOLD, originalAutoBroadcastJoinThreshold)
-    }
-  }
+  override def sparkConf: SparkConf = super.sparkConf
+    .set(V2_BUCKETING_ENABLED, true)
+    .set(AUTO_BROADCASTJOIN_THRESHOLD, -1L)
 
   before {
     functions.foreach { f =>
@@ -261,6 +246,25 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       .add("order_amount", DoubleType)
       .add("customer_id", LongType)
 
+  private def selectWithMergeJoinHint(t1: String, t2: String): String = {
+    s"SELECT /*+ MERGE($t1, $t2) */ "
+  }
+
+  private def createJoinTestDF(
+      keys: Seq[(String, String)],
+      extraColumns: Seq[String] = Nil,
+      joinType: String = ""): DataFrame = {
+    val extraColList = if (extraColumns.isEmpty) "" else extraColumns.mkString(", ", ", ", "")
+    sql(
+      s"""
+         |${selectWithMergeJoinHint("i", "p")}
+         |id, name, i.price as purchase_price, p.price as sale_price $extraColList
+         |FROM testcat.ns.$items i $joinType JOIN testcat.ns.$purchases p
+         |ON ${keys.map(k => s"i.${k._1} = p.${k._2}").mkString(" AND ")}
+         |ORDER BY id, purchase_price, sale_price $extraColList
+         |""".stripMargin)
+  }
+
   private def testWithCustomersAndOrders(
       customers_partitions: Array[Transform],
       orders_partitions: Array[Transform],
@@ -273,9 +277,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     sql(s"INSERT INTO testcat.ns.$orders VALUES " +
         s"(100.0, 1), (200.0, 1), (150.0, 2), (250.0, 2), (350.0, 2), (400.50, 
3)")
 
-    val df = sql("SELECT customer_name, customer_age, order_amount " +
-        s"FROM testcat.ns.$customers c JOIN testcat.ns.$orders o " +
-        "ON c.customer_id = o.customer_id ORDER BY c.customer_id, 
order_amount")
+    val df = sql(
+      s"""
+        |${selectWithMergeJoinHint("c", "o")}
+        |customer_name, customer_age, order_amount
+        |FROM testcat.ns.$customers c JOIN testcat.ns.$orders o
+        |ON c.customer_id = o.customer_id ORDER BY c.customer_id, order_amount
+        |""".stripMargin)
 
     val shuffles = collectShuffles(df.queryExecution.executedPlan)
     assert(shuffles.length == expectedNumOfShuffleExecs)
@@ -354,11 +362,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
     Seq(true, false).foreach { pushDownValues =>
       withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
-        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-            "ON i.id = p.item_id AND i.arrive_time = p.time " +
-            "ORDER BY id, purchase_price, sale_price")
-
+        val df = createJoinTestDF(Seq("id" -> "item_id", "arrive_time" -> 
"time"))
         val shuffles = collectShuffles(df.queryExecution.executedPlan)
         assert(shuffles.isEmpty, "should not add shuffle for both sides of the 
join")
         checkAnswer(df,
@@ -390,11 +394,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
     Seq(true, false).foreach { pushDownValues =>
       withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
-        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-            "ON i.id = p.item_id AND i.arrive_time = p.time " +
-            "ORDER BY id, purchase_price, sale_price")
-
+        val df = createJoinTestDF(Seq("id" -> "item_id", "arrive_time" -> 
"time"))
         val shuffles = collectShuffles(df.queryExecution.executedPlan)
         assert(shuffles.isEmpty, "should not add shuffle for both sides of the 
join")
         checkAnswer(df,
@@ -422,11 +422,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
     Seq(true, false).foreach { pushDownValues =>
       withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
-        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-            "ON i.id = p.item_id AND i.arrive_time = p.time " +
-            "ORDER BY id, purchase_price, sale_price")
-
+        val df = createJoinTestDF(Seq("id" -> "item_id", "arrive_time" -> 
"time"))
         val shuffles = collectShuffles(df.queryExecution.executedPlan)
         if (pushDownValues) {
           assert(shuffles.isEmpty, "should not add shuffle when partition 
values mismatch")
@@ -460,10 +456,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
     Seq(true, false).foreach { pushDownValues =>
       withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
-        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-            "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
+        val df = createJoinTestDF(Seq("id" -> "item_id"))
         val shuffles = collectShuffles(df.queryExecution.executedPlan)
         if (pushDownValues) {
           assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch")
@@ -495,10 +488,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
     Seq(true, false).foreach { pushDownValues =>
       withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
-        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-            "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
+        val df = createJoinTestDF(Seq("id" -> "item_id"))
         val shuffles = collectShuffles(df.queryExecution.executedPlan)
         if (pushDownValues) {
           assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch")
@@ -529,10 +519,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
     Seq(true, false).foreach { pushDownValues =>
       withSQLConf(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
-        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-            "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
+        val df = createJoinTestDF(Seq("id" -> "item_id"))
         val shuffles = collectShuffles(df.queryExecution.executedPlan)
         if (pushDownValues) {
           assert(shuffles.isEmpty, "should not add shuffle when partition values mismatch")
@@ -569,10 +556,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           withSQLConf(
              SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
              SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) {
-            val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-                s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-                "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
+            val df = createJoinTestDF(Seq("id" -> "item_id"))
             val shuffles = collectShuffles(df.queryExecution.executedPlan)
             assert(shuffles.isEmpty, "should not contain any shuffle")
             if (pushDownValues) {
@@ -613,10 +597,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           withSQLConf(
              SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
              SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) {
-            val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-                s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-                "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
+            val df = createJoinTestDF(Seq("id" -> "item_id"))
             val shuffles = collectShuffles(df.queryExecution.executedPlan)
             assert(shuffles.isEmpty, "should not contain any shuffle")
             if (pushDownValues) {
@@ -663,10 +644,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           withSQLConf(
              SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
              SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) {
-            val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-                s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-                "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
+            val df = createJoinTestDF(Seq("id" -> "item_id"))
             val shuffles = collectShuffles(df.queryExecution.executedPlan)
             if (pushDownValues) {
               assert(shuffles.isEmpty, "should not contain any shuffle")
@@ -714,10 +692,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           withSQLConf(
              SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
              SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) {
-            val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-                s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-                "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
+            val df = createJoinTestDF(Seq("id" -> "item_id"))
             val shuffles = collectShuffles(df.queryExecution.executedPlan)
             if (pushDownValues) {
               assert(shuffles.isEmpty, "should not contain any shuffle")
@@ -761,10 +736,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           withSQLConf(
              SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
              SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) {
-            val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-                s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-                "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
+            val df = createJoinTestDF(Seq("id" -> "item_id"))
             val shuffles = collectShuffles(df.queryExecution.executedPlan)
             if (pushDownValues) {
               assert(shuffles.isEmpty, "should not contain any shuffle")
@@ -808,11 +780,8 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> false.toString,
            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) {
-            val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-                s"FROM testcat.ns.$items i LEFT JOIN testcat.ns.$purchases p " +
-                "ON i.id = p.item_id AND i.arrive_time = p.time " +
-                "ORDER BY id, purchase_price, sale_price")
-
+            val df = createJoinTestDF(
+              Seq("id" -> "item_id", "arrive_time" -> "time"), joinType = 
"LEFT")
             val shuffles = collectShuffles(df.queryExecution.executedPlan)
             if (pushDownValues) {
               assert(shuffles.isEmpty, "should not contain any shuffle")
@@ -860,11 +829,8 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> false.toString,
            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) {
-            val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-                s"FROM testcat.ns.$items i RIGHT JOIN testcat.ns.$purchases p " +
-                "ON i.id = p.item_id AND i.arrive_time = p.time " +
-                "ORDER BY id, purchase_price, sale_price")
-
+            val df = createJoinTestDF(
+              Seq("id" -> "item_id", "arrive_time" -> "time"), joinType = 
"RIGHT")
             val shuffles = collectShuffles(df.queryExecution.executedPlan)
             if (pushDownValues) {
               assert(shuffles.isEmpty, "should not contain any shuffle")
@@ -911,11 +877,8 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> false.toString,
            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) {
-            val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-                s"FROM testcat.ns.$items i FULL OUTER JOIN testcat.ns.$purchases p " +
-                "ON i.id = p.item_id AND i.arrive_time = p.time " +
-                "ORDER BY id, purchase_price, sale_price")
-
+            val df = createJoinTestDF(
+              Seq("id" -> "item_id", "arrive_time" -> "time"), joinType = 
"FULL OUTER")
             val shuffles = collectShuffles(df.queryExecution.executedPlan)
             if (pushDownValues) {
               assert(shuffles.isEmpty, "should not contain any shuffle")
@@ -1059,10 +1022,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
     Seq(true, false).foreach { shuffle =>
       withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) {
-        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-          "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
+        val df = createJoinTestDF(Seq("id" -> "item_id"))
         val shuffles = collectShuffles(df.queryExecution.executedPlan)
         if (shuffle) {
          assert(shuffles.size == 1, "only shuffle one side not report partitioning")
@@ -1094,11 +1054,8 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
     Seq(true, false).foreach { shuffle =>
       withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) {
-        Seq("JOIN", "LEFT OUTER JOIN", "RIGHT OUTER JOIN", "FULL OUTER JOIN").foreach { joinType =>
-          val df = sql(s"SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-            s"FROM testcat.ns.$items i $joinType testcat.ns.$purchases p " +
-            "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
+        Seq("", "LEFT OUTER", "RIGHT OUTER", "FULL OUTER").foreach { joinType 
=>
+          val df = createJoinTestDF(Seq("id" -> "item_id"), joinType = 
joinType)
           val shuffles = collectShuffles(df.queryExecution.executedPlan)
           if (shuffle) {
             assert(shuffles.size == 1, "only shuffle one side not report partitioning")
@@ -1107,15 +1064,15 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
               "side is not enabled")
           }
           joinType match {
-            case "JOIN" =>
+            case "" =>
               checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5)))
-            case "LEFT OUTER JOIN" =>
+            case "LEFT OUTER" =>
               checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5),
                 Row(4, "cc", 15.5, null)))
-            case "RIGHT OUTER JOIN" =>
+            case "RIGHT OUTER" =>
               checkAnswer(df, Seq(Row(null, null, null, 26.0), Row(null, null, null, 50.0),
                 Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5)))
-            case "FULL OUTER JOIN" =>
+            case "FULL OUTER" =>
               checkAnswer(df, Seq(Row(null, null, null, 26.0), Row(null, null, null, 50.0),
                 Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5),
                 Row(4, "cc", 15.5, null)))
@@ -1141,10 +1098,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
     Seq(true, false).foreach { shuffle =>
       withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) {
-        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-          "ON i.id = p.item_id and i.arrive_time = p.time ORDER BY id, 
purchase_price, sale_price")
-
+        val df = createJoinTestDF(Seq("id" -> "item_id", "arrive_time" -> 
"time"))
         val shuffles = collectShuffles(df.queryExecution.executedPlan)
         if (shuffle) {
          assert(shuffles.size == 1, "only shuffle one side not report partitioning")
@@ -1174,10 +1128,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
     Seq(true, false).foreach { shuffle =>
       withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) {
-        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-          "ON i.arrive_time = p.time ORDER BY id, purchase_price, sale_price")
-
+        val df = createJoinTestDF(Seq("arrive_time" -> "time"))
         val shuffles = collectShuffles(df.queryExecution.executedPlan)
         if (shuffle) {
          assert(shuffles.size == 2, "partitioning with transform not work now")
@@ -1215,10 +1166,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
         SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString,
         SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
        SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "true") {
-        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-          "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
+        val df = createJoinTestDF(Seq("id" -> "item_id"))
         checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5)))
       }
     }
@@ -1252,27 +1200,21 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
               partiallyClusteredEnabled.toString) {
 
          // join keys are not the same as the partition keys, therefore SPJ is not triggered.
-          val df = sql(
-            s"""
-               SELECT id, name, i.price as purchase_price, p.item_id, p.price 
as sale_price
-               FROM testcat.ns.$items i JOIN testcat.ns.$purchases p
-               ON i.arrive_time = p.time ORDER BY id, purchase_price, 
p.item_id, sale_price
-               """)
-
+          val df = createJoinTestDF(Seq("arrive_time" -> "time"), extraColumns 
= Seq("p.item_id"))
           val shuffles = collectShuffles(df.queryExecution.executedPlan)
           assert(shuffles.nonEmpty, "shuffle should exist when SPJ is not 
used")
 
           checkAnswer(df,
             Seq(
-              Row(1, "aa", 40.0, 1, 42.0),
-              Row(1, "aa", 40.0, 2, 11.0),
-              Row(1, "aa", 41.0, 1, 44.0),
-              Row(1, "aa", 41.0, 1, 45.0),
-              Row(2, "bb", 10.0, 1, 42.0),
-              Row(2, "bb", 10.0, 2, 11.0),
-              Row(2, "bb", 10.5, 1, 42.0),
-              Row(2, "bb", 10.5, 2, 11.0),
-              Row(3, "cc", 15.5, 3, 19.5)
+              Row(1, "aa", 40.0, 11.0, 2),
+              Row(1, "aa", 40.0, 42.0, 1),
+              Row(1, "aa", 41.0, 44.0, 1),
+              Row(1, "aa", 41.0, 45.0, 1),
+              Row(2, "bb", 10.0, 11.0, 2),
+              Row(2, "bb", 10.0, 42.0, 1),
+              Row(2, "bb", 10.5, 11.0, 2),
+              Row(2, "bb", 10.5, 42.0, 1),
+              Row(3, "cc", 15.5, 19.5, 3)
             )
           )
         }
@@ -1316,11 +1258,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
                 partiallyClustered.toString,
            SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
                 allowJoinKeysSubsetOfPartitionKeys.toString) {
-
-            val df = sql("SELECT t1.id AS id, t1.data AS t1data, t2.data AS 
t2data " +
-                s"FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2 " +
-                "ON t1.id = t2.id ORDER BY t1.id, t1data, t2data")
-
+            val df = sql(
+              s"""
+                |${selectWithMergeJoinHint("t1", "t2")}
+                |t1.id AS id, t1.data AS t1data, t2.data AS t2data
+                |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+                |ON t1.id = t2.id ORDER BY t1.id, t1data, t2data
+                |""".stripMargin)
             val shuffles = collectShuffles(df.queryExecution.executedPlan)
             if (allowJoinKeysSubsetOfPartitionKeys) {
               assert(shuffles.isEmpty, "SPJ should be triggered")
@@ -1394,10 +1338,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
            SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
                 allowJoinKeysSubsetOfPartitionKeys.toString) {
 
-            val df = sql("SELECT t1.id AS t1id, t2.id as t2id, t1.data AS data 
" +
-                s"FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2 " +
-                "ON t1.data = t2.data ORDER BY t1id, t1id, data")
-
+            val df = sql(
+              s"""
+                |${selectWithMergeJoinHint("t1", "t2")}
+                |t1.id AS t1id, t2.id as t2id, t1.data AS data
+                |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+                |ON t1.data = t2.data
+                |ORDER BY t1id, t1id, data
+                |""".stripMargin)
             checkAnswer(df, Seq(Row(1, 4, "aa"), Row(2, 5, "bb"), Row(3, 6, "cc")))
 
             val shuffles = collectShuffles(df.queryExecution.executedPlan)
@@ -1451,12 +1399,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
                 partiallyClustered.toString,
            SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
                 allowJoinKeysSubsetOfPartitionKeys.toString) {
-            val df = sql("SELECT id, name, i.price as purchase_price, " +
-                "p.item_id, p.price as sale_price " +
-                s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-                "ON i.arrive_time = p.time " +
-                "ORDER BY id, purchase_price, p.item_id, sale_price")
-
+            val df = createJoinTestDF(Seq("arrive_time" -> "time"), 
extraColumns = Seq("p.item_id"))
             // Currently SPJ for case where join key not same as partition key
             // only supported when push-part-values enabled
             val shuffles = collectShuffles(df.queryExecution.executedPlan)
@@ -1479,15 +1422,15 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
             checkAnswer(df,
               Seq(
-                Row(1, "aa", 40.0, 1, 42.0),
-                Row(1, "aa", 40.0, 2, 11.0),
-                Row(1, "aa", 41.0, 1, 44.0),
-                Row(1, "aa", 41.0, 1, 45.0),
-                Row(2, "bb", 10.0, 1, 42.0),
-                Row(2, "bb", 10.0, 2, 11.0),
-                Row(2, "bb", 10.5, 1, 42.0),
-                Row(2, "bb", 10.5, 2, 11.0),
-                Row(3, "cc", 15.5, 3, 19.5)
+                Row(1, "aa", 40.0, 11.0, 2),
+                Row(1, "aa", 40.0, 42.0, 1),
+                Row(1, "aa", 41.0, 44.0, 1),
+                Row(1, "aa", 41.0, 45.0, 1),
+                Row(2, "bb", 10.0, 11.0, 2),
+                Row(2, "bb", 10.0, 42.0, 1),
+                Row(2, "bb", 10.5, 11.0, 2),
+                Row(2, "bb", 10.5, 42.0, 1),
+                Row(3, "cc", 15.5, 19.5, 3)
               )
             )
           }
@@ -1522,10 +1465,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key
             -> partiallyClustered.toString,
          SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true") {
-          val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
-            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
-            "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
-
+          val df = createJoinTestDF(Seq("id" -> "item_id"))
           val shuffles = collectShuffles(df.queryExecution.executedPlan)
           assert(shuffles.size == 1, "SPJ should be triggered")
           checkAnswer(df, Seq(Row(1, "aa", 30.0, 42.0),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
