[spark] branch master updated: [SPARK-42158][SQL] Integrate `_LEGACY_ERROR_TEMP_1003` into `FIELD_NOT_FOUND`
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f373df8a757 [SPARK-42158][SQL] Integrate `_LEGACY_ERROR_TEMP_1003` into `FIELD_NOT_FOUND`
f373df8a757 is described below

commit f373df8a757e36ea84275c637087045d6cca3939
Author: itholic
AuthorDate: Fri Jan 27 10:40:47 2023 +0300

    [SPARK-42158][SQL] Integrate `_LEGACY_ERROR_TEMP_1003` into `FIELD_NOT_FOUND`

    ### What changes were proposed in this pull request?

    This PR proposes to integrate `_LEGACY_ERROR_TEMP_1003` into `FIELD_NOT_FOUND`.

    ### Why are the changes needed?

    We should deduplicate similar error classes into a single error class by merging them.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Fixed existing UTs.

    Closes #39706 from itholic/LEGACY_1003.

    Authored-by: itholic
    Signed-off-by: Max Gekk
---
 core/src/main/resources/error/error-classes.json   |  5 --
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  8 ++-
 .../spark/sql/connector/AlterTableTests.scala      | 18 +-
 .../connector/V2CommandsCaseSensitivitySuite.scala | 64 +++---
 5 files changed, 65 insertions(+), 33 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 5d2e184874a..e6876751a22 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -2031,11 +2031,6 @@
       "Try moving this class out of its parent class."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1003" : {
-    "message" : [
-      "Couldn't find the reference column for <after> at <parentName>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1004" : {
     "message" : [
       "Window specification <windowName> is not defined in the WINDOW clause."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f0c22471afa..6f27c97ddf9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -4053,9 +4053,8 @@ class Analyzer(override val catalogManager: CatalogManager)
           case Some(colName) => ResolvedFieldPosition(ColumnPosition.after(colName))
           case None =>
-            val name = if (resolvedParentName.isEmpty) "root" else resolvedParentName.quoted
             throw QueryCompilationErrors.referenceColNotFoundForAlterTableChangesError(
-              after, name)
+              col.colName, allFields)
         }
       case _ => ResolvedFieldPosition(u.position)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index c415fb91c5d..1a8c42b599e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -295,10 +295,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   }

   def referenceColNotFoundForAlterTableChangesError(
-      after: TableChange.After, parentName: String): Throwable = {
+      fieldName: String, fields: Array[String]): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1003",
-      messageParameters = Map("after" -> after.toString, "parentName" -> parentName))
+      errorClass = "FIELD_NOT_FOUND",
+      messageParameters = Map(
+        "fieldName" -> toSQLId(fieldName),
+        "fields" -> fields.mkString(", ")))
   }

   def windowSpecificationNotDefinedError(windowName: String): Throwable = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index b69a0628f3e..2047212a4ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -160,7 +160,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
     val e1 = intercept[AnalysisException](
       sql(s"ALTER TABLE $t ADD COLUMN c string AFTER non_exist"))
-    assert(e1.getMessage().contains("Couldn't find the reference column"))
+    checkError(
+      exception = e1,
+      errorClass = "FIELD_NOT_FOUND",
+      parameters = Map("fieldName" -> "`c`", "fields"
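To make the user-facing difference concrete, here is a minimal Python sketch of the template substitution the error framework performs. The `FIELD_NOT_FOUND` message text and the sample parameter values are assumptions inferred from the error-classes file and the test diff above; the real rendering happens inside Spark's JVM error framework.

```python
# Hypothetical illustration only: the FIELD_NOT_FOUND template text is assumed.
template = "No such struct field <fieldName> in <fields>."
params = {"fieldName": "`c`", "fields": "a, b"}

message = template
for key, value in params.items():
    message = message.replace(f"<{key}>", value)

print(message)  # No such struct field `c` in a, b.
```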
[spark] branch master updated: [SPARK-42207][INFRA] Update `build_and_test.yml` to use `Ubuntu 22.04`
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 170a8bca357 [SPARK-42207][INFRA] Update `build_and_test.yml` to use `Ubuntu 22.04`
170a8bca357 is described below

commit 170a8bca357e3057a1c37088960de31a261608b3
Author: Dongjoon Hyun
AuthorDate: Thu Jan 26 21:19:06 2023 -0800

    [SPARK-42207][INFRA] Update `build_and_test.yml` to use `Ubuntu 22.04`

    ### What changes were proposed in this pull request?

    This PR aims to update all jobs of `build_and_test.yml` (except the `tpcds` job) to use `Ubuntu 22.04`.

    ### Why are the changes needed?

    `ubuntu-latest` now points to `ubuntu-22.04`. Since branch-3.4 is already created, we can upgrade this for Apache Spark 3.5.0 safely.
    - https://github.com/actions/runner-images

    ![Screenshot 2023-01-26 at 7 54 33 PM](https://user-images.githubusercontent.com/9700541/215006304-2c081515-569f-4c4f-8e87-84f409d4e532.png)

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass the CIs.

    Closes #39762 from dongjoon-hyun/SPARK-42207.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
---
 .github/workflows/build_and_test.yml | 20 ++--
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index e4397554303..54b3d1d19d4 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -51,7 +51,7 @@ on:
 jobs:
   precondition:
     name: Check changes
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     env:
       GITHUB_PREV_SHA: ${{ github.event.before }}
     outputs:
@@ -127,8 +127,7 @@ jobs:
     name: "Build modules: ${{ matrix.modules }} ${{ matrix.comment }}"
     needs: precondition
     if: fromJson(needs.precondition.outputs.required).build == 'true'
-    # Ubuntu 20.04 is the latest LTS. The next LTS is 22.04.
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     strategy:
       fail-fast: false
       matrix:
@@ -319,7 +318,7 @@ jobs:
     # always run if pyspark == 'true', even infra-image is skip (such as non-master job)
     if: always() && fromJson(needs.precondition.outputs.required).pyspark == 'true'
     name: "Build modules: ${{ matrix.modules }}"
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     container:
       image: ${{ needs.precondition.outputs.image_url }}
     strategy:
@@ -428,7 +427,7 @@ jobs:
     # always run if sparkr == 'true', even infra-image is skip (such as non-master job)
     if: always() && fromJson(needs.precondition.outputs.required).sparkr == 'true'
     name: "Build modules: sparkr"
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     container:
       image: ${{ needs.precondition.outputs.image_url }}
     env:
@@ -500,7 +499,7 @@ jobs:
     # always run if lint == 'true', even infra-image is skip (such as non-master job)
     if: always() && fromJson(needs.precondition.outputs.required).lint == 'true'
     name: Linters, licenses, dependencies and documentation generation
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     env:
       LC_ALL: C.UTF-8
       LANG: C.UTF-8
@@ -636,7 +635,7 @@ jobs:
         java:
           - 11
           - 17
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     steps:
       - name: Checkout Spark repository
         uses: actions/checkout@v3
@@ -686,7 +685,7 @@ jobs:
     needs: precondition
     if: fromJson(needs.precondition.outputs.required).scala-213 == 'true'
     name: Scala 2.13 build with SBT
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     steps:
       - name: Checkout Spark repository
         uses: actions/checkout@v3
@@ -733,6 +732,7 @@ jobs:
     needs: precondition
     if: fromJson(needs.precondition.outputs.required).tpcds-1g == 'true'
     name: Run TPC-DS queries with SF=1
+    # Pin to 'Ubuntu 20.04' due to 'databricks/tpcds-kit' compilation
     runs-on: ubuntu-20.04
     env:
       SPARK_LOCAL_IP: localhost
@@ -831,7 +831,7 @@ jobs:
     needs: precondition
     if: fromJson(needs.precondition.outputs.required).docker-integration-tests == 'true'
     name: Run Docker integration tests
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     env:
       HADOOP_PROFILE: ${{ inputs.hadoop }}
       HIVE_PROFILE: hive2.3
@@ -896,7 +896,7 @@ jobs:
     needs: precondition
     if: fromJson(needs.precondition.outputs.required).k8s-integration-tests == 'true'
     name: Run Spark on Kubernetes Integration test
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     steps:
       - name: Checkout Spark repository
         uses: actions/checkout@v3
[spark] branch branch-3.4 updated: [SPARK-41757][CONNECT][PYTHON][FOLLOW-UP] Enable connect.functions.col doctest
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 985efd78e5e [SPARK-41757][CONNECT][PYTHON][FOLLOW-UP] Enable connect.functions.col doctest
985efd78e5e is described below

commit 985efd78e5ee9937efcb78d79c5e2634789bf9c7
Author: Sandeep Singh
AuthorDate: Fri Jan 27 11:31:15 2023 +0900

    [SPARK-41757][CONNECT][PYTHON][FOLLOW-UP] Enable connect.functions.col doctest

    ### What changes were proposed in this pull request?

    To reenable the doc tests for the `col` function, this patch makes the string representation of the Column closer to the regular PySpark Column.

    This PR is a follow-up to https://github.com/apache/spark/pull/39616, enabling the `col` doctests.

    ### Why are the changes needed?

    Improve coverage.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Reenabled doc tests.

    Closes #39761 from techaddict/SPARK-41757-2.

    Authored-by: Sandeep Singh
    Signed-off-by: Hyukjin Kwon
    (cherry picked from commit 2f49c1fed0e9962b2b29ea9017edccbd52a5ce8e)
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/connect/functions.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index ee7b45622b3..7c21f9280c2 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2453,9 +2453,6 @@ def _test() -> None:
     del pyspark.sql.connect.functions.timestamp_seconds.__doc__
     del pyspark.sql.connect.functions.unix_timestamp.__doc__

-    # TODO(SPARK-41757): Fix String representation for Column class
-    del pyspark.sql.connect.functions.col.__doc__
-
     # TODO(SPARK-41812): Proper column names after join
     del pyspark.sql.connect.functions.count_distinct.__doc__
[spark] branch master updated: [SPARK-41757][CONNECT][PYTHON][FOLLOW-UP] Enable connect.functions.col doctest
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 2f49c1fed0e [SPARK-41757][CONNECT][PYTHON][FOLLOW-UP] Enable connect.functions.col doctest
2f49c1fed0e is described below

commit 2f49c1fed0e9962b2b29ea9017edccbd52a5ce8e
Author: Sandeep Singh
AuthorDate: Fri Jan 27 11:31:15 2023 +0900

    [SPARK-41757][CONNECT][PYTHON][FOLLOW-UP] Enable connect.functions.col doctest

    ### What changes were proposed in this pull request?

    To reenable the doc tests for the `col` function, this patch makes the string representation of the Column closer to the regular PySpark Column.

    This PR is a follow-up to https://github.com/apache/spark/pull/39616, enabling the `col` doctests.

    ### Why are the changes needed?

    Improve coverage.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Reenabled doc tests.

    Closes #39761 from techaddict/SPARK-41757-2.

    Authored-by: Sandeep Singh
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/connect/functions.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index ee7b45622b3..7c21f9280c2 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2453,9 +2453,6 @@ def _test() -> None:
     del pyspark.sql.connect.functions.timestamp_seconds.__doc__
     del pyspark.sql.connect.functions.unix_timestamp.__doc__

-    # TODO(SPARK-41757): Fix String representation for Column class
-    del pyspark.sql.connect.functions.col.__doc__
-
     # TODO(SPARK-41812): Proper column names after join
     del pyspark.sql.connect.functions.count_distinct.__doc__
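Since the fix hinges on `Column`'s string representation, it helps to recall what the classic (non-Connect) PySpark `Column` prints. A minimal sketch, assuming a local PySpark installation; the JVM-backed API needs an active session, and exact output may vary by version:

```python
from pyspark.sql import SparkSession, functions as F

# Column expressions are JVM-backed in classic PySpark, so start a session first.
spark = SparkSession.builder.master("local[1]").getOrCreate()

c = F.col("name")
print(c)      # Column<'name'>
print(c + 1)  # Column<'(name + 1)'>
```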
[spark] branch branch-3.2 updated: [SPARK-42201][BUILD] `build/sbt` should allow `SBT_OPTS` to override JVM memory setting
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new ab5109e24e1 [SPARK-42201][BUILD] `build/sbt` should allow `SBT_OPTS` to override JVM memory setting
ab5109e24e1 is described below

commit ab5109e24e142efd4e26a0dba2cd0e1e5a788b3d
Author: Dongjoon Hyun
AuthorDate: Thu Jan 26 16:39:17 2023 -0800

    [SPARK-42201][BUILD] `build/sbt` should allow `SBT_OPTS` to override JVM memory setting

    ### What changes were proposed in this pull request?

    This PR aims to fix a bug where `build/sbt` doesn't allow JVM memory settings via `SBT_OPTS`.

    ### Why are the changes needed?

    `SBT_OPTS` is supposed to be used in this way in the community.
    https://github.com/apache/spark/blob/e30bb538e480940b1963eb14c3267662912d8584/appveyor.yml#L54

    However, an `SBT_OPTS` memory setting like the following is ignored because `-Xms4096m -Xmx4096m -XX:ReservedCodeCacheSize=512m` is injected by default after `SBT_OPTS`. We should switch the order.

    ```
    $ SBT_OPTS="-Xmx6g" build/sbt package
    ```

    https://github.com/apache/spark/blob/e30bb538e480940b1963eb14c3267662912d8584/build/sbt-launch-lib.bash#L124

    ### Does this PR introduce _any_ user-facing change?

    No. This is a dev-only change.

    ### How was this patch tested?

    Manually run the following.

    ```
    $ SBT_OPTS="-Xmx6g" build/sbt package
    ```

    While running the above command, check the JVM options.

    ```
    $ ps aux | grep java
    dongjoon 36683 434.3 3.1 418465456 1031888 s001 R+ 1:11PM 0:19.86 /Users/dongjoon/.jenv/versions/temurin17/bin/java -Xms4096m -Xmx4096m -XX:ReservedCodeCacheSize=512m -Xmx6g -jar build/sbt-launch-1.8.2.jar package
    ```

    Closes #39758 from dongjoon-hyun/SPARK-42201.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 66ec1eb630a4682f5ad2ed2ee989ffcce9031608)
    Signed-off-by: Dongjoon Hyun
---
 build/sbt-launch-lib.bash | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build/sbt-launch-lib.bash b/build/sbt-launch-lib.bash
index 8fb6672bddc..01ba6b929f9 100755
--- a/build/sbt-launch-lib.bash
+++ b/build/sbt-launch-lib.bash
@@ -183,8 +183,8 @@ run() {

   # run sbt
   execRunner "$java_cmd" \
-    ${SBT_OPTS:-$default_sbt_opts} \
     $(get_mem_opts $sbt_mem) \
+    ${SBT_OPTS:-$default_sbt_opts} \
     ${java_opts} \
     ${java_args[@]} \
     -jar "$sbt_jar" \
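The underlying behavior is JVM flag precedence: when `-Xmx` appears twice on one command line, the last occurrence wins. A minimal Python sketch of the ordering fix; the option lists are illustrative stand-ins for the script's variables, not its exact contents:

```python
# Illustrative model of the argument ordering in sbt-launch-lib.bash.
default_mem = ["-Xms4096m", "-Xmx4096m", "-XX:ReservedCodeCacheSize=512m"]
sbt_opts = ["-Xmx6g"]  # user-supplied via the SBT_OPTS environment variable

# Before the fix: defaults come last, so -Xmx4096m silently overrides -Xmx6g.
before = sbt_opts + default_mem
# After the fix: SBT_OPTS comes last, so the user's -Xmx6g wins.
after = default_mem + sbt_opts

print("before:", " ".join(before))
print("after: ", " ".join(after))
```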
[spark] branch branch-3.3 updated: [SPARK-42201][BUILD] `build/sbt` should allow `SBT_OPTS` to override JVM memory setting
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 289e65061c1 [SPARK-42201][BUILD] `build/sbt` should allow `SBT_OPTS` to override JVM memory setting
289e65061c1 is described below

commit 289e65061c1d7a07655d1754148f576f4b4538a5
Author: Dongjoon Hyun
AuthorDate: Thu Jan 26 16:39:17 2023 -0800

    [SPARK-42201][BUILD] `build/sbt` should allow `SBT_OPTS` to override JVM memory setting

    ### What changes were proposed in this pull request?

    This PR aims to fix a bug where `build/sbt` doesn't allow JVM memory settings via `SBT_OPTS`.

    ### Why are the changes needed?

    `SBT_OPTS` is supposed to be used in this way in the community.
    https://github.com/apache/spark/blob/e30bb538e480940b1963eb14c3267662912d8584/appveyor.yml#L54

    However, an `SBT_OPTS` memory setting like the following is ignored because `-Xms4096m -Xmx4096m -XX:ReservedCodeCacheSize=512m` is injected by default after `SBT_OPTS`. We should switch the order.

    ```
    $ SBT_OPTS="-Xmx6g" build/sbt package
    ```

    https://github.com/apache/spark/blob/e30bb538e480940b1963eb14c3267662912d8584/build/sbt-launch-lib.bash#L124

    ### Does this PR introduce _any_ user-facing change?

    No. This is a dev-only change.

    ### How was this patch tested?

    Manually run the following.

    ```
    $ SBT_OPTS="-Xmx6g" build/sbt package
    ```

    While running the above command, check the JVM options.

    ```
    $ ps aux | grep java
    dongjoon 36683 434.3 3.1 418465456 1031888 s001 R+ 1:11PM 0:19.86 /Users/dongjoon/.jenv/versions/temurin17/bin/java -Xms4096m -Xmx4096m -XX:ReservedCodeCacheSize=512m -Xmx6g -jar build/sbt-launch-1.8.2.jar package
    ```

    Closes #39758 from dongjoon-hyun/SPARK-42201.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 66ec1eb630a4682f5ad2ed2ee989ffcce9031608)
    Signed-off-by: Dongjoon Hyun
---
 build/sbt-launch-lib.bash | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build/sbt-launch-lib.bash b/build/sbt-launch-lib.bash
index 8fb6672bddc..01ba6b929f9 100755
--- a/build/sbt-launch-lib.bash
+++ b/build/sbt-launch-lib.bash
@@ -183,8 +183,8 @@ run() {

   # run sbt
   execRunner "$java_cmd" \
-    ${SBT_OPTS:-$default_sbt_opts} \
     $(get_mem_opts $sbt_mem) \
+    ${SBT_OPTS:-$default_sbt_opts} \
     ${java_opts} \
     ${java_args[@]} \
     -jar "$sbt_jar" \
[spark] branch branch-3.4 updated: [SPARK-42201][BUILD] `build/sbt` should allow `SBT_OPTS` to override JVM memory setting
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new d47974c5f12 [SPARK-42201][BUILD] `build/sbt` should allow `SBT_OPTS` to override JVM memory setting
d47974c5f12 is described below

commit d47974c5f12322d576d9821b6c66d7db675938c2
Author: Dongjoon Hyun
AuthorDate: Thu Jan 26 16:39:17 2023 -0800

    [SPARK-42201][BUILD] `build/sbt` should allow `SBT_OPTS` to override JVM memory setting

    ### What changes were proposed in this pull request?

    This PR aims to fix a bug where `build/sbt` doesn't allow JVM memory settings via `SBT_OPTS`.

    ### Why are the changes needed?

    `SBT_OPTS` is supposed to be used in this way in the community.
    https://github.com/apache/spark/blob/e30bb538e480940b1963eb14c3267662912d8584/appveyor.yml#L54

    However, an `SBT_OPTS` memory setting like the following is ignored because `-Xms4096m -Xmx4096m -XX:ReservedCodeCacheSize=512m` is injected by default after `SBT_OPTS`. We should switch the order.

    ```
    $ SBT_OPTS="-Xmx6g" build/sbt package
    ```

    https://github.com/apache/spark/blob/e30bb538e480940b1963eb14c3267662912d8584/build/sbt-launch-lib.bash#L124

    ### Does this PR introduce _any_ user-facing change?

    No. This is a dev-only change.

    ### How was this patch tested?

    Manually run the following.

    ```
    $ SBT_OPTS="-Xmx6g" build/sbt package
    ```

    While running the above command, check the JVM options.

    ```
    $ ps aux | grep java
    dongjoon 36683 434.3 3.1 418465456 1031888 s001 R+ 1:11PM 0:19.86 /Users/dongjoon/.jenv/versions/temurin17/bin/java -Xms4096m -Xmx4096m -XX:ReservedCodeCacheSize=512m -Xmx6g -jar build/sbt-launch-1.8.2.jar package
    ```

    Closes #39758 from dongjoon-hyun/SPARK-42201.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 66ec1eb630a4682f5ad2ed2ee989ffcce9031608)
    Signed-off-by: Dongjoon Hyun
---
 build/sbt-launch-lib.bash | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build/sbt-launch-lib.bash b/build/sbt-launch-lib.bash
index 8fb6672bddc..01ba6b929f9 100755
--- a/build/sbt-launch-lib.bash
+++ b/build/sbt-launch-lib.bash
@@ -183,8 +183,8 @@ run() {

   # run sbt
   execRunner "$java_cmd" \
-    ${SBT_OPTS:-$default_sbt_opts} \
     $(get_mem_opts $sbt_mem) \
+    ${SBT_OPTS:-$default_sbt_opts} \
     ${java_opts} \
     ${java_args[@]} \
     -jar "$sbt_jar" \
[spark] branch master updated (e30bb538e48 -> 66ec1eb630a)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from e30bb538e48 [SPARK-42173][CORE] RpcAddress equality can fail
     add 66ec1eb630a [SPARK-42201][BUILD] `build/sbt` should allow `SBT_OPTS` to override JVM memory setting

No new revisions were added by this update.

Summary of changes:
 build/sbt-launch-lib.bash | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[spark] branch branch-3.4 updated: [SPARK-42173][CORE] RpcAddress equality can fail
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 61c53cd702e [SPARK-42173][CORE] RpcAddress equality can fail
61c53cd702e is described below

commit 61c53cd702e51affa4436d77b6aaf7fc25cb7808
Author: Holden Karau
AuthorDate: Thu Jan 26 09:12:48 2023 -0800

    [SPARK-42173][CORE] RpcAddress equality can fail

    ### What changes were proposed in this pull request?

    When constructing an RpcAddress, use InetUtils to get a consistently formatted IPv6 address if the env is for an IPv6 address.

    ### Why are the changes needed?

    We use RpcAddress equality for various tasks involving executors, and a mismatch of equality can cause interesting errors.

    ### Does this PR introduce _any_ user-facing change?

    Log messages might change from sometimes having all the 0s in a v6 address present to not.

    ### How was this patch tested?

    Existing tests + a new unit test showing that [::0:1] is formatted to [::1].

    Closes #39728 from holdenk/SPARK-42173-ipv6-sparse.

    Authored-by: Holden Karau
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit e30bb538e480940b1963eb14c3267662912d8584)
    Signed-off-by: Dongjoon Hyun
---
 .../main/scala/org/apache/spark/rpc/RpcAddress.scala  | 15 ++-
 core/src/main/scala/org/apache/spark/util/Utils.scala | 18 ++
 .../scala/org/apache/spark/rpc/RpcAddressSuite.scala  | 10 ++
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
index 675dc24206a..1fa22451e5d 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
@@ -23,9 +23,7 @@ import org.apache.spark.util.Utils
 /**
  * Address for an RPC environment, with hostname and port.
  */
-private[spark] case class RpcAddress(_host: String, port: Int) {
-
-  lazy val host: String = Utils.addBracketsIfNeeded(_host)
+private[spark] case class RpcAddress(host: String, port: Int) {

   def hostPort: String = host + ":" + port

@@ -38,15 +36,22 @@ private[spark] case class RpcAddress(_host: String, port: Int) {

 private[spark] object RpcAddress {

+  def apply(host: String, port: Int): RpcAddress = {
+    new RpcAddress(
+      Utils.normalizeIpIfNeeded(host),
+      port
+    )
+  }
+
   /** Return the [[RpcAddress]] represented by `uri`. */
   def fromUrlString(uri: String): RpcAddress = {
     val uriObj = new java.net.URI(uri)
-    RpcAddress(uriObj.getHost, uriObj.getPort)
+    apply(uriObj.getHost, uriObj.getPort)
   }

   /** Returns the [[RpcAddress]] encoded in the form of "spark://host:port" */
   def fromSparkURL(sparkUrl: String): RpcAddress = {
     val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
-    RpcAddress(host, port)
+    apply(host, port)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fb073595147..9bf45ed3776 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1109,6 +1109,24 @@ private[spark] object Utils extends Logging {
     }
   }

+  /**
+   * Normalize IPv6 IPs and no-op on all other hosts.
+   */
+  private[spark] def normalizeIpIfNeeded(host: String): String = {
+    // Is this a v6 address. We ask users to add [] around v6 addresses as strs but
+    // there not always there. If it's just 0-9 and : and [] we treat it as a v6 address.
+    // This means some invalid addresses are treated as v6 addresses, but since they are
+    // not valid hostnames it doesn't matter.
+    // See https://www.rfc-editor.org/rfc/rfc1123#page-13 for context around valid hostnames.
+    val addressRe = """^\[{0,1}([0-9:]+?:[0-9]*)\]{0,1}$""".r
+    host match {
+      case addressRe(unbracketed) =>
+        addBracketsIfNeeded(InetAddresses.toAddrString(InetAddresses.forString(unbracketed)))
+      case _ =>
+        host
+    }
+  }
+
   /**
    * Checks if the host contains only valid hostname/ip without port
    * NOTE: Incase of IPV6 ip it should be enclosed inside []
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
index 0f7c9d71330..9fb08c79420 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
@@ -70,4 +70,14 @@ class RpcAddressSuite extends SparkFunSuite {
     val address = RpcAddress("::1", 1234)
     assert(address.toSparkURL == "spark://[::1]:1234")
   }
+
+  test("SPARK-42173: Consistent Sparse Mapping") {
+    val
[spark] branch master updated: [SPARK-42173][CORE] RpcAddress equality can fail
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new e30bb538e48 [SPARK-42173][CORE] RpcAddress equality can fail
e30bb538e48 is described below

commit e30bb538e480940b1963eb14c3267662912d8584
Author: Holden Karau
AuthorDate: Thu Jan 26 09:12:48 2023 -0800

    [SPARK-42173][CORE] RpcAddress equality can fail

    ### What changes were proposed in this pull request?

    When constructing an RpcAddress, use InetUtils to get a consistently formatted IPv6 address if the env is for an IPv6 address.

    ### Why are the changes needed?

    We use RpcAddress equality for various tasks involving executors, and a mismatch of equality can cause interesting errors.

    ### Does this PR introduce _any_ user-facing change?

    Log messages might change from sometimes having all the 0s in a v6 address present to not.

    ### How was this patch tested?

    Existing tests + a new unit test showing that [::0:1] is formatted to [::1].

    Closes #39728 from holdenk/SPARK-42173-ipv6-sparse.

    Authored-by: Holden Karau
    Signed-off-by: Dongjoon Hyun
---
 .../main/scala/org/apache/spark/rpc/RpcAddress.scala  | 15 ++-
 core/src/main/scala/org/apache/spark/util/Utils.scala | 18 ++
 .../scala/org/apache/spark/rpc/RpcAddressSuite.scala  | 10 ++
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
index 675dc24206a..1fa22451e5d 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
@@ -23,9 +23,7 @@ import org.apache.spark.util.Utils
 /**
  * Address for an RPC environment, with hostname and port.
  */
-private[spark] case class RpcAddress(_host: String, port: Int) {
-
-  lazy val host: String = Utils.addBracketsIfNeeded(_host)
+private[spark] case class RpcAddress(host: String, port: Int) {

   def hostPort: String = host + ":" + port

@@ -38,15 +36,22 @@ private[spark] case class RpcAddress(_host: String, port: Int) {

 private[spark] object RpcAddress {

+  def apply(host: String, port: Int): RpcAddress = {
+    new RpcAddress(
+      Utils.normalizeIpIfNeeded(host),
+      port
+    )
+  }
+
   /** Return the [[RpcAddress]] represented by `uri`. */
   def fromUrlString(uri: String): RpcAddress = {
     val uriObj = new java.net.URI(uri)
-    RpcAddress(uriObj.getHost, uriObj.getPort)
+    apply(uriObj.getHost, uriObj.getPort)
   }

   /** Returns the [[RpcAddress]] encoded in the form of "spark://host:port" */
   def fromSparkURL(sparkUrl: String): RpcAddress = {
     val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
-    RpcAddress(host, port)
+    apply(host, port)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fb073595147..9bf45ed3776 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1109,6 +1109,24 @@ private[spark] object Utils extends Logging {
     }
   }

+  /**
+   * Normalize IPv6 IPs and no-op on all other hosts.
+   */
+  private[spark] def normalizeIpIfNeeded(host: String): String = {
+    // Is this a v6 address. We ask users to add [] around v6 addresses as strs but
+    // there not always there. If it's just 0-9 and : and [] we treat it as a v6 address.
+    // This means some invalid addresses are treated as v6 addresses, but since they are
+    // not valid hostnames it doesn't matter.
+    // See https://www.rfc-editor.org/rfc/rfc1123#page-13 for context around valid hostnames.
+    val addressRe = """^\[{0,1}([0-9:]+?:[0-9]*)\]{0,1}$""".r
+    host match {
+      case addressRe(unbracketed) =>
+        addBracketsIfNeeded(InetAddresses.toAddrString(InetAddresses.forString(unbracketed)))
+      case _ =>
+        host
+    }
+  }
+
   /**
    * Checks if the host contains only valid hostname/ip without port
    * NOTE: Incase of IPV6 ip it should be enclosed inside []
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
index 0f7c9d71330..9fb08c79420 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
@@ -70,4 +70,14 @@ class RpcAddressSuite extends SparkFunSuite {
     val address = RpcAddress("::1", 1234)
     assert(address.toSparkURL == "spark://[::1]:1234")
   }
+
+  test("SPARK-42173: Consistent Sparse Mapping") {
+    val address = RpcAddress("::0:1", 1234)
+    assert(address.toSparkURL == "spark://[::1]:1234")
+  }
+
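The normalization idea is easy to demonstrate outside the JVM. Below is a minimal Python sketch; Spark's actual implementation is the Scala `normalizeIpIfNeeded` above, built on Guava's `InetAddresses`, and this stand-in only mirrors the concept:

```python
import ipaddress

def normalize_ip_if_needed(host: str) -> str:
    """Return a canonical bracketed form for IPv6 literals; no-op otherwise."""
    try:
        addr = ipaddress.ip_address(host.strip("[]"))
    except ValueError:
        return host  # a hostname, not an IP literal
    if isinstance(addr, ipaddress.IPv6Address):
        return f"[{addr.compressed}]"  # "::0:1" and "::1" both render as "::1"
    return host

print(normalize_ip_if_needed("[::0:1]"))      # [::1]
print(normalize_ip_if_needed("::1"))          # [::1]
print(normalize_ip_if_needed("example.com"))  # example.com
```

Canonicalizing at construction time means two `RpcAddress` values built from different spellings of the same IPv6 address now compare equal.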
[spark] branch master updated: [SPARK-42195][INFRA] Add Daily Scala 2.13 Github Action Job for branch-3.4
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 97349986daa [SPARK-42195][INFRA] Add Daily Scala 2.13 Github Action Job for branch-3.4
97349986daa is described below

commit 97349986daa5aeacaa320f7187681a72c4830b13
Author: yangjie01
AuthorDate: Thu Jan 26 20:58:24 2023 +0900

    [SPARK-42195][INFRA] Add Daily Scala 2.13 Github Action Job for branch-3.4

    ### What changes were proposed in this pull request?

    Add a GitHub Actions test job for branch-3.4.

    ### Why are the changes needed?

    Daily tests for branch-3.4.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Tested in my fork.

    Closes #39749 from LuciferYang/branch34-ga.

    Authored-by: yangjie01
    Signed-off-by: Hyukjin Kwon
---
 .github/workflows/build_branch34.yml | 49 
 1 file changed, 49 insertions(+)

diff --git a/.github/workflows/build_branch34.yml b/.github/workflows/build_branch34.yml
new file mode 100644
index 000..e7d66ad9912
--- /dev/null
+++ b/.github/workflows/build_branch34.yml
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: "Build (branch-3.4, Scala 2.13, Hadoop 3, JDK 8)"
+
+on:
+  schedule:
+    - cron: '0 9 * * *'
+
+jobs:
+  run-build:
+    permissions:
+      packages: write
+    name: Run
+    uses: ./.github/workflows/build_and_test.yml
+    if: github.repository == 'apache/spark'
+    with:
+      java: 8
+      branch: branch-3.4
+      hadoop: hadoop3
+      envs: >-
+        {
+          "SCALA_PROFILE": "scala2.13"
+        }
+      jobs: >-
+        {
+          "build": "true",
+          "pyspark": "true",
+          "sparkr": "true",
+          "tpcds-1g": "true",
+          "docker-integration-tests": "true",
+          "lint" : "true"
+        }
[spark] branch branch-3.4 updated: [SPARK-42197][CONNECT] Reuses JVM initialization, and separate configuration groups to set in remote local mode
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 66e13120698 [SPARK-42197][CONNECT] Reuses JVM initialization, and separate configuration groups to set in remote local mode
66e13120698 is described below

commit 66e131206987589b2b961b9c5f3bc57154b16cb3
Author: Hyukjin Kwon
AuthorDate: Thu Jan 26 20:57:00 2023 +0900

    [SPARK-42197][CONNECT] Reuses JVM initialization, and separate configuration groups to set in remote local mode

    ### What changes were proposed in this pull request?

    This PR proposes to refactor `_start_connect_server` by:

    1. Reusing `SparkContext._ensure_initialized`
    2. Separating the configuration groups to be set by default or overwritten
    3. Piggybacking a fix that removes `connect_not_compiled_message`, which is in fact useless because Spark Connect jars are always compiled together by a default `sbt package` or `mvn package`

    ### Why are the changes needed?

    To make the code easier to read.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing unittests should cover them.

    Closes #39751 from HyukjinKwon/cleanup-conf.

    Authored-by: Hyukjin Kwon
    Signed-off-by: Hyukjin Kwon
    (cherry picked from commit b3f5f81f43fe2c54c575d4b013ee1f91c20542b3)
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/connect/session.py  | 62 --
 python/pyspark/testing/connectutils.py | 17 --
 2 files changed, 29 insertions(+), 50 deletions(-)

diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index 1f11a30ade2..7769917e412 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -42,7 +42,6 @@ from pandas.api.types import (  # type: ignore[attr-defined]
 )

 from pyspark import SparkContext, SparkConf, __version__
-from pyspark.java_gateway import launch_gateway
 from pyspark.sql.connect.client import SparkConnectClient
 from pyspark.sql.connect.dataframe import DataFrame
 from pyspark.sql.connect.plan import SQL, Range, LocalRelation
@@ -456,7 +455,7 @@ class SparkSession:
     @staticmethod
     def _start_connect_server(master: str, opts: Dict[str, Any]) -> None:
         """
-        Starts the Spark Connect server given the master.
+        Starts the Spark Connect server given the master (thread-unsafe).

         At the high level, there are two cases. The first case is development case, e.g.,
         you locally build Apache Spark, and run ``SparkSession.builder.remote("local")``:
@@ -470,7 +469,7 @@
         3. Starts a JVM (without Spark Context) first, and adds the Spark Connect server jars
            into the current class loader. Otherwise, Spark Context with ``spark.plugins``
            cannot be initialized because the JVM is already running without the jars in
-           the class path before executing this Python process for driver side (in case of
+           the classpath before executing this Python process for driver side (in case of
            PySpark application submission).

         4. Starts a regular Spark session that automatically starts a Spark Connect server
@@ -492,23 +491,30 @@
         """
         session = PySparkSession._instantiatedSession
         if session is None or session._sc._jsc is None:
-            conf = SparkConf()
-            for k, v in opts.items():
-                conf.set(k, v)
-
-            # Do not need to worry about the existing configurations because
-            # Py4J gateway is not created yet, and `conf` instance is empty here.
-            # The configurations belows are manually manipulated later to respect
-            # the user-specified configuration first right after Py4J gateway creation.
-            conf.set("spark.master", master)
-            conf.set("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
-            conf.set("spark.local.connect", "1")
+
+            # Configurations to be overwritten
+            overwrite_conf = opts
+            overwrite_conf["spark.master"] = master
+            overwrite_conf["spark.local.connect"] = "1"
+
+            # Configurations to be set if unset.
+            default_conf = {"spark.plugins": "org.apache.spark.sql.connect.SparkConnectPlugin"}
+
+            def create_conf(**kwargs: Any) -> SparkConf:
+                conf = SparkConf(**kwargs)
+                for k, v in overwrite_conf.items():
+                    conf.set(k, v)
+                for k, v in default_conf.items():
+                    if not conf.contains(k):
+                        conf.set(k, v)
+                return conf

             # Check if we're
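The two configuration groups differ in how they resolve conflicts with user-supplied options. A minimal sketch of the merge semantics, with plain dicts standing in for `SparkConf` and an assumed user input:

```python
# Keys the local-remote mode must control regardless of user input
# (the master value here is an assumed example).
overwrite_conf = {"spark.master": "local[*]", "spark.local.connect": "1"}
# Keys applied only when the user has not set them.
default_conf = {"spark.plugins": "org.apache.spark.sql.connect.SparkConnectPlugin"}

def build_conf(user_opts: dict) -> dict:
    conf = dict(user_opts)
    conf.update(overwrite_conf)          # always overwritten
    for key, value in default_conf.items():
        conf.setdefault(key, value)      # set only if unset
    return conf

# The user's plugin setting survives; the master setting does not.
print(build_conf({"spark.plugins": "my.custom.Plugin", "spark.master": "yarn"}))
```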
[spark] branch master updated (dbd667e7bc5 -> b3f5f81f43f)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from dbd667e7bc5 [SPARK-42126][PYTHON][CONNECT] Accept return type in DDL strings for Python Scalar UDFs in Spark Connect
     add b3f5f81f43f [SPARK-42197][CONNECT] Reuses JVM initialization, and separate configuration groups to set in remote local mode

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/session.py  | 62 --
 python/pyspark/testing/connectutils.py | 17 --
 2 files changed, 29 insertions(+), 50 deletions(-)
[spark] branch branch-3.4 updated: [SPARK-42126][PYTHON][CONNECT] Accept return type in DDL strings for Python Scalar UDFs in Spark Connect
This is an automated email from the ASF dual-hosted git repository.

xinrong pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 79e8df84309 [SPARK-42126][PYTHON][CONNECT] Accept return type in DDL strings for Python Scalar UDFs in Spark Connect
79e8df84309 is described below

commit 79e8df84309ed54d0c3fc7face414e6c440daa81
Author: Xinrong Meng
AuthorDate: Thu Jan 26 19:15:13 2023 +0800

    [SPARK-42126][PYTHON][CONNECT] Accept return type in DDL strings for Python Scalar UDFs in Spark Connect

    ### What changes were proposed in this pull request?

    Accept return type in DDL strings for Python Scalar UDFs in Spark Connect.

    The approach proposed in this PR is a workaround to parse DataType from DDL strings. We should think of a more elegant alternative to replace it later.

    ### Why are the changes needed?

    To reach parity with vanilla PySpark.

    ### Does this PR introduce _any_ user-facing change?

    Yes. Return types in DDL strings are now accepted.

    ### How was this patch tested?

    Unit tests.

    Closes #39739 from xinrong-meng/datatype_ddl.

    Authored-by: Xinrong Meng
    Signed-off-by: Xinrong Meng
    (cherry picked from commit dbd667e7bc5fee443b8a39ca56d4cf3dd1bb2bae)
    Signed-off-by: Xinrong Meng
---
 python/pyspark/sql/connect/udf.py                   | 20 +++-
 .../sql/tests/connect/test_connect_function.py      |  8 
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/udf.py b/python/pyspark/sql/connect/udf.py
index 4a465084838..d0eb2fdfe6c 100644
--- a/python/pyspark/sql/connect/udf.py
+++ b/python/pyspark/sql/connect/udf.py
@@ -28,6 +28,7 @@ from pyspark.sql.connect.expressions import (
 )
 from pyspark.sql.connect.column import Column
 from pyspark.sql.types import DataType, StringType
+from pyspark.sql.utils import is_remote

 if TYPE_CHECKING:
@@ -90,7 +91,24 @@ class UserDefinedFunction:
             )

         self.func = func
-        self._returnType = returnType
+
+        if isinstance(returnType, str):
+            # Currently we don't have a way to have a current Spark session in Spark Connect, and
+            # pyspark.sql.SparkSession has a centralized logic to control the session creation.
+            # So uses pyspark.sql.SparkSession for now. Should replace this to using the current
+            # Spark session for Spark Connect in the future.
+            from pyspark.sql import SparkSession as PySparkSession
+
+            assert is_remote()
+            return_type_schema = (  # a workaround to parse the DataType from DDL strings
+                PySparkSession.builder.getOrCreate()
+                .createDataFrame(data=[], schema=returnType)
+                .schema
+            )
+            assert len(return_type_schema.fields) == 1, "returnType should be singular"
+            self._returnType = return_type_schema.fields[0].dataType
+        else:
+            self._returnType = returnType
         self._name = name or (
             func.__name__ if hasattr(func, "__name__") else func.__class__.__name__
         )
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index 7042a7e8e6f..50fadb49ed4 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -2299,6 +2299,14 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, S
             cdf.withColumn("A", CF.udf(lambda x: x + 1)(cdf.a)).toPandas(),
             sdf.withColumn("A", SF.udf(lambda x: x + 1)(sdf.a)).toPandas(),
         )
+        self.assert_eq(  # returnType as DDL strings
+            cdf.withColumn("C", CF.udf(lambda x: len(x), "int")(cdf.c)).toPandas(),
+            sdf.withColumn("C", SF.udf(lambda x: len(x), "int")(sdf.c)).toPandas(),
+        )
+        self.assert_eq(  # returnType as DataType
+            cdf.withColumn("C", CF.udf(lambda x: len(x), IntegerType())(cdf.c)).toPandas(),
+            sdf.withColumn("C", SF.udf(lambda x: len(x), IntegerType())(sdf.c)).toPandas(),
+        )

         # as a decorator
         @CF.udf(StringType())
[spark] branch master updated: [SPARK-42126][PYTHON][CONNECT] Accept return type in DDL strings for Python Scalar UDFs in Spark Connect
This is an automated email from the ASF dual-hosted git repository.

xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new dbd667e7bc5 [SPARK-42126][PYTHON][CONNECT] Accept return type in DDL strings for Python Scalar UDFs in Spark Connect
dbd667e7bc5 is described below

commit dbd667e7bc5fee443b8a39ca56d4cf3dd1bb2bae
Author: Xinrong Meng
AuthorDate: Thu Jan 26 19:15:13 2023 +0800

    [SPARK-42126][PYTHON][CONNECT] Accept return type in DDL strings for Python Scalar UDFs in Spark Connect

    ### What changes were proposed in this pull request?

    Accept return type in DDL strings for Python Scalar UDFs in Spark Connect.

    The approach proposed in this PR is a workaround to parse DataType from DDL strings. We should think of a more elegant alternative to replace it later.

    ### Why are the changes needed?

    To reach parity with vanilla PySpark.

    ### Does this PR introduce _any_ user-facing change?

    Yes. Return types in DDL strings are now accepted.

    ### How was this patch tested?

    Unit tests.

    Closes #39739 from xinrong-meng/datatype_ddl.

    Authored-by: Xinrong Meng
    Signed-off-by: Xinrong Meng
---
 python/pyspark/sql/connect/udf.py                   | 20 +++-
 .../sql/tests/connect/test_connect_function.py      |  8 
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/udf.py b/python/pyspark/sql/connect/udf.py
index 4a465084838..d0eb2fdfe6c 100644
--- a/python/pyspark/sql/connect/udf.py
+++ b/python/pyspark/sql/connect/udf.py
@@ -28,6 +28,7 @@ from pyspark.sql.connect.expressions import (
 )
 from pyspark.sql.connect.column import Column
 from pyspark.sql.types import DataType, StringType
+from pyspark.sql.utils import is_remote

 if TYPE_CHECKING:
@@ -90,7 +91,24 @@ class UserDefinedFunction:
             )

         self.func = func
-        self._returnType = returnType
+
+        if isinstance(returnType, str):
+            # Currently we don't have a way to have a current Spark session in Spark Connect, and
+            # pyspark.sql.SparkSession has a centralized logic to control the session creation.
+            # So uses pyspark.sql.SparkSession for now. Should replace this to using the current
+            # Spark session for Spark Connect in the future.
+            from pyspark.sql import SparkSession as PySparkSession
+
+            assert is_remote()
+            return_type_schema = (  # a workaround to parse the DataType from DDL strings
+                PySparkSession.builder.getOrCreate()
+                .createDataFrame(data=[], schema=returnType)
+                .schema
+            )
+            assert len(return_type_schema.fields) == 1, "returnType should be singular"
+            self._returnType = return_type_schema.fields[0].dataType
+        else:
+            self._returnType = returnType
         self._name = name or (
             func.__name__ if hasattr(func, "__name__") else func.__class__.__name__
         )
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index 7042a7e8e6f..50fadb49ed4 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -2299,6 +2299,14 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, S
             cdf.withColumn("A", CF.udf(lambda x: x + 1)(cdf.a)).toPandas(),
             sdf.withColumn("A", SF.udf(lambda x: x + 1)(sdf.a)).toPandas(),
         )
+        self.assert_eq(  # returnType as DDL strings
+            cdf.withColumn("C", CF.udf(lambda x: len(x), "int")(cdf.c)).toPandas(),
+            sdf.withColumn("C", SF.udf(lambda x: len(x), "int")(sdf.c)).toPandas(),
+        )
+        self.assert_eq(  # returnType as DataType
+            cdf.withColumn("C", CF.udf(lambda x: len(x), IntegerType())(cdf.c)).toPandas(),
+            sdf.withColumn("C", SF.udf(lambda x: len(x), IntegerType())(sdf.c)).toPandas(),
+        )

         # as a decorator
         @CF.udf(StringType())
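The workaround is straightforward to try in isolation. A minimal sketch against a plain local session (under Spark Connect the same trick goes through `pyspark.sql.SparkSession`, as in the patch): build an empty DataFrame whose schema is the DDL string, then read back the single field's `dataType`.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Parse the DDL string "int" into a DataType via an empty DataFrame's schema.
schema = spark.createDataFrame(data=[], schema="int").schema
assert len(schema.fields) == 1, "returnType should be singular"
print(schema.fields[0].dataType)  # IntegerType()
```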