This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f605f5  [SPARK-28621][SQL] Make spark.sql.crossJoin.enabled default value true
7f605f5 is described below

commit 7f605f5559a6508acfa90ca4f3875c430f585770
Author: WeichenXu <weichen...@databricks.com>
AuthorDate: Tue Aug 27 21:53:37 2019 +0800

    [SPARK-28621][SQL] Make spark.sql.crossJoin.enabled default value true
    
    ### What changes were proposed in this pull request?
    
    Make `spark.sql.crossJoin.enabled` default value true
    
    ### Why are the changes needed?
    
    For an implicit cross join, we can set up a watchdog to cancel it if it runs
    for too long.
    When `spark.sql.crossJoin.enabled` is false, because `CheckCartesianProducts`
    is implemented in the logical plan stage, it may raise errors that do not
    match the actual physical plan, which can confuse end users:
    * the check runs in the logical phase, so we may fail queries that could be
    executed via a broadcast join, which is very fast.
    * if we moved the check to the physical phase, a query might succeed at
    first and then begin to fail as the table grows (e.g. other people insert
    data into the table). This can be quite confusing.
    * the CROSS JOIN syntax doesn't work well when join reordering happens.
    * some non-equi-joins generate plans that use a cartesian product, but
    `CheckCartesianProducts` does not detect them and raise an error.
    
    So, in order to address this in a simpler way, we turn off this cross-join
    error by default.
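
    Users who still want the old fail-fast behavior can restore it explicitly.
    A minimal sketch, configuration only (`t1` and `t2` are hypothetical tables
    used for illustration):
    ```
    -- Restore the Spark 2.4 behavior: fail on implicit cartesian products.
    SET spark.sql.crossJoin.enabled=false;
    -- An unconditioned inner join now fails again with
    -- "Detected implicit cartesian product":
    SELECT * FROM t1 JOIN t2;
    ```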
    
    For reference, I list some cases that raise the mismatching error here.
    Given:
    ```
    spark.range(2).createOrReplaceTempView("sm1") // can be broadcast
    spark.range(50000000).createOrReplaceTempView("bg1") // cannot be broadcast
    spark.range(60000000).createOrReplaceTempView("bg2") // cannot be broadcast
    ```
    1) Some joins could be converted to a broadcast nested loop join, but
    CheckCartesianProducts raises an error, e.g.
    ```
    select sm1.id, bg1.id from bg1 join sm1 where sm1.id < bg1.id
    ```
    2) Some joins will run as a CartesianJoin but CheckCartesianProducts does
    NOT raise an error, e.g.
    ```
    select bg1.id, bg2.id from bg1 join bg2 where bg1.id < bg2.id
    ```
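
    Case 1) can also be made fast without disabling the check at all, by marking
    the small side for broadcast explicitly. A sketch using Spark SQL's standard
    BROADCAST join hint (same temp views as above):
    ```
    -- Hint Spark to broadcast the small side, so the non-equi join runs as a
    -- broadcast nested loop join instead of a cartesian product.
    select /*+ BROADCAST(sm1) */ sm1.id, bg1.id from bg1 join sm1 where sm1.id < bg1.id
    ```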
    
    ### Does this PR introduce any user-facing change?
    
    Yes. `spark.sql.crossJoin.enabled` becomes an internal configuration and is
    true by default, so Spark no longer raises an error on SQL with an implicit
    cross join by default.
    
    ### How was this patch tested?
    
    Existing tests were updated to set the configuration explicitly.
    
    Closes #25520 from WeichenXu123/SPARK-28621.
    
    Authored-by: WeichenXu <weichen...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 R/pkg/tests/fulltests/test_sparkSQL.R                  | 18 ++++++++++++++----
 docs/sql-migration-guide-upgrade.md                    |  2 ++
 python/pyspark/sql/tests/test_dataframe.py             |  9 +++++----
 python/pyspark/sql/tests/test_udf.py                   |  5 +++--
 .../scala/org/apache/spark/sql/internal/SQLConf.scala  |  3 ++-
 .../ExtractPythonUDFFromJoinConditionSuite.scala       |  8 +++++---
 .../test/scala/org/apache/spark/sql/JoinSuite.scala    |  6 ++++--
 7 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index fdc7474..035525a 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2352,10 +2352,20 @@ test_that("join(), crossJoin() and merge() on a DataFrame", {
 
   # inner join, not cartesian join
   expect_equal(count(where(join(df, df2), df$name == df2$name)), 3)
-  # cartesian join
-  expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }),
-               paste0(".*(org.apache.spark.sql.AnalysisException: Detected implicit cartesian",
-                      " product for INNER join between logical plans).*"))
+
+  conf <- callJMethod(sparkSession, "conf")
+  crossJoinEnabled <- callJMethod(conf, "get", "spark.sql.crossJoin.enabled")
+  callJMethod(conf, "set", "spark.sql.crossJoin.enabled", "false")
+  tryCatch({
+    # cartesian join
+    expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }),
+                 paste0(".*(org.apache.spark.sql.AnalysisException: Detected implicit cartesian",
+                        " product for INNER join between logical plans).*"))
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.crossJoin.enabled", crossJoinEnabled)
+  })
 
   joined <- crossJoin(df, df2)
   expect_equal(names(joined), c("age", "name", "name", "test"))
diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index a643a84..cc3ef1e 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -23,6 +23,8 @@ license: |
 {:toc}
 
 ## Upgrading From Spark SQL 2.4 to 3.0
+  - Since Spark 3.0, the configuration `spark.sql.crossJoin.enabled` becomes an internal configuration and is true by default, so by default Spark won't raise an exception on SQL with an implicit cross join.
+
  - Since Spark 3.0, we reversed argument order of the trim function from `TRIM(trimStr, str)` to `TRIM(str, trimStr)` to be compatible with other databases.
 
  - Since Spark 3.0, PySpark requires a Pandas version of 0.23.2 or higher to use Pandas related functionality, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc.
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index 5550a09..bc4ee88 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -466,11 +466,12 @@ class DataFrameTests(ReusedSQLTestCase):
         df1 = self.spark.createDataFrame([(1, "1")], ("key", "value"))
         df2 = self.spark.createDataFrame([(1, "1")], ("key", "value"))
 
-        # joins without conditions require cross join syntax
-        self.assertRaises(AnalysisException, lambda: df1.join(df2).collect())
+        with self.sql_conf({"spark.sql.crossJoin.enabled": False}):
+            # joins without conditions require cross join syntax
+            self.assertRaises(AnalysisException, lambda: df1.join(df2).collect())
 
-        # works with crossJoin
-        self.assertEqual(1, df1.crossJoin(df2).count())
+            # works with crossJoin
+            self.assertEqual(1, df1.crossJoin(df2).count())
 
     def test_cache(self):
         spark = self.spark
diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 1999311..4a0a376 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -200,8 +200,9 @@ class UDFTests(ReusedSQLTestCase):
         # The udf uses attributes from both sides of join, so it is pulled out as Filter +
         # Cross join.
         df = left.join(right, f("a", "b"))
-        with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'):
-            df.collect()
+        with self.sql_conf({"spark.sql.crossJoin.enabled": False}):
+            with self.assertRaisesRegexp(AnalysisException, 'Detected implicit 
cartesian product'):
+                df.collect()
         with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
             self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4eb4bc1..abe7353 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -731,10 +731,11 @@ object SQLConf {
     .createWithDefault(100000)
 
   val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
+    .internal()
     .doc("When false, we will throw an error if a query contains a cartesian product without " +
         "explicit CROSS JOIN syntax.")
     .booleanConf
-    .createWithDefault(false)
+    .createWithDefault(true)
 
   val ORDER_BY_ORDINAL = buildConf("spark.sql.orderByOrdinal")
     .doc("When true, the ordinal numbers are treated as the position in the select list. " +
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ExtractPythonUDFFromJoinConditionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ExtractPythonUDFFromJoinConditionSuite.scala
index 03afdd1..77bfc0b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ExtractPythonUDFFromJoinConditionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ExtractPythonUDFFromJoinConditionSuite.scala
@@ -68,10 +68,12 @@ class ExtractPythonUDFFromJoinConditionSuite extends PlanTest {
 
   private def comparePlanWithCrossJoinEnable(query: LogicalPlan, expected: LogicalPlan): Unit = {
     // AnalysisException thrown by CheckCartesianProducts while spark.sql.crossJoin.enabled=false
-    val exception = intercept[AnalysisException] {
-      Optimize.execute(query.analyze)
+    withSQLConf(CROSS_JOINS_ENABLED.key -> "false") {
+      val exception = intercept[AnalysisException] {
+        Optimize.execute(query.analyze)
+      }
+        assert(exception.message.startsWith("Detected implicit cartesian product"))
     }
-    assert(exception.message.startsWith("Detected implicit cartesian product"))
 
     // pull out the python udf while set spark.sql.crossJoin.enabled=true
     withSQLConf(CROSS_JOINS_ENABLED.key -> "true") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 1e97347..7274264 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -611,14 +611,16 @@ class JoinSuite extends QueryTest with SharedSparkSession {
       /** Cartesian product involving C, which is not involved in a CROSS join */
       "select * from ((A join B on (A.key = B.key)) cross join D) join C on (A.key = D.a)");
 
-     def checkCartesianDetection(query: String): Unit = {
+    def checkCartesianDetection(query: String): Unit = {
       val e = intercept[Exception] {
         checkAnswer(sql(query), Nil);
       }
       assert(e.getMessage.contains("Detected implicit cartesian product"))
     }
 
-    cartesianQueries.foreach(checkCartesianDetection)
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") {
+      cartesianQueries.foreach(checkCartesianDetection)
+    }
 
     // Check that left_semi, left_anti, existence joins without conditions do not throw
     // an exception if cross joins are disabled

