[spark] branch master updated: [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9520087409d [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys 9520087409d is described below commit 9520087409d5bd7e6a2651dacf2c295d564d5559 Author: Szehon Ho AuthorDate: Mon Sep 11 11:18:46 2023 -0700 [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys ### What changes were proposed in this pull request? - Add a new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled - Change the key compatibility checks in EnsureRequirements. Remove the checks requiring all partition keys to be in the join keys, to allow isKeyCompatible = true in this case (if this flag is enabled) - Change BatchScanExec/DataSourceV2Relation to group splits by join keys if they differ from partition keys (previously grouped only by partition values). Do the same for all auxiliary data structures, like commonPartValues. - Implement partiallyClustered skew-handling. - Group only the replicate side (now by join key as well), and replicate by the total size of the other-side partitions that share the join key. - Add an additional sort for partitions based on join key, because when we group the replicate side, partition ordering falls out of sync with the non-replicate side. ### Why are the changes needed? - Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? 
- Added tests in KeyGroupedPartitioningSuite - Found two existing problems, to be addressed in separate PRs: - Because of https://github.com/apache/spark/pull/37886 we have to select all join keys to trigger SPJ in this case; otherwise the DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered. Need to see how to relax this. - https://issues.apache.org/jira/browse/SPARK-44641 was found when testing this change. This PR refactors some of that code to add group-by-join-key, but doesn't change the underlying logic, so the issue continues to exist. Hopefully this will also get fixed in another way. Closes #42306 from szehon-ho/spj_attempt_master. Authored-by: Szehon Ho Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/plans/physical/partitioning.scala | 59 - .../org/apache/spark/sql/internal/SQLConf.scala| 15 ++ .../execution/datasources/v2/BatchScanExec.scala | 56 +++-- .../execution/exchange/EnsureRequirements.scala| 15 +- .../connector/KeyGroupedPartitioningSuite.scala| 265 - 5 files changed, 378 insertions(+), 32 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 0be4a61f275..a61bd3b7324 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -355,7 +355,14 @@ case class KeyGroupedPartitioning( } else { // We'll need to find leaf attributes from the partition expressions first. 
val attributes = expressions.flatMap(_.collectLeaves()) -attributes.forall(x => requiredClustering.exists(_.semanticEquals(x))) + +if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) { + // check that all join keys (required clustering keys) contained in partitioning + requiredClustering.forall(x => attributes.exists(_.semanticEquals(x))) && + expressions.forall(_.collectLeaves().size == 1) +} else { + attributes.forall(x => requiredClustering.exists(_.semanticEquals(x))) +} } case _ => @@ -364,8 +371,20 @@ case class KeyGroupedPartitioning( } } - override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = -KeyGroupedShuffleSpec(this, distribution) + override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = { +val result = KeyGroupedShuffleSpec(this, distribution) +if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) { + // If allowing join keys to be subset of clustering keys, we should create a new + // `KeyGroupedPartitioning` here that is grouped on the join keys instead, and use that as + // the returned shuffle spec. + val joinKeyPositions = result.keyPositions.map(_.nonEmpty).zipWithIndex.filter(_._1).map(_._2) + val projectedPartitioning =
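The flipped compatibility check in the hunk above is easier to see in isolation. The following is a hypothetical Python sketch (not Spark's actual Scala code): with the new flag off, every partition key must appear among the join keys; with the flag on, every join (clustering) key must instead appear among the partition keys, so a join on a subset of the cluster keys can still satisfy the distribution.

```python
# Hypothetical sketch of the key-compatibility check described in the commit.
# Function and variable names are invented for illustration only.

def satisfies_clustering(partition_keys, join_keys, allow_subset):
    """Return True if a key-grouped partitioning on `partition_keys`
    can satisfy a clustered distribution on `join_keys`."""
    if allow_subset:
        # New behavior: every join key must be covered by some partition key.
        return all(k in partition_keys for k in join_keys)
    # Old behavior: every partition key must appear among the join keys.
    return all(k in join_keys for k in partition_keys)

# Table partitioned by (day, bucket), but the join condition only uses bucket:
print(satisfies_clustering(["day", "bucket"], ["bucket"], allow_subset=False))  # False
print(satisfies_clustering(["day", "bucket"], ["bucket"], allow_subset=True))   # True
```

With the flag off, the bucket-only join cannot use SPJ; with it on, the partitioning is still considered compatible and splits are regrouped by the join keys.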
[spark] branch dependabot/maven/resource-managers/mesos/org.apache.mesos-mesos-1.6.2 deleted (was d137718b0be)
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a change to branch dependabot/maven/resource-managers/mesos/org.apache.mesos-mesos-1.6.2 in repository https://gitbox.apache.org/repos/asf/spark.git was d137718b0be Bump org.apache.mesos:mesos in /resource-managers/mesos The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [SPARK-45109][SQL][CONNECT][FOLLOWUP] Fix log function in Connect
This is an automated email from the ASF dual-hosted git repository. ptoth pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new ecf507fd976 [SPARK-45109][SQL][CONNECT][FOLLOWUP] Fix log function in Connect ecf507fd976 is described below commit ecf507fd976752eb466ccba4a7ed005c1542a22d Author: Peter Toth AuthorDate: Mon Sep 11 19:04:41 2023 +0200 [SPARK-45109][SQL][CONNECT][FOLLOWUP] Fix log function in Connect ### What changes were proposed in this pull request? This is a follow-up to https://github.com/apache/spark/pull/42863: the one-argument `log` function should also point to `ln`. ### Why are the changes needed? Bugfix. ### Does this PR introduce _any_ user-facing change? No, these Spark Connect functions haven't been released. ### How was this patch tested? Existing UTs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42869 from peter-toth/SPARK-45109-fix-log. 
Authored-by: Peter Toth Signed-off-by: Peter Toth (cherry picked from commit 6c3d9f5d89dfc974a5f799b73325aebf10f3cf16) Signed-off-by: Peter Toth --- .../src/main/scala/org/apache/spark/sql/functions.scala | 2 +- .../query-tests/explain-results/function_log.explain| 2 +- .../resources/query-tests/queries/function_log.json | 2 +- .../query-tests/queries/function_log.proto.bin | Bin 172 -> 171 bytes 4 files changed, 3 insertions(+), 3 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 16e787f825a..8f55954a63f 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -2632,7 +2632,7 @@ object functions { * @group math_funcs * @since 3.4.0 */ - def log(e: Column): Column = Column.fn("log", e) + def log(e: Column): Column = ln(e) /** * Computes the natural logarithm of the given column. 
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_log.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_log.explain index d3c3743b1ef..66b782ac817 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_log.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_log.explain @@ -1,2 +1,2 @@ -Project [LOG(E(), b#0) AS LOG(E(), b)#0] +Project [ln(b#0) AS ln(b)#0] +- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_log.json b/connector/connect/common/src/test/resources/query-tests/queries/function_log.json index 1b2d0ed0b14..ababbc52d08 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/function_log.json +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_log.json @@ -13,7 +13,7 @@ }, "expressions": [{ "unresolvedFunction": { -"functionName": "log", +"functionName": "ln", "arguments": [{ "unresolvedAttribute": { "unparsedIdentifier": "b" diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_log.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_log.proto.bin index 548fb480dd2..ecb87a1fc41 100644 Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_log.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_log.proto.bin differ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
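The fix above is consistent with standard usage: the one-argument `log` is the natural logarithm (base e), so it should resolve to `ln`; only the two-argument form takes an explicit base. A quick illustration with Python's `math` module (an analogy, not Spark code):

```python
import math

# One-argument log is the natural logarithm, i.e. log(x) == ln(x).
x = 10.0
assert math.isclose(math.log(x), math.log(x, math.e))  # one-arg log == log base e

# The two-argument form takes an explicit base.
assert math.isclose(math.log(100, 10), 2.0)
```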
[spark] branch master updated: [SPARK-45109][SQL][CONNECT][FOLLOWUP] Fix log function in Connect
This is an automated email from the ASF dual-hosted git repository. ptoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6c3d9f5d89d [SPARK-45109][SQL][CONNECT][FOLLOWUP] Fix log function in Connect 6c3d9f5d89d is described below commit 6c3d9f5d89dfc974a5f799b73325aebf10f3cf16 Author: Peter Toth AuthorDate: Mon Sep 11 19:04:41 2023 +0200 [SPARK-45109][SQL][CONNECT][FOLLOWUP] Fix log function in Connect ### What changes were proposed in this pull request? This is a follow-up to https://github.com/apache/spark/pull/42863: the one-argument `log` function should also point to `ln`. ### Why are the changes needed? Bugfix. ### Does this PR introduce _any_ user-facing change? No, these Spark Connect functions haven't been released. ### How was this patch tested? Existing UTs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42869 from peter-toth/SPARK-45109-fix-log. Authored-by: Peter Toth Signed-off-by: Peter Toth --- .../src/main/scala/org/apache/spark/sql/functions.scala | 2 +- .../query-tests/explain-results/function_log.explain| 2 +- .../resources/query-tests/queries/function_log.json | 2 +- .../query-tests/queries/function_log.proto.bin | Bin 172 -> 171 bytes 4 files changed, 3 insertions(+), 3 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index ab71cb80186..b2102d4ba55 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -2632,7 +2632,7 @@ object functions { * @group math_funcs * @since 3.4.0 */ - def log(e: Column): Column = Column.fn("log", e) + def log(e: Column): Column = ln(e) /** * Computes the natural logarithm of the given column. 
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_log.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_log.explain index d3c3743b1ef..66b782ac817 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_log.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_log.explain @@ -1,2 +1,2 @@ -Project [LOG(E(), b#0) AS LOG(E(), b)#0] +Project [ln(b#0) AS ln(b)#0] +- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_log.json b/connector/connect/common/src/test/resources/query-tests/queries/function_log.json index 1b2d0ed0b14..ababbc52d08 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/function_log.json +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_log.json @@ -13,7 +13,7 @@ }, "expressions": [{ "unresolvedFunction": { -"functionName": "log", +"functionName": "ln", "arguments": [{ "unresolvedAttribute": { "unparsedIdentifier": "b" diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_log.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_log.proto.bin index 548fb480dd2..ecb87a1fc41 100644 Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_log.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_log.proto.bin differ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: Revert "[SPARK-45075][SQL] Fix alter table with invalid default value will not report error"
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new e3b8402a1c0 Revert "[SPARK-45075][SQL] Fix alter table with invalid default value will not report error" e3b8402a1c0 is described below commit e3b8402a1c042c46d84a6527516402e79fbf0c19 Author: Dongjoon Hyun AuthorDate: Mon Sep 11 09:05:29 2023 -0700 Revert "[SPARK-45075][SQL] Fix alter table with invalid default value will not report error" This reverts commit 4a181e5eacfb8103cce50decaeabdd1441dca676. --- .../spark/sql/connector/catalog/TableChange.java | 3 ++- .../plans/logical/v2AlterTableCommands.scala | 11 ++-- .../spark/sql/connector/AlterTableTests.scala | 29 -- 3 files changed, 4 insertions(+), 39 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java index ebecb6f507e..609cfab2d56 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java @@ -696,8 +696,9 @@ public interface TableChange { /** * Returns the column default value SQL string (Spark SQL dialect). The default value literal * is not provided as updating column default values does not need to back-fill existing data. - * Empty string means dropping the column default value. + * Null means dropping the column default value. 
*/ +@Nullable public String newDefaultValue() { return newDefaultValue; } @Override diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala index b02c4fac12d..eb9d45f06ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, ResolvedFieldName} +import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, TypeUtils} +import org.apache.spark.sql.catalyst.util.TypeUtils import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.DataType @@ -228,13 +228,6 @@ case class AlterColumn( TableChange.updateColumnPosition(colName, newPosition.position) } val defaultValueChange = setDefaultExpression.map { newDefaultExpression => - if (newDefaultExpression.nonEmpty) { -// SPARK-45075: We call 'ResolveDefaultColumns.analyze' here to make sure that the default -// value parses successfully, and return an error otherwise -val newDataType = dataType.getOrElse(column.asInstanceOf[ResolvedFieldName].field.dataType) -ResolveDefaultColumns.analyze(column.name.last, newDataType, newDefaultExpression, - "ALTER TABLE ALTER COLUMN") - } TableChange.updateColumnDefaultValue(colName, newDefaultExpression) } typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange ++ defaultValueChange diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala index 6d20c45d48f..2047212a4ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala @@ -366,35 +366,6 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase { } } - test("SPARK-45075: ALTER COLUMN with invalid default value") { -withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format, ") { - withTable("t") { -sql(s"create table t(i boolean) using $v2Format") -// The default value fails to analyze. -checkError( - exception = intercept[AnalysisException] { -sql("alter table t add column s bigint default badvalue") - }, - errorClass = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION", - parameters = Map( -"statement" -> "ALTER TABLE", -"colName" -> "`s`", -"defaultValue" -> "badvalue")) - -sql("alter table t add column s bigint default 3L") -
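For context, the change being reverted above made ALTER TABLE analyze the new default expression eagerly, so a bad default failed at DDL time instead of surfacing later. The sketch below is a loose, hypothetical Python model of that idea; the `literals` table and function names are invented for illustration, and Spark's real check is `ResolveDefaultColumns.analyze` in Scala.

```python
# Hypothetical sketch of eager default-value validation (the idea behind the
# reverted SPARK-45075 change): analyze the default at ALTER time and fail fast.

def analyze_default(col_name, expected_type, default_sql):
    """Loosely models ResolveDefaultColumns.analyze: 'parse' the default
    against a toy literal table and check it against the column type."""
    literals = {"3L": int, "'abc'": str, "true": bool}  # toy stand-in for a SQL parser
    if default_sql not in literals:
        raise ValueError(
            f"INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION: {col_name} -> {default_sql}")
    if literals[default_sql] is not expected_type:
        raise ValueError(
            f"INVALID_DEFAULT_VALUE.DATA_TYPE: {col_name} -> {default_sql}")

def alter_add_column(col_name, expected_type, default_sql, eager=True):
    if eager:
        analyze_default(col_name, expected_type, default_sql)  # fail fast at DDL time
    return (col_name, default_sql)  # the change object handed to the catalog

# Eager validation rejects the bad default immediately:
try:
    alter_add_column("s", int, "badvalue")
except ValueError as e:
    print(e)

print(alter_add_column("s", int, "3L"))  # a valid default passes through
```

Without the eager check (the state after this revert), the invalid default is handed to the catalog unvalidated and the error only appears later.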
[spark] branch dependabot/maven/resource-managers/mesos/org.apache.mesos-mesos-1.6.2 created (now d137718b0be)
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a change to branch dependabot/maven/resource-managers/mesos/org.apache.mesos-mesos-1.6.2 in repository https://gitbox.apache.org/repos/asf/spark.git at d137718b0be Bump org.apache.mesos:mesos in /resource-managers/mesos No new revisions were added by this update.
[spark] branch master updated: [SPARK-45111][BUILD] Upgrade maven to 3.9.4
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 169aa4bee95 [SPARK-45111][BUILD] Upgrade maven to 3.9.4 169aa4bee95 is described below commit 169aa4bee950e2249d853f00b4e5fca67edfaa80 Author: yangjie01 AuthorDate: Mon Sep 11 10:59:57 2023 -0500 [SPARK-45111][BUILD] Upgrade maven to 3.9.4 ### What changes were proposed in this pull request? This PR aims to upgrade Maven from 3.8.8 to 3.9.4. ### Why are the changes needed? The new version [lifts the JDK minimum to JDK 8](https://issues.apache.org/jira/browse/MNG-7452) and [makes the build work on JDK 20](https://issues.apache.org/jira/browse/MNG-7743). It also brings a series of bug fixes, such as [fixing a deadlock during forked lifecycle executions](https://issues.apache.org/jira/browse/MNG-7487), along with a number of new optimizations like [profile activation by packaging](https://issues.apache.org/jira/browse/MNG-6609). On the other hand, the new version re [...] For other updates, refer to the corresponding release notes: - https://maven.apache.org/docs/3.9.0/release-notes.html | https://github.com/apache/maven/releases/tag/maven-3.9.0 - https://maven.apache.org/docs/3.9.1/release-notes.html | https://github.com/apache/maven/releases/tag/maven-3.9.1 - https://maven.apache.org/docs/3.9.2/release-notes.html | https://github.com/apache/maven/releases/tag/maven-3.9.2 - https://maven.apache.org/docs/3.9.3/release-notes.html | https://github.com/apache/maven/releases/tag/maven-3.9.3 - https://maven.apache.org/docs/3.9.4/release-notes.html | https://github.com/apache/maven/releases/tag/maven-3.9.4 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? 
- Pass GitHub Actions - Manual test: running `build/mvn -version` will trigger downloading `apache-maven-3.9.4-bin.tar.gz` ``` exec: curl --silent --show-error -L https://www.apache.org/dyn/closer.lua/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz?action=download ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #42827 from LuciferYang/maven-394. Authored-by: yangjie01 Signed-off-by: Sean Owen --- dev/appveyor-install-dependencies.ps1 | 2 +- docs/building-spark.md| 2 +- pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dev/appveyor-install-dependencies.ps1 b/dev/appveyor-install-dependencies.ps1 index db154cd51da..682d388bdf9 100644 --- a/dev/appveyor-install-dependencies.ps1 +++ b/dev/appveyor-install-dependencies.ps1 @@ -81,7 +81,7 @@ if (!(Test-Path $tools)) { # == Maven # Push-Location $tools # -# $mavenVer = "3.8.8" +# $mavenVer = "3.9.4" # Start-FileDownload "https://archive.apache.org/dist/maven/maven-3/$mavenVer/binaries/apache-maven-$mavenVer-bin.zip" "maven.zip" # # # extract diff --git a/docs/building-spark.md b/docs/building-spark.md index 4b8e70655d5..bbbc51d8c22 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -27,7 +27,7 @@ license: | ## Apache Maven The Maven-based build is the build of reference for Apache Spark. -Building Spark using Maven requires Maven 3.8.8 and Java 8/11/17. +Building Spark using Maven requires Maven 3.9.4 and Java 8/11/17. Spark requires Scala 2.12/2.13; support for Scala 2.11 was removed in Spark 3.0.0. ### Setting up Maven's Memory Usage diff --git a/pom.xml b/pom.xml index a61d603fe1c..02920c0ae74 100644 --- a/pom.xml +++ b/pom.xml @@ -115,7 +115,7 @@ 1.8 ${java.version} ${java.version} -3.8.8 +3.9.4 3.1.0 spark 9.5
[spark] branch master updated: [SPARK-43251][SQL] Replace the error class `_LEGACY_ERROR_TEMP_2015` with an internal error
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c7ea3f7d53d [SPARK-43251][SQL] Replace the error class `_LEGACY_ERROR_TEMP_2015` with an internal error c7ea3f7d53d is described below commit c7ea3f7d53d5a7674f3da0db07018c1f0c43dbf6 Author: dengziming AuthorDate: Mon Sep 11 18:28:31 2023 +0300 [SPARK-43251][SQL] Replace the error class `_LEGACY_ERROR_TEMP_2015` with an internal error ### What changes were proposed in this pull request? Replace the legacy error class `_LEGACY_ERROR_TEMP_2015` with an internal error as it is not triggered by the user space. ### Why are the changes needed? As the error is not triggered by the user space, the legacy error class can be replaced by an internal error. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing test cases. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42845 from dengziming/SPARK-43251. Authored-by: dengziming Signed-off-by: Max Gekk --- common/utils/src/main/resources/error/error-classes.json | 5 - .../scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala | 9 +++-- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 2954d8b9338..282af8c199d 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -4944,11 +4944,6 @@ "Negative values found in " ] }, - "_LEGACY_ERROR_TEMP_2015" : { -"message" : [ - "Cannot generate code for incomparable type: ." -] - }, "_LEGACY_ERROR_TEMP_2016" : { "message" : [ "Can not interpolate into code block." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 2d655be0e70..417ba38c66f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -405,12 +405,9 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE } def cannotGenerateCodeForIncomparableTypeError( - codeType: String, dataType: DataType): SparkIllegalArgumentException = { -new SparkIllegalArgumentException( - errorClass = "_LEGACY_ERROR_TEMP_2015", - messageParameters = Map( -"codeType" -> codeType, -"dataType" -> dataType.catalogString)) + codeType: String, dataType: DataType): Throwable = { +SparkException.internalError( + s"Cannot generate $codeType code for incomparable type: ${toSQLType(dataType)}.") } def cannotInterpolateClassIntoCodeBlockError(arg: Any): SparkIllegalArgumentException = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
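The pattern in this commit — replacing a user-unreachable public error class with a formatted internal error — can be sketched roughly as follows in Python. The class and helper names here are invented for illustration; Spark's real code uses `SparkException.internalError` and `toSQLType` in Scala.

```python
# Hypothetical sketch of the "internal error" pattern: errors that user code
# cannot trigger carry no public error class, only a formatted internal message.

class SparkInternalError(Exception):
    def __init__(self, message):
        super().__init__(f"[INTERNAL_ERROR] {message}")

def to_sql_type(data_type):
    # Loosely mimics toSQLType: upper-case and quote the type in the message.
    return f'"{data_type.upper()}"'

def cannot_generate_code_for_incomparable_type(code_type, data_type):
    # Before: SparkIllegalArgumentException with errorClass _LEGACY_ERROR_TEMP_2015.
    # After: a plain internal error, since users cannot reach this path.
    return SparkInternalError(
        f"Cannot generate {code_type} code for incomparable type: {to_sql_type(data_type)}.")

err = cannot_generate_code_for_incomparable_type("equality", "map<string,int>")
print(err)  # [INTERNAL_ERROR] Cannot generate equality code for incomparable type: "MAP<STRING,INT>".
```

The design rationale is that public error classes imply a user-facing contract; internal errors signal a Spark bug and need no stable class name.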
[spark] branch master updated: [SPARK-45069][SQL] SQL variable should always be resolved after outer reference
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2a10c8d93aa [SPARK-45069][SQL] SQL variable should always be resolved after outer reference 2a10c8d93aa is described below commit 2a10c8d93aa9033842471e4f676fddb3b3f90940 Author: Wenchen Fan AuthorDate: Mon Sep 11 22:57:47 2023 +0800 [SPARK-45069][SQL] SQL variable should always be resolved after outer reference ### What changes were proposed in this pull request? This is a bug fix for the recently added SQL variable feature. Columns are designed to resolve to SQL variables only as a last resort, but for columns in Aggregate, we may resolve columns to outer references first. ### Why are the changes needed? Bug fix. ### Does this PR introduce _any_ user-facing change? Yes, the query result can be wrong before this fix. ### How was this patch tested? New tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42803 from cloud-fan/meta-col. 
Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/analysis/Analyzer.scala | 50 +++--- .../catalyst/analysis/ColumnResolutionHelper.scala | 26 --- .../analysis/ResolveReferencesInAggregate.scala| 24 +-- .../analysis/ResolveReferencesInSort.scala | 13 +++--- .../analyzer-results/sql-session-variables.sql.out | 25 +-- .../sql-tests/inputs/sql-session-variables.sql | 3 ++ .../results/sql-session-variables.sql.out | 19 +++- 7 files changed, 105 insertions(+), 55 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a8c99075cdb..da983ff0c7c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -683,7 +683,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor // of the analysis phase. val colResolved = h.mapExpressions { e => resolveExpressionByPlanOutput( - resolveColWithAgg(e, aggForResolving), aggForResolving, allowOuter = true) + resolveColWithAgg(e, aggForResolving), aggForResolving, includeLastResort = true) } val cond = if (SubqueryExpression.hasSubquery(colResolved.havingCondition)) { val fake = Project(Alias(colResolved.havingCondition, "fake")() :: Nil, aggregate.child) @@ -1450,6 +1450,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor *e.g. `SELECT col, current_date FROM t`. * 4. Resolves the columns to outer references with the outer plan if we are resolving subquery *expressions. + * 5. Resolves the columns to SQL variables. 
* * Some plan nodes have special column reference resolution logic, please read these sub-rules for * details: @@ -1568,7 +1569,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor case g @ Generate(generator, _, _, _, _, _) if generator.resolved => g case g @ Generate(generator, join, outer, qualifier, output, child) => -val newG = resolveExpressionByPlanOutput(generator, child, throws = true, allowOuter = true) +val newG = resolveExpressionByPlanOutput( + generator, child, throws = true, includeLastResort = true) if (newG.fastEquals(generator)) { g } else { @@ -1584,7 +1586,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor case plan => plan } val resolvedOrder = mg.dataOrder -.map(resolveExpressionByPlanOutput(_, planForResolve).asInstanceOf[SortOrder]) + .map(resolveExpressionByPlanOutput(_, planForResolve).asInstanceOf[SortOrder]) mg.copy(dataOrder = resolvedOrder) // Left and right sort expression have to be resolved against the respective child plan only @@ -1614,13 +1616,11 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor // Special case for Project as it supports lateral column alias. case p: Project => -val resolvedNoOuter = p.projectList - .map(resolveExpressionByPlanChildren(_, p, allowOuter = false)) +val resolvedBasic = p.projectList.map(resolveExpressionByPlanChildren(_, p)) // Lateral column alias has higher priority than outer reference. -val resolvedWithLCA = resolveLateralColumnAlias(resolvedNoOuter) -val resolvedWithOuter = resolvedWithLCA.map(resolveOuterRef)
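The numbered resolution order in the doc comment above (plan output, then metadata columns, then outer references, and only then SQL variables) can be modeled as a chain of lookups tried in sequence. The following is a hypothetical Python sketch, not Spark's implementation; the lookup names and placeholder strings are invented for illustration:

```python
# Hypothetical sketch of ordered column resolution: each resolver is tried in
# turn, and SQL variables are consulted only as the last resort.

def make_lookup(scope):
    return lambda name: scope.get(name)

def resolve(name, resolvers):
    for resolver in resolvers:
        value = resolver(name)
        if value is not None:
            return value
    raise KeyError(f"UNRESOLVED_COLUMN: {name}")

plan_output   = make_lookup({"a": "a#1"})
metadata_cols = make_lookup({"_metadata": "_metadata#2"})
outer_refs    = make_lookup({"x": "outer(x#3)"})
sql_variables = make_lookup({"x": "variable(x)", "v": "variable(v)"})

# Order matters: an outer reference wins over a variable of the same name,
# which is exactly what this fix enforces for columns in Aggregate.
resolvers = [plan_output, metadata_cols, outer_refs, sql_variables]
print(resolve("x", resolvers))  # outer(x#3)
print(resolve("v", resolvers))  # variable(v)
```

Before the fix, the Aggregate path effectively consulted `sql_variables` before `outer_refs`, so `x` would wrongly resolve to the variable and could silently change query results.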
[spark] branch master updated: [SPARK-45114][PYTHON][DOCS] Adjust the `versionadded` and `versionchanged` information to the parameters
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8ee22d9358e [SPARK-45114][PYTHON][DOCS] Adjust the `versionadded` and `versionchanged` information to the parameters 8ee22d9358e is described below commit 8ee22d9358e945b0631abef123fbfd52c91505bc Author: Ruifeng Zheng AuthorDate: Mon Sep 11 19:41:28 2023 +0800 [SPARK-45114][PYTHON][DOCS] Adjust the `versionadded` and `versionchanged` information to the parameters ### What changes were proposed in this pull request? 1. For newly added parameters, use `versionadded` instead of `versionchanged`, to follow pandas: https://github.com/pandas-dev/pandas/blob/cea0cc0a54725ed234e2f51cc21a1182674a6032/pandas/io/sql.py#L317 2. For newly changed parameters, move `versionchanged` under the corresponding parameter. ### Why are the changes needed? For better docs. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? CI ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42867 from zhengruifeng/py_doc_minor. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/catalog.py | 12 python/pyspark/sql/dataframe.py | 39 +++ 2 files changed, 23 insertions(+), 28 deletions(-) diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 2c6ed28461f..e2b31e0e556 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -129,8 +129,7 @@ class Catalog: pattern : str The pattern that the catalog name needs to match. -.. versionchanged: 3.5.0 -Added ``pattern`` argument. +.. versionadded: 3.5.0 Returns --- @@ -201,8 +200,7 @@ class Catalog: pattern : str The pattern that the database name needs to match. -.. versionchanged: 3.5.0 -Adds ``pattern`` argument. +.. 
versionadded: 3.5.0 Returns --- @@ -325,8 +323,7 @@ class Catalog: pattern : str The pattern that the database name needs to match. -.. versionchanged: 3.5.0 -Adds ``pattern`` argument. +.. versionadded: 3.5.0 Returns --- @@ -455,8 +452,7 @@ class Catalog: pattern : str The pattern that the function name needs to match. -.. versionchanged: 3.5.0 -Adds ``pattern`` argument. +.. versionadded: 3.5.0 Returns --- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index eaed565ed0e..74da285ff1e 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -597,8 +597,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): level : int, optional, default None How many levels to print for nested schemas. -.. versionchanged:: 3.5.0 -Added Level parameter. +.. versionadded:: 3.5.0 Examples @@ -2864,14 +2863,14 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionchanged:: 3.4.0 Supports Spark Connect. -.. versionchanged:: 4.0.0 -Supports column ordinal. - Parameters -- cols : int, str, list or :class:`Column`, optional list of :class:`Column` or column names or column ordinals to sort by. +.. versionchanged:: 4.0.0 + Supports column ordinal. + Other Parameters ascending : bool or list, optional, default True @@ -2928,14 +2927,14 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionchanged:: 3.4.0 Supports Spark Connect. -.. versionchanged:: 4.0.0 -Supports column ordinal. - Parameters -- cols : int, str, list, or :class:`Column`, optional list of :class:`Column` or column names or column ordinals to sort by. +.. versionchanged:: 4.0.0 + Supports column ordinal. + Other Parameters ascending : bool or list, optional, default True @@ -3826,16 +3825,16 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionchanged:: 3.4.0 Supports Spark Connect. -.. versionchanged:: 4.0.0 -Supports column ordinal. 
- Parameters -- -cols : list, str or :class:`Column` +cols : list, str, int or
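The commit above relies on the numpydoc convention that a version directive nested under a parameter documents that parameter specifically, while a top-level directive documents the whole function. A minimal sketch of the convention, using a hypothetical `list_tables` helper (the name, version numbers, and stand-in data are illustrative, not Spark's API):

```python
def list_tables(db_name, pattern=None):
    """List tables in a database, optionally filtered by a name pattern.

    .. versionadded:: 1.0.0

    Parameters
    ----------
    db_name : str
        The database to list tables from.
    pattern : str, optional
        The pattern that the table name needs to match.

        .. versionadded:: 3.5.0

    Returns
    -------
    list of str
        The matching table names.
    """
    tables = ["users", "orders", "user_events"]  # stand-in data for the sketch
    if pattern is None:
        return list(tables)
    return [t for t in tables if pattern in t]


# The nested directive stays attached to ``pattern`` when the docstring is
# rendered, which is what the commit's second change is after.
print(list_tables("mydb", "user"))  # ['users', 'user_events']
```

Sphinx renders the nested directive as part of the `pattern` entry, so readers see at a glance in which release that parameter appeared rather than scanning function-level notes.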
[spark] branch master updated: [SPARK-45113][PYTHON][DOCS] Refine docstrings of `collect_list/collect_set`
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f41dc28fbda [SPARK-45113][PYTHON][DOCS] Refine docstrings of `collect_list/collect_set`
f41dc28fbda is described below

commit f41dc28fbda67920512c489f85eecbadf0184e6b
Author: yangjie01
AuthorDate: Mon Sep 11 20:11:30 2023 +0900

    [SPARK-45113][PYTHON][DOCS] Refine docstrings of `collect_list/collect_set`

    ### What changes were proposed in this pull request?
    This PR refines the docstrings of `collect_list/collect_set` and adds some new examples.

    ### Why are the changes needed?
    To improve the PySpark documentation.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Pass GitHub Actions

    ### Was this patch authored or co-authored using generative AI tooling?
    No

Closes #42866 from LuciferYang/SPARK-45113.

Authored-by: yangjie01
Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/functions.py | 106
 1 file changed, 84 insertions(+), 22 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index f35477b7edc..d3ad7cfc84e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -3617,33 +3617,64 @@ def kurtosis(col: "ColumnOrName") -> Column:
 @_try_remote_functions
 def collect_list(col: "ColumnOrName") -> Column:
     """
-    Aggregate function: returns a list of objects with duplicates.
+    Aggregate function: Collects the values from a column into a list,
+    maintaining duplicates, and returns this list of objects.
 
     .. versionadded:: 1.6.0
 
     .. versionchanged:: 3.4.0
         Supports Spark Connect.
 
-    Notes
-    -----
-    The function is non-deterministic because the order of collected results depends
-    on the order of the rows which may be non-deterministic after a shuffle.
-
     Parameters
     ----------
     col : :class:`~pyspark.sql.Column` or str
-        target column to compute on.
+        The target column on which the function is computed.
 
     Returns
     -------
     :class:`~pyspark.sql.Column`
-        list of objects with duplicates.
+        A new Column object representing a list of collected values, with duplicate values included.
+
+    Notes
+    -----
+    The function is non-deterministic as the order of collected results depends
+    on the order of the rows, which possibly becomes non-deterministic after shuffle operations.
 
     Examples
     --------
-    >>> df2 = spark.createDataFrame([(2,), (5,), (5,)], ('age',))
-    >>> df2.agg(collect_list('age')).collect()
-    [Row(collect_list(age)=[2, 5, 5])]
+    Example 1: Collect values from a single column DataFrame
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame([(2,), (5,), (5,)], ('age',))
+    >>> df.select(sf.collect_list('age')).show()
+    +-----------------+
+    |collect_list(age)|
+    +-----------------+
+    |        [2, 5, 5]|
+    +-----------------+
+
+    Example 2: Collect values from a DataFrame with multiple columns
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame([(1, "John"), (2, "John"), (3, "Ana")], ("id", "name"))
+    >>> df.groupBy("name").agg(sf.collect_list('id')).show()
+    +----+----------------+
+    |name|collect_list(id)|
+    +----+----------------+
+    |John|          [1, 2]|
+    | Ana|             [3]|
+    +----+----------------+
+
+    Example 3: Collect values from a DataFrame and sort the result
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame([(1,), (2,), (2,)], ('value',))
+    >>> df.select(sf.array_sort(sf.collect_list('value')).alias('sorted_list')).show()
+    +-----------+
+    |sorted_list|
+    +-----------+
+    |  [1, 2, 2]|
+    +-----------+
     """
     return _invoke_function_over_columns("collect_list", col)
 
@@ -3677,33 +3708,64 @@ def array_agg(col: "ColumnOrName") -> Column:
 @_try_remote_functions
 def collect_set(col: "ColumnOrName") -> Column:
     """
-    Aggregate function: returns a set of objects with duplicate elements eliminated.
+    Aggregate function: Collects the values from a column into a set,
+    eliminating duplicates, and returns this set of objects.
 
     .. versionadded:: 1.6.0
 
     .. versionchanged:: 3.4.0
         Supports Spark Connect.
 
-    Notes
-    -----
-    The function is non-deterministic because the order of collected results depends
-    on the order of the rows which may be non-deterministic after a shuffle.
-
     Parameters
     ----------
     col : :class:`~pyspark.sql.Column` or str
-        target column to compute on.
+        The target column on which the function is computed.
 
     Returns
     -------
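The list-versus-set distinction the refined docstrings draw can be mimicked outside a Spark session with plain Python. The functions below are stand-ins for illustration only; the row layout and the `_by` names are assumptions for this sketch, not the PySpark API:

```python
from collections import defaultdict


def collect_list_by(rows, key, value):
    """Group rows by `key`, collecting each `value` into a list (duplicates kept)."""
    out = defaultdict(list)
    for row in rows:
        out[row[key]].append(row[value])
    return dict(out)


def collect_set_by(rows, key, value):
    """Group rows by `key`, collecting each `value` into a set (duplicates dropped)."""
    out = defaultdict(set)
    for row in rows:
        out[row[key]].add(row[value])
    return dict(out)


rows = [
    {"name": "John", "id": 1},
    {"name": "John", "id": 2},
    {"name": "John", "id": 2},  # duplicate: kept by the list, dropped by the set
    {"name": "Ana", "id": 3},
]
print(collect_list_by(rows, "name", "id"))  # {'John': [1, 2, 2], 'Ana': [3]}
print(collect_set_by(rows, "name", "id"))   # {'John': {1, 2}, 'Ana': {3}}
```

As with the Spark functions, the element order within each collected list simply follows input order, which is what the non-determinism note in the refined docstrings warns about.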