This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7f605f5 [SPARK-28621][SQL] Make spark.sql.crossJoin.enabled default value true 7f605f5 is described below commit 7f605f5559a6508acfa90ca4f3875c430f585770 Author: WeichenXu <weichen...@databricks.com> AuthorDate: Tue Aug 27 21:53:37 2019 +0800 [SPARK-28621][SQL] Make spark.sql.crossJoin.enabled default value true ### What changes were proposed in this pull request? Make `spark.sql.crossJoin.enabled` default value true ### Why are the changes needed? For an implicit cross join, we can set up a watchdog to cancel it if it runs for a long time. When "spark.sql.crossJoin.enabled" is false, because `CheckCartesianProducts` is implemented in the logical plan stage, it may generate a mismatching error which may confuse end users: * it's done in the logical phase, so we may fail queries that can be executed via broadcast join, which is very fast. * if we move the check to the physical phase, then a query may succeed at the beginning, and begin to fail when the table size gets larger (other people insert data into the table). This can be quite confusing. * the CROSS JOIN syntax doesn't work well if join reorder happens. * some non-equi-joins will generate a plan using a cartesian product, but `CheckCartesianProducts` does not detect it and raise an error. So, in order to address this in a simpler way, we can turn off showing this cross-join error by default. For reference, I list some cases raising a mismatching error here: Providing: ``` spark.range(2).createOrReplaceTempView("sm1") // can be broadcast spark.range(50000000).createOrReplaceTempView("bg1") // cannot be broadcast spark.range(60000000).createOrReplaceTempView("bg2") // cannot be broadcast ``` 1) Some joins could be converted to a broadcast nested loop join, but CheckCartesianProducts raises an error. e.g. ``` select sm1.id, bg1.id from bg1 join sm1 where sm1.id < bg1.id ``` 2) Some joins will run as a CartesianJoin but CheckCartesianProducts DOES NOT raise an error. e.g. 
``` select bg1.id, bg2.id from bg1 join bg2 where bg1.id < bg2.id ``` ### Does this PR introduce any user-facing change? ### How was this patch tested? Closes #25520 from WeichenXu123/SPARK-28621. Authored-by: WeichenXu <weichen...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- R/pkg/tests/fulltests/test_sparkSQL.R | 18 ++++++++++++++---- docs/sql-migration-guide-upgrade.md | 2 ++ python/pyspark/sql/tests/test_dataframe.py | 9 +++++---- python/pyspark/sql/tests/test_udf.py | 5 +++-- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 3 ++- .../ExtractPythonUDFFromJoinConditionSuite.scala | 8 +++++--- .../test/scala/org/apache/spark/sql/JoinSuite.scala | 6 ++++-- 7 files changed, 35 insertions(+), 16 deletions(-) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index fdc7474..035525a 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -2352,10 +2352,20 @@ test_that("join(), crossJoin() and merge() on a DataFrame", { # inner join, not cartesian join expect_equal(count(where(join(df, df2), df$name == df2$name)), 3) - # cartesian join - expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }), - paste0(".*(org.apache.spark.sql.AnalysisException: Detected implicit cartesian", - " product for INNER join between logical plans).*")) + + conf <- callJMethod(sparkSession, "conf") + crossJoinEnabled <- callJMethod(conf, "get", "spark.sql.crossJoin.enabled") + callJMethod(conf, "set", "spark.sql.crossJoin.enabled", "false") + tryCatch({ + # cartesian join + expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }), + paste0(".*(org.apache.spark.sql.AnalysisException: Detected implicit cartesian", + " product for INNER join between logical plans).*")) + }, + finally = { + # Resetting the conf back to default value + callJMethod(conf, "set", "spark.sql.crossJoin.enabled", crossJoinEnabled) + }) joined <- crossJoin(df, df2) 
expect_equal(names(joined), c("age", "name", "name", "test")) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index a643a84..cc3ef1e 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -23,6 +23,8 @@ license: | {:toc} ## Upgrading From Spark SQL 2.4 to 3.0 + - Since Spark 3.0, configuration `spark.sql.crossJoin.enabled` become internal configuration, and is true by default, so by default spark won't raise exception on sql with implicit cross join. + - Since Spark 3.0, we reversed argument order of the trim function from `TRIM(trimStr, str)` to `TRIM(str, trimStr)` to be compatible with other databases. - Since Spark 3.0, PySpark requires a Pandas version of 0.23.2 or higher to use Pandas related functionality, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc. diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 5550a09..bc4ee88 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -466,11 +466,12 @@ class DataFrameTests(ReusedSQLTestCase): df1 = self.spark.createDataFrame([(1, "1")], ("key", "value")) df2 = self.spark.createDataFrame([(1, "1")], ("key", "value")) - # joins without conditions require cross join syntax - self.assertRaises(AnalysisException, lambda: df1.join(df2).collect()) + with self.sql_conf({"spark.sql.crossJoin.enabled": False}): + # joins without conditions require cross join syntax + self.assertRaises(AnalysisException, lambda: df1.join(df2).collect()) - # works with crossJoin - self.assertEqual(1, df1.crossJoin(df2).count()) + # works with crossJoin + self.assertEqual(1, df1.crossJoin(df2).count()) def test_cache(self): spark = self.spark diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py index 1999311..4a0a376 100644 --- a/python/pyspark/sql/tests/test_udf.py +++ b/python/pyspark/sql/tests/test_udf.py @@ 
-200,8 +200,9 @@ class UDFTests(ReusedSQLTestCase): # The udf uses attributes from both sides of join, so it is pulled out as Filter + # Cross join. df = left.join(right, f("a", "b")) - with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'): - df.collect() + with self.sql_conf({"spark.sql.crossJoin.enabled": False}): + with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'): + df.collect() with self.sql_conf({"spark.sql.crossJoin.enabled": True}): self.assertEqual(df.collect(), [Row(a=1, b=1)]) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4eb4bc1..abe7353 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -731,10 +731,11 @@ object SQLConf { .createWithDefault(100000) val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled") + .internal() .doc("When false, we will throw an error if a query contains a cartesian product without " + "explicit CROSS JOIN syntax.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val ORDER_BY_ORDINAL = buildConf("spark.sql.orderByOrdinal") .doc("When true, the ordinal numbers are treated as the position in the select list. 
" + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ExtractPythonUDFFromJoinConditionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ExtractPythonUDFFromJoinConditionSuite.scala index 03afdd1..77bfc0b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ExtractPythonUDFFromJoinConditionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ExtractPythonUDFFromJoinConditionSuite.scala @@ -68,10 +68,12 @@ class ExtractPythonUDFFromJoinConditionSuite extends PlanTest { private def comparePlanWithCrossJoinEnable(query: LogicalPlan, expected: LogicalPlan): Unit = { // AnalysisException thrown by CheckCartesianProducts while spark.sql.crossJoin.enabled=false - val exception = intercept[AnalysisException] { - Optimize.execute(query.analyze) + withSQLConf(CROSS_JOINS_ENABLED.key -> "false") { + val exception = intercept[AnalysisException] { + Optimize.execute(query.analyze) + } + assert(exception.message.startsWith("Detected implicit cartesian product")) } - assert(exception.message.startsWith("Detected implicit cartesian product")) // pull out the python udf while set spark.sql.crossJoin.enabled=true withSQLConf(CROSS_JOINS_ENABLED.key -> "true") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 1e97347..7274264 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -611,14 +611,16 @@ class JoinSuite extends QueryTest with SharedSparkSession { /** Cartesian product involving C, which is not involved in a CROSS join */ "select * from ((A join B on (A.key = B.key)) cross join D) join C on (A.key = D.a)"); - def checkCartesianDetection(query: String): Unit = { + def checkCartesianDetection(query: String): Unit = { val e = intercept[Exception] { checkAnswer(sql(query), 
Nil); } assert(e.getMessage.contains("Detected implicit cartesian product")) } - cartesianQueries.foreach(checkCartesianDetection) + withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") { + cartesianQueries.foreach(checkCartesianDetection) + } // Check that left_semi, left_anti, existence joins without conditions do not throw // an exception if cross joins are disabled --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org