(spark) branch master updated: [SPARK-46754][SQL][AVRO] Fix compression code resolution in avro table definition and write options
This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 461026c62ba1 [SPARK-46754][SQL][AVRO] Fix compression code resolution in avro table definition and write options 461026c62ba1 is described below commit 461026c62ba19e4248d529c4971d3ba74fba2a2d Author: Kent Yao AuthorDate: Thu Jan 18 15:53:04 2024 +0800 [SPARK-46754][SQL][AVRO] Fix compression code resolution in avro table definition and write options ### What changes were proposed in this pull request? This PR fixes the case sensitivity of 'compression' in the avro table definition and the write options, in order to make it consistent with other file sources. Also, the current logic for dealing with invalid codec names is unreachable. ### Why are the changes needed? bugfix ### Does this PR introduce _any_ user-facing change? yes, 'compression'='Xz', 'compression'='XZ' now works as well as 'compression'='xz' ### How was this patch tested? new tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #44780 from yaooqinn/SPARK-46754. 
Authored-by: Kent Yao Signed-off-by: Kent Yao --- .../org/apache/spark/sql/avro/AvroUtils.scala | 37 -- .../org/apache/spark/sql/avro/AvroCodecSuite.scala | 30 ++ 2 files changed, 43 insertions(+), 24 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index 25e6aec4d84a..3910cf540628 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.mapreduce.Job -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkIllegalArgumentException} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.AvroCompressionCodec._ @@ -102,22 +102,25 @@ private[sql] object AvroUtils extends Logging { AvroJob.setOutputKeySchema(job, outputAvroSchema) -if (parsedOptions.compression == UNCOMPRESSED.lowerCaseName()) { - job.getConfiguration.setBoolean("mapred.output.compress", false) -} else { - job.getConfiguration.setBoolean("mapred.output.compress", true) - logInfo(s"Compressing Avro output using the ${parsedOptions.compression} codec") - val codec = AvroCompressionCodec.fromString(parsedOptions.compression) match { -case DEFLATE => - val deflateLevel = sqlConf.avroDeflateLevel - logInfo(s"Avro compression level $deflateLevel will be used for " + -s"${DEFLATE.getCodecName()} codec.") - job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel) - DEFLATE.getCodecName() -case codec @ (SNAPPY | BZIP2 | XZ | ZSTANDARD) => codec.getCodecName() -case unknown => throw new IllegalArgumentException(s"Invalid compression codec: $unknown") - } - job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, codec) 
+parsedOptions.compression.toLowerCase(Locale.ROOT) match { + case codecName if AvroCompressionCodec.values().exists(c => c.lowerCaseName() == codecName) => +AvroCompressionCodec.fromString(codecName) match { + case UNCOMPRESSED => +job.getConfiguration.setBoolean("mapred.output.compress", false) + case compressed => +job.getConfiguration.setBoolean("mapred.output.compress", true) +job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName) +if (compressed == DEFLATE) { + val deflateLevel = sqlConf.avroDeflateLevel + logInfo(s"Compressing Avro output using the $codecName codec at level $deflateLevel") + job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel) +} else { + logInfo(s"Compressing Avro output using the $codecName codec") +} +} + case unknown => +throw new SparkIllegalArgumentException( + "CODEC_SHORT_NAME_NOT_FOUND", Map("codecName" -> unknown)) } new AvroOutputWriterFactory(dataSchema, diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala index ec3753b84a55..933b3f989ef7 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala +++
(spark) branch master updated: [SPARK-46618][SQL] Improve error messages for DATA_SOURCE_NOT_FOUND error
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 72e3f06c7d94 [SPARK-46618][SQL] Improve error messages for DATA_SOURCE_NOT_FOUND error 72e3f06c7d94 is described below commit 72e3f06c7d94a69f9ff218ccb560ffa5c0f14884 Author: allisonwang-db AuthorDate: Thu Jan 18 10:47:50 2024 +0300 [SPARK-46618][SQL] Improve error messages for DATA_SOURCE_NOT_FOUND error ### What changes were proposed in this pull request? This PR improves the error messages for the `DATA_SOURCE_NOT_FOUND` error. ### Why are the changes needed? To make the error messages more user-friendly and up to date. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes #44620 from allisonwang-db/spark-46618-not-found-err. Authored-by: allisonwang-db Signed-off-by: Max Gekk --- common/utils/src/main/resources/error/error-classes.json | 2 +- docs/sql-error-conditions.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 2fa86de3daa3..601110309ddb 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -895,7 +895,7 @@ }, "DATA_SOURCE_NOT_FOUND" : { "message" : [ - "Failed to find the data source: . Please find packages at `https://spark.apache.org/third-party-projects.html`.; + "Failed to find the data source: . Make sure the provider name is correct and the package is properly registered and compatible with your Spark version." 
], "sqlState" : "42K02" }, diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 35b50d6c6e4f..008346a018e3 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -496,7 +496,7 @@ Data source '``' not found. Please make sure the data source is regist [SQLSTATE: 42K02](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) -Failed to find the data source: ``. Please find packages at `https://spark.apache.org/third-party-projects.html`. +Failed to find the data source: ``. Make sure the provider name is correct and the package is properly registered and compatible with your Spark version. ### DATA_SOURCE_TABLE_SCHEMA_MISMATCH - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-46757][INFRA] Reduce the number of layers of testing dockerfile
This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 92035e3a983b [SPARK-46757][INFRA] Reduce the number of layers of testing dockerfile 92035e3a983b is described below commit 92035e3a983b4f1105dcf040fd041ac990f5a83d Author: Ruifeng Zheng AuthorDate: Thu Jan 18 15:41:17 2024 +0800 [SPARK-46757][INFRA] Reduce the number of layers of testing dockerfile ### What changes were proposed in this pull request? Reduce the number of layers of testing dockerfile ### Why are the changes needed? to address https://github.com/apache/spark/pull/44768#pullrequestreview-1827558821 ### Does this PR introduce _any_ user-facing change? no, infra-only ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes #44781 from zhengruifeng/infra_docker_layers. Authored-by: Ruifeng Zheng Signed-off-by: Kent Yao --- dev/infra/Dockerfile | 50 +++--- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/dev/infra/Dockerfile b/dev/infra/Dockerfile index 54f62bbc8202..976f94251d7a 100644 --- a/dev/infra/Dockerfile +++ b/dev/infra/Dockerfile @@ -69,15 +69,14 @@ RUN gpg --keyserver hkps://keyserver.ubuntu.com --recv-key E298A3A825C0D65DFD57C RUN gpg -a --export E084DAB9 | apt-key add - RUN add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/' +# See more in SPARK-39959, roxygen2 < 7.2.1 RUN Rscript -e "install.packages(c('devtools', 'knitr', 'markdown', \ 'rmarkdown', 'testthat', 'devtools', 'e1071', 'survival', 'arrow', \ -'ggplot2', 'mvtnorm', 'statmod', 'xml2'), repos='https://cloud.r-project.org/')" - -# See more in SPARK-39959, roxygen2 < 7.2.1 -RUN Rscript -e "devtools::install_version('roxygen2', version='7.2.0', repos='https://cloud.r-project.org')" -RUN Rscript -e "devtools::install_version('lintr', 
version='2.0.1', repos='https://cloud.r-project.org')" -RUN Rscript -e "devtools::install_version('pkgdown', version='2.0.1', repos='https://cloud.r-project.org')" -RUN Rscript -e "devtools::install_version('preferably', version='0.4', repos='https://cloud.r-project.org')" +'ggplot2', 'mvtnorm', 'statmod', 'xml2'), repos='https://cloud.r-project.org/')" && \ +Rscript -e "devtools::install_version('roxygen2', version='7.2.0', repos='https://cloud.r-project.org')" && \ +Rscript -e "devtools::install_version('lintr', version='2.0.1', repos='https://cloud.r-project.org')" && \ +Rscript -e "devtools::install_version('pkgdown', version='2.0.1', repos='https://cloud.r-project.org')" && \ +Rscript -e "devtools::install_version('preferably', version='0.4', repos='https://cloud.r-project.org')" # See more in SPARK-39735 ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library" @@ -99,12 +98,11 @@ ARG BASIC_PIP_PKGS="numpy pyarrow>=14.0.0 six==1.16.0 pandas<=2.1.4 scipy plotly # Python deps for Spark Connect ARG CONNECT_PIP_PKGS="grpcio==1.59.3 grpcio-status==1.59.3 protobuf==4.25.1 googleapis-common-protos==1.56.4" - -RUN python3.9 -m pip install $BASIC_PIP_PKGS unittest-xml-reporting $CONNECT_PIP_PKGS # Add torch as a testing dependency for TorchDistributor and DeepspeedTorchDistributor -RUN python3.9 -m pip install 'torch<=2.0.1' torchvision --index-url https://download.pytorch.org/whl/cpu -RUN python3.9 -m pip install deepspeed torcheval -RUN python3.9 -m pip cache purge +RUN python3.9 -m pip install $BASIC_PIP_PKGS unittest-xml-reporting $CONNECT_PIP_PKGS && \ +python3.9 -m pip install 'torch<=2.0.1' torchvision --index-url https://download.pytorch.org/whl/cpu && \ +python3.9 -m pip install deepspeed torcheval && \ +python3.9 -m pip cache purge # Install Python 3.10 at the last stage to avoid breaking Python 3.9 RUN add-apt-repository ppa:deadsnakes/ppa @@ -112,10 +110,10 @@ RUN apt-get update && apt-get install -y \ python3.10 
python3.10-distutils \ && rm -rf /var/lib/apt/lists/* RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.10 -RUN python3.10 -m pip install $BASIC_PIP_PKGS unittest-xml-reporting $CONNECT_PIP_PKGS -RUN python3.10 -m pip install 'torch<=2.0.1' torchvision --index-url https://download.pytorch.org/whl/cpu -RUN python3.10 -m pip install deepspeed torcheval -RUN python3.10 -m pip cache purge +RUN python3.10 -m pip install $BASIC_PIP_PKGS unittest-xml-reporting $CONNECT_PIP_PKGS && \ +python3.10 -m pip install 'torch<=2.0.1' torchvision --index-url https://download.pytorch.org/whl/cpu && \ +python3.10 -m pip install deepspeed torcheval && \ +python3.10 -m pip cache purge # Install Python 3.11 at the last stage to avoid breaking the existing Python
(spark) branch master updated: [SPARK-46745][INFRA] Purge pip cache in dockerfile
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new da0c31c629ca [SPARK-46745][INFRA] Purge pip cache in dockerfile da0c31c629ca is described below commit da0c31c629ca378704cb7f97c9983be9b6ca96c4 Author: Ruifeng Zheng AuthorDate: Wed Jan 17 20:11:11 2024 -0800 [SPARK-46745][INFRA] Purge pip cache in dockerfile ### What changes were proposed in this pull request? Purge pip cache in dockerfile ### Why are the changes needed? to save 4~5G disk space: before https://github.com/zhengruifeng/spark/actions/runs/7541725028/job/20530432798 ``` #45 [39/39] RUN df -h #45 0.090 Filesystem Size Used Avail Use% Mounted on #45 0.090 overlay 84G 70G 15G 83% / #45 0.090 tmpfs64M 0 64M 0% /dev #45 0.090 shm 64M 0 64M 0% /dev/shm #45 0.090 /dev/root84G 70G 15G 83% /etc/resolv.conf #45 0.090 tmpfs 7.9G 0 7.9G 0% /proc/acpi #45 0.090 tmpfs 7.9G 0 7.9G 0% /sys/firmware #45 0.090 tmpfs 7.9G 0 7.9G 0% /proc/scsi #45 DONE 2.0s ``` after https://github.com/zhengruifeng/spark/actions/runs/7549204209/job/20552796796 ``` #48 [42/43] RUN python3.12 -m pip cache purge #48 0.670 Files removed: 392 #48 DONE 0.7s #49 [43/43] RUN df -h #49 0.075 Filesystem Size Used Avail Use% Mounted on #49 0.075 overlay 84G 65G 19G 79% / #49 0.075 tmpfs64M 0 64M 0% /dev #49 0.075 shm 64M 0 64M 0% /dev/shm #49 0.075 /dev/root84G 65G 19G 79% /etc/resolv.conf #49 0.075 tmpfs 7.9G 0 7.9G 0% /proc/acpi #49 0.075 tmpfs 7.9G 0 7.9G 0% /sys/firmware #49 0.075 tmpfs 7.9G 0 7.9G 0% /proc/scsi ``` ### Does this PR introduce _any_ user-facing change? no, infra-only ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes #44768 from zhengruifeng/infra_docker_cleanup. 
Authored-by: Ruifeng Zheng Signed-off-by: Dongjoon Hyun --- .github/workflows/build_and_test.yml | 4 dev/infra/Dockerfile | 8 +++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 493ed0c413a9..51bbdb9fcb35 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -417,10 +417,6 @@ jobs: - name: Free up disk space shell: 'script -q -e -c "bash {0}"' run: | -if [[ "$MODULES_TO_TEST" != *"pyspark-ml"* ]] && [[ "$BRANCH" != "branch-3.5" ]]; then - # uninstall libraries dedicated for ML testing - python3.9 -m pip uninstall -y torch torchvision torcheval torchtnt tensorboard mlflow deepspeed -fi if [ -f ./dev/free_disk_space_container ]; then ./dev/free_disk_space_container fi diff --git a/dev/infra/Dockerfile b/dev/infra/Dockerfile index 78814ace9b2e..54f62bbc8202 100644 --- a/dev/infra/Dockerfile +++ b/dev/infra/Dockerfile @@ -19,7 +19,7 @@ # See also in https://hub.docker.com/_/ubuntu FROM ubuntu:focal-20221019 -ENV FULL_REFRESH_DATE 20231117 +ENV FULL_REFRESH_DATE 20240117 ENV DEBIAN_FRONTEND noninteractive ENV DEBCONF_NONINTERACTIVE_SEEN true @@ -104,6 +104,7 @@ RUN python3.9 -m pip install $BASIC_PIP_PKGS unittest-xml-reporting $CONNECT_PIP # Add torch as a testing dependency for TorchDistributor and DeepspeedTorchDistributor RUN python3.9 -m pip install 'torch<=2.0.1' torchvision --index-url https://download.pytorch.org/whl/cpu RUN python3.9 -m pip install deepspeed torcheval +RUN python3.9 -m pip cache purge # Install Python 3.10 at the last stage to avoid breaking Python 3.9 RUN add-apt-repository ppa:deadsnakes/ppa @@ -114,6 +115,7 @@ RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.10 RUN python3.10 -m pip install $BASIC_PIP_PKGS unittest-xml-reporting $CONNECT_PIP_PKGS RUN python3.10 -m pip install 'torch<=2.0.1' torchvision --index-url https://download.pytorch.org/whl/cpu RUN python3.10 -m pip install deepspeed 
torcheval +RUN python3.10 -m pip cache purge # Install Python 3.11 at the last stage to avoid breaking the existing Python installations RUN add-apt-repository ppa:deadsnakes/ppa @@ -124,6 +126,7 @@ RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.11 RUN python3.11 -m pip install $BASIC_PIP_PKGS unittest-xml-reporting $CONNECT_PIP_PKGS RUN python3.11 -m pip install 'torch<=2.0.1'
(spark) branch master updated: [SPARK-45593][BUILD] Building a runnable distribution from master code running spark-sql raise error
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 44d2c86e71fc [SPARK-45593][BUILD] Building a runnable distribution from master code running spark-sql raise error 44d2c86e71fc is described below commit 44d2c86e71fca7044e6d5d9e9222eecff17c360c Author: yikaifei AuthorDate: Thu Jan 18 11:32:01 2024 +0800 [SPARK-45593][BUILD] Building a runnable distribution from master code running spark-sql raise error ### What changes were proposed in this pull request? Fix a build issue, when building a runnable distribution from master code running spark-sql raise error: ``` Caused by: java.lang.ClassNotFoundException: org.sparkproject.guava.util.concurrent.internal.InternalFutureFailureAccess at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) ... 58 more ``` the problem is due to a guava dependency in spark-connect-common POM that **conflicts** with the shade plugin of the parent pom. - the spark-connect-common contains `connect.guava.version` version of guava, and it is relocated as `${spark.shade.packageName}.guava` not the `${spark.shade.packageName}.connect.guava`; - The spark-network-common also contains guava related classes, it has also been relocated as `${spark.shade.packageName}.guava`, but guava version `${guava.version}`; - As a result, in the presence of different versions of the classpath org.sparkproject.guava.xx; In addition, after investigation, it seems that module spark-connect-common is not related to guava, so we can remove guava dependency from spark-connect-common. ### Why are the changes needed? Building a runnable distribution from master code is not runnable. 
### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? I ran the build command output a runnable distribution package manually for the tests; Build command: ``` ./dev/make-distribution.sh --name ui --pip --tgz -Phive -Phive-thriftserver -Pyarn -Pconnect ``` Test result: https://github.com/apache/spark/assets/51110188/aefbc433-ea5c-4287-8ebd-367806043ac8;> I also checked the `org.sparkproject.guava.cache.LocalCache` from jars dir; Before: ``` ➜ jars grep -lr 'org.sparkproject.guava.cache.LocalCache' ./ .//spark-connect_2.13-4.0.0-SNAPSHOT.jar .//spark-network-common_2.13-4.0.0-SNAPSHOT.jar .//spark-connect-common_2.13-4.0.0-SNAPSHOT.jar ``` Now: ``` ➜ jars grep -lr 'org.sparkproject.guava.cache.LocalCache' ./ .//spark-network-common_2.13-4.0.0-SNAPSHOT.jar ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #43436 from Yikf/SPARK-45593. Authored-by: yikaifei Signed-off-by: yangjie01 --- assembly/pom.xml | 6 ++ connector/connect/client/jvm/pom.xml | 8 +--- connector/connect/common/pom.xml | 34 ++ connector/connect/server/pom.xml | 25 - 4 files changed, 41 insertions(+), 32 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 77ff87c17f52..cd8c3fca9d23 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -149,6 +149,12 @@ org.apache.spark spark-connect_${scala.binary.version} ${project.version} + + + org.apache.spark + spark-connect-common_${scala.binary.version} + + org.apache.spark diff --git a/connector/connect/client/jvm/pom.xml b/connector/connect/client/jvm/pom.xml index 8057a33df178..9bedebf523a7 100644 --- a/connector/connect/client/jvm/pom.xml +++ b/connector/connect/client/jvm/pom.xml @@ -51,15 +51,9 @@ ${project.version} - - com.google.guava - guava - ${connect.guava.version} - compile - com.google.protobuf protobuf-java diff --git a/connector/connect/common/pom.xml b/connector/connect/common/pom.xml index a374646f8f29..336d83e04c15 100644 --- 
a/connector/connect/common/pom.xml +++ b/connector/connect/common/pom.xml @@ -47,6 +47,11 @@ com.google.protobuf protobuf-java + com.google.guava guava @@ -145,6 +150,35 @@ + +org.apache.maven.plugins +
(spark) branch master updated: [SPARK-46644] Change add and merge in SQLMetric to use isZero
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 89727bfa7529 [SPARK-46644] Change add and merge in SQLMetric to use isZero 89727bfa7529 is described below commit 89727bfa7529aa28d85e5d9a58b21d8aa035a23f Author: Davin Tjong AuthorDate: Thu Jan 18 11:01:37 2024 +0800 [SPARK-46644] Change add and merge in SQLMetric to use isZero ### What changes were proposed in this pull request? A previous refactor mistakenly used `isValid` for add. Since `defaultValidValue` was always `0`, this didn't cause any correctness issues. What we really want to do for add (and merge) is `if (isZero) _value = 0`. Also removing `isValid` since it's redundant, if `defaultValidValue` is always `0`. ### Why are the changes needed? There are no correctness errors, but this is confusing and error-prone. A negative `defaultValidValue` was intended to allow creating optional metrics. With the previous behavior this would incorrectly add the sentinel value. `defaultValidValue` is supposed to determine what value is exposed to the user. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Running the tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44649 from davintjong-db/sql-metric-add-fix. 
Authored-by: Davin Tjong Signed-off-by: Wenchen Fan --- .../spark/sql/execution/metric/SQLMetrics.scala| 50 -- .../sql/execution/metric/SQLMetricsSuite.scala | 11 +++-- 2 files changed, 32 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 8cd28f9a06a4..a246b47fe655 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -39,21 +39,21 @@ import org.apache.spark.util.AccumulatorContext.internOption */ class SQLMetric( val metricType: String, -initValue: Long = 0L, -defaultValidValue: Long = 0L) extends AccumulatorV2[Long, Long] { - // initValue defines the initial value of the metric. defaultValidValue defines the lowest value - // considered valid. If a SQLMetric is invalid, it is set to defaultValidValue upon receiving any - // updates, and it also reports defaultValidValue as its value to avoid exposing it to the user - // programatically. +initValue: Long = 0L) extends AccumulatorV2[Long, Long] { + // initValue defines the initial value of the metric. 0 is the lowest value considered valid. + // If a SQLMetric is invalid, it is set to 0 upon receiving any updates, and it also reports + // 0 as its value to avoid exposing it to the user programmatically. // - // For many SQLMetrics, we use initValue = -1 and defaultValidValue = 0 to indicate that the - // metric is by default invalid. At the end of a task, we will update the metric making it valid, - // and the invalid metrics will be filtered out when calculating min, max, etc. as a workaround + // For many SQLMetrics, we use initValue = -1 to indicate that the metric is by default invalid. 
+ // At the end of a task, we will update the metric making it valid, and the invalid metrics will + // be filtered out when calculating min, max, etc. as a workaround // for SPARK-11013. + assert(initValue <= 0) + // _value will always be either initValue or non-negative. private var _value = initValue override def copy(): SQLMetric = { -val newAcc = new SQLMetric(metricType, initValue, defaultValidValue) +val newAcc = new SQLMetric(metricType, initValue) newAcc._value = _value newAcc } @@ -62,8 +62,8 @@ class SQLMetric( override def merge(other: AccumulatorV2[Long, Long]): Unit = other match { case o: SQLMetric => - if (o.isValid) { -if (!isValid) _value = defaultValidValue + if (!o.isZero) { +if (isZero) _value = 0 _value += o.value } case _ => throw QueryExecutionErrors.cannotMergeClassWithOtherClassError( @@ -73,28 +73,32 @@ class SQLMetric( // This is used to filter out metrics. Metrics with value equal to initValue should // be filtered out, since they are either invalid or safe to filter without changing // the aggregation defined in [[SQLMetrics.stringValue]]. - // Note that we don't use defaultValidValue here since we may want to collect - // defaultValidValue metrics for calculating min, max, etc. See SPARK-11013. + // Note that we don't use 0 here since we may want to collect 0 metrics for + // calculating min, max, etc. See
(spark) branch master updated: [SPARK-46751][PYTHON][TESTS] Skip test_datasource if PyArrow is not installed
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 751a81b23bd3 [SPARK-46751][PYTHON][TESTS] Skip test_datasource if PyArrow is not installed 751a81b23bd3 is described below commit 751a81b23bd35ebe6a3fc1f328405c5d57291a0c Author: Hyukjin Kwon AuthorDate: Wed Jan 17 18:29:08 2024 -0800 [SPARK-46751][PYTHON][TESTS] Skip test_datasource if PyArrow is not installed ### What changes were proposed in this pull request? This PR proposes to skip `test_datasource` if PyArrow is not installed because it requires `mapInArrow` that needs PyArrow. ### Why are the changes needed? To make the build pass with the env that does not have PyArrow installed. Currently scheduled job fails (with PyPy3): https://github.com/apache/spark/actions/runs/7557652490/job/20577472214 ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Scheduled jobs should test it out. I also manually tested it. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44776 from HyukjinKwon/SPARK-46751. 
Authored-by: Hyukjin Kwon Signed-off-by: Dongjoon Hyun --- python/pyspark/sql/tests/test_python_datasource.py | 5 + 1 file changed, 5 insertions(+) diff --git a/python/pyspark/sql/tests/test_python_datasource.py b/python/pyspark/sql/tests/test_python_datasource.py index 79414cb7ed69..6ba4b68b02ba 100644 --- a/python/pyspark/sql/tests/test_python_datasource.py +++ b/python/pyspark/sql/tests/test_python_datasource.py @@ -29,11 +29,16 @@ from pyspark.sql.datasource import ( CaseInsensitiveDict, ) from pyspark.sql.types import Row, StructType +from pyspark.testing.sqlutils import ( +have_pyarrow, +pyarrow_requirement_message, +) from pyspark.testing import assertDataFrameEqual from pyspark.testing.sqlutils import ReusedSQLTestCase from pyspark.testing.utils import SPARK_HOME +@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message) class BasePythonDataSourceTestsMixin: def test_basic_data_source_class(self): class MyDataSource(DataSource): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-46748][CORE] Remove `*slave*.sh` scripts
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4d7b1ba986bc [SPARK-46748][CORE] Remove `*slave*.sh` scripts 4d7b1ba986bc is described below commit 4d7b1ba986bcbee91cce43084b043cff3da7d67c Author: Dongjoon Hyun AuthorDate: Wed Jan 17 18:26:32 2024 -0800 [SPARK-46748][CORE] Remove `*slave*.sh` scripts ### What changes were proposed in this pull request? As a part of SPARK-45869 (Revisit and Improve Spark Standalone Cluster), this PR aims to remove `*slave*` scripts from `sbin` directory for Apache Spark 4.0.0 codebase and binary distributions. ``` spark-3.5.0-bin-hadoop3:$ ls -al sbin/*slave* -rwxr-xr-x 1 dongjoon staff 981 Sep 8 19:08 sbin/decommission-slave.sh -rwxr-xr-x 1 dongjoon staff 957 Sep 8 19:08 sbin/slaves.sh -rwxr-xr-x 1 dongjoon staff 967 Sep 8 19:08 sbin/start-slave.sh -rwxr-xr-x 1 dongjoon staff 969 Sep 8 19:08 sbin/start-slaves.sh -rwxr-xr-x 1 dongjoon staff 965 Sep 8 19:08 sbin/stop-slave.sh -rwxr-xr-x 1 dongjoon staff 967 Sep 8 19:08 sbin/stop-slaves.sh ``` ### Why are the changes needed? `*slave*.sh` scripts are deprecated at Apache Spark 3.1.0 (March, 2021) via SPARK-32004 (July 2020). ### Does this PR introduce _any_ user-facing change? Yes, but - these are only wrapper scripts for legacy environments and were removed from all documents. - the new alternative corresponding scripts have been documented instead and used for last 3 years. - we can simplify the `sbin` directory of binary distributions for a better UX. - Apache Spark 4.0.0 is a good and the last chance to clean up these. ### How was this patch tested? Pass the CI and manual review. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44773 from dongjoon-hyun/SPARK-46748. 
Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- sbin/decommission-slave.sh | 23 --- sbin/slaves.sh | 23 --- sbin/start-slave.sh| 23 --- sbin/start-slaves.sh | 23 --- sbin/stop-slave.sh | 23 --- sbin/stop-slaves.sh| 23 --- 6 files changed, 138 deletions(-) diff --git a/sbin/decommission-slave.sh b/sbin/decommission-slave.sh deleted file mode 100755 index 858bede1d287.. --- a/sbin/decommission-slave.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" - ->&2 echo "This script is deprecated, use decommission-worker.sh" -"${DIR}/decommission-worker.sh" "$@" diff --git a/sbin/slaves.sh b/sbin/slaves.sh deleted file mode 100755 index b92007ecdfad.. --- a/sbin/slaves.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" - ->&2 echo "This script is deprecated, use workers.sh" -"${DIR}/workers.sh" "$@" diff --git a/sbin/start-slave.sh b/sbin/start-slave.sh deleted file mode 100755 index
(spark) branch master updated: [SPARK-46749][DOCS] Document `SPARK_LOG_*` and `SPARK_PID_DIR`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new be1e8d8792ed [SPARK-46749][DOCS] Document `SPARK_LOG_*` and `SPARK_PID_DIR` be1e8d8792ed is described below commit be1e8d8792edea737bd2800fe75998a807fba141 Author: Dongjoon Hyun AuthorDate: Thu Jan 18 09:46:31 2024 +0900 [SPARK-46749][DOCS] Document `SPARK_LOG_*` and `SPARK_PID_DIR` ### What changes were proposed in this pull request? This PR aims to document the following three environment variables for `Spark Standalone` cluster. - SPARK_LOG_DIR - SPARK_LOG_MAX_FILES - SPARK_PID_DIR ### Why are the changes needed? So far, the users need to look at the `spark-env.sh.template` or `spark-daemon.sh` files to see the descriptions and the default values. We had better document it officially. https://github.com/apache/spark/blob/9a2f39318e3af8b3817dc5e4baf52e548d82063c/conf/spark-env.sh.template#L67-L69 https://github.com/apache/spark/blob/9a2f39318e3af8b3817dc5e4baf52e548d82063c/sbin/spark-daemon.sh#L25-L28 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Generate HTML docs. ![Screenshot 2024-01-17 at 10 38 09 AM](https://github.com/apache/spark/assets/9700541/7b6106dc-5105-4653-94aa-0fc05af5a762) ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44774 from dongjoon-hyun/SPARK-46749. Authored-by: Dongjoon Hyun Signed-off-by: Hyukjin Kwon --- docs/spark-standalone.md | 12 1 file changed, 12 insertions(+) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 0bc73978570b..b9e3bb5d3f7f 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -142,6 +142,18 @@ You can optionally configure the cluster further by setting environment variable comma-separated list of multiple directories on different disks. 
+ +SPARK_LOG_DIR +Where log files are stored. (default: SPARK_HOME/logs). + + +SPARK_LOG_MAX_FILES +The maximum number of log files (default: 5). + + +SPARK_PID_DIR +Where pid files are stored. (default: /tmp). + SPARK_WORKER_CORES Total number of cores to allow Spark applications to use on the machine (default: all available cores). - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-46750][CONNECT][PYTHON] DataFrame APIs code clean up
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 764cb3e07364 [SPARK-46750][CONNECT][PYTHON] DataFrame APIs code clean up 764cb3e07364 is described below commit 764cb3e073644b9d543502d8951c47e41ba0f46b Author: Ruifeng Zheng AuthorDate: Thu Jan 18 08:17:29 2024 +0800 [SPARK-46750][CONNECT][PYTHON] DataFrame APIs code clean up ### What changes were proposed in this pull request? 1, unify the import; 2, delete unused helper functions and variables; ### Why are the changes needed? code clean up ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes #44771 from zhengruifeng/py_df_cleanup. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/dataframe.py | 38 ++--- python/pyspark/sql/connect/group.py | 15 - 2 files changed, 15 insertions(+), 38 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 7ee27065208c..0cf6c0921f78 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -72,19 +72,12 @@ from pyspark.sql.connect.readwriter import DataFrameWriter, DataFrameWriterV2 from pyspark.sql.connect.streaming.readwriter import DataStreamWriter from pyspark.sql.connect.column import Column from pyspark.sql.connect.expressions import ( +SortOrder, ColumnReference, UnresolvedRegex, UnresolvedStar, ) -from pyspark.sql.connect.functions.builtin import ( -_to_col, -_invoke_function, -col, -lit, -udf, -struct, -expr as sql_expression, -) +from pyspark.sql.connect.functions import builtin as F from pyspark.sql.pandas.types import from_arrow_schema @@ -199,9 +192,9 @@ class DataFrame: expr = expr[0] # type: ignore[assignment] for element 
in expr: if isinstance(element, str): -sql_expr.append(sql_expression(element)) +sql_expr.append(F.expr(element)) else: -sql_expr.extend([sql_expression(e) for e in element]) +sql_expr.extend([F.expr(e) for e in element]) return DataFrame(plan.Project(self._plan, *sql_expr), session=self._session) @@ -215,7 +208,7 @@ class DataFrame: ) if len(exprs) == 1 and isinstance(exprs[0], dict): -measures = [_invoke_function(f, col(e)) for e, f in exprs[0].items()] +measures = [F._invoke_function(f, F.col(e)) for e, f in exprs[0].items()] return self.groupBy().agg(*measures) else: # other expressions @@ -259,7 +252,7 @@ class DataFrame: sparkSession.__doc__ = PySparkDataFrame.sparkSession.__doc__ def count(self) -> int: -table, _ = self.agg(_invoke_function("count", lit(1)))._to_table() +table, _ = self.agg(F._invoke_function("count", F.lit(1)))._to_table() return table[0][0].as_py() count.__doc__ = PySparkDataFrame.count.__doc__ @@ -352,8 +345,6 @@ class DataFrame: self, numPartitions: Union[int, "ColumnOrName"], *cols: "ColumnOrName" ) -> "DataFrame": def _convert_col(col: "ColumnOrName") -> "ColumnOrName": -from pyspark.sql.connect.expressions import SortOrder, ColumnReference - if isinstance(col, Column): if isinstance(col._expr, SortOrder): return col @@ -471,7 +462,7 @@ class DataFrame: def filter(self, condition: Union[Column, str]) -> "DataFrame": if isinstance(condition, str): -expr = sql_expression(condition) +expr = F.expr(condition) else: expr = condition return DataFrame(plan.Filter(child=self._plan, filter=expr), session=self._session) @@ -713,7 +704,7 @@ class DataFrame: ) else: _c = c # type: ignore[assignment] -_cols.append(_to_col(cast("ColumnOrName", _c))) +_cols.append(F._to_col(cast("ColumnOrName", _c))) ascending = kwargs.get("ascending", True) if isinstance(ascending, (bool, int)): @@ -1652,8 +1643,6 @@ class DataFrame: def sampleBy( self, col: "ColumnOrName", fractions: Dict[Any, float], seed: Optional[int] = None ) -> "DataFrame": -from 
pyspark.sql.connect.expressions import ColumnReference - if isinstance(col, str): col = Column(ColumnReference(col)) elif not isinstance(col, Column): @@ -1754,7 +1743,7 @@ class DataFrame: elif isinstance(item, (list,
(spark) branch branch-3.5 updated: [SPARK-46663][PYTHON][3.5] Disable memory profiler for pandas UDFs with iterators
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new d083da76b4d6 [SPARK-46663][PYTHON][3.5] Disable memory profiler for pandas UDFs with iterators d083da76b4d6 is described below commit d083da76b4d6b4f1351f2b4597840e2cc1a8683a Author: Xinrong Meng AuthorDate: Thu Jan 18 09:01:05 2024 +0900 [SPARK-46663][PYTHON][3.5] Disable memory profiler for pandas UDFs with iterators ### What changes were proposed in this pull request? When using pandas UDFs with iterators, if users enable the profiling spark conf, a warning indicating non-support should be raised, and profiling should be disabled. However, currently, after raising the not-supported warning, the memory profiler is still being enabled. The PR proposed to fix that. ### Why are the changes needed? A bug fix to eliminate misleading behavior. ### Does this PR introduce _any_ user-facing change? The noticeable changes will affect only those using the PySpark shell. This is because, in the PySpark shell, the memory profiler will raise an error, which in turn blocks the execution of the UDF. ### How was this patch tested? Manual test. ### Was this patch authored or co-authored using generative AI tooling? Setup: ```py $ ./bin/pyspark --conf spark.python.profile=true >>> from typing import Iterator >>> from pyspark.sql.functions import * >>> import pandas as pd >>> pandas_udf("long") ... def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: ... for s in iterator: ... yield s + 1 ... >>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"])) ``` Before: ``` >>> df.select(plus_one(df.v)).show() UserWarning: Profiling UDFs with iterators input/output is not supported. Traceback (most recent call last): ... 
OSError: could not get source code ``` After: ``` >>> df.select(plus_one(df.v)).show() /Users/xinrong.meng/spark/python/pyspark/sql/udf.py:417: UserWarning: Profiling UDFs with iterators input/output is not supported. +---+ |plus_one(v)| +---+ | 2| | 3| | 4| +---+ ``` Closes #44760 from xinrong-meng/PR_TOOL_PICK_PR_44668_BRANCH-3.5. Authored-by: Xinrong Meng Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/tests/test_udf_profiler.py | 53 ++- python/pyspark/sql/udf.py | 32 2 files changed, 67 insertions(+), 18 deletions(-) diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py index 136f423d0a35..019e502ec67c 100644 --- a/python/pyspark/sql/tests/test_udf_profiler.py +++ b/python/pyspark/sql/tests/test_udf_profiler.py @@ -19,11 +19,19 @@ import tempfile import unittest import os import sys +import warnings from io import StringIO +from typing import Iterator from pyspark import SparkConf from pyspark.sql import SparkSession -from pyspark.sql.functions import udf +from pyspark.sql.functions import udf, pandas_udf +from pyspark.testing.sqlutils import ( +have_pandas, +have_pyarrow, +pandas_requirement_message, +pyarrow_requirement_message, +) from pyspark.profiler import UDFBasicProfiler @@ -101,6 +109,49 @@ class UDFProfilerTests(unittest.TestCase): df = self.spark.range(10) df.select(add1("id"), add2("id"), add1("id")).collect() +# Unsupported +def exec_pandas_udf_iter_to_iter(self): +import pandas as pd + +@pandas_udf("int") +def iter_to_iter(batch_ser: Iterator[pd.Series]) -> Iterator[pd.Series]: +for ser in batch_ser: +yield ser + 1 + +self.spark.range(10).select(iter_to_iter("id")).collect() + +# Unsupported +def exec_map(self): +import pandas as pd + +def map(pdfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]: +for pdf in pdfs: +yield pdf[pdf.id == 1] + +df = self.spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v")) +df.mapInPandas(map, schema=df.schema).collect() + 
+@unittest.skipIf(not have_pandas, pandas_requirement_message) # type: ignore +@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message) # type: ignore +def test_unsupported(self): +with warnings.catch_warnings(record=True) as warns: +warnings.simplefilter("always") +self.exec_pandas_udf_iter_to_iter() +user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)] +self.assertTrue(len(user_warns) > 0) +
(spark) branch master updated: [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d8703dd1c7bb [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler d8703dd1c7bb is described below commit d8703dd1c7bbdac24dbcbf3e3eeb9c54595d53c3 Author: Takuya UESHIN AuthorDate: Thu Jan 18 08:59:27 2024 +0900 [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler ### What changes were proposed in this pull request? Basic support of SparkSession based Python UDF profiler. To enable the profiler, use a SQL conf `spark.sql.pyspark.udf.profiler`: - `"perf"`: enable cProfiler - `"memory"`: enable memory-profiler (TODO: [SPARK-46687](https://issues.apache.org/jira/browse/SPARK-46687)) ```py from pyspark.sql.functions import * spark.conf.set("spark.sql.pyspark.udf.profiler", "perf") # enable cProfiler udf("string") def f(x): return str(x) df = spark.range(10).select(f(col("id"))) df.collect() pandas_udf("string") def g(x): return x.astype("string") df = spark.range(10).select(g(col("id"))) spark.conf.unset("spark.sql.pyspark.udf.profiler") # disable df.collect() # won't profile spark.showPerfProfiles() # show the result for only the first collect. ``` ### Why are the changes needed? The existing UDF profilers are SparkContext based, which can't support Spark Connect. We should introduce SparkSession based profilers and support Spark Connect. ### Does this PR introduce _any_ user-facing change? Yes, SparkSession-based UDF profilers will be available. ### How was this patch tested? Added the related tests, manually, and existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44697 from ueshin/issues/SPARK-46686/profiler. 
Authored-by: Takuya UESHIN Signed-off-by: Hyukjin Kwon --- .../connect/execution/ExecuteThreadRunner.scala| 22 +- .../sql/connect/planner/SparkConnectPlanner.scala | 8 +- .../spark/sql/connect/service/SessionHolder.scala | 10 + .../org/apache/spark/api/python/PythonRDD.scala| 11 +- .../org/apache/spark/api/python/PythonRunner.scala | 5 +- .../spark/api/python/PythonWorkerUtils.scala | 4 +- dev/sparktestsupport/modules.py| 1 + python/pyspark/accumulators.py | 4 + python/pyspark/profiler.py | 3 +- python/pyspark/sql/_typing.pyi | 5 + python/pyspark/sql/connect/client/core.py | 13 +- python/pyspark/sql/connect/profiler.py | 41 python/pyspark/sql/connect/session.py | 10 + python/pyspark/sql/profiler.py | 176 python/pyspark/sql/session.py | 8 + .../sql/tests/connect/test_parity_udf_profiler.py | 59 ++ python/pyspark/sql/tests/test_udf_profiler.py | 232 +++-- python/pyspark/worker.py | 97 - .../org/apache/spark/sql/internal/SQLConf.scala| 13 ++ .../v2/python/UserDefinedPythonDataSource.scala| 2 +- .../execution/python/AggregateInPandasExec.scala | 12 +- .../ApplyInPandasWithStatePythonRunner.scala | 8 +- .../sql/execution/python/ArrowEvalPythonExec.scala | 11 +- .../sql/execution/python/ArrowPythonRunner.scala | 20 +- .../sql/execution/python/BatchEvalPythonExec.scala | 12 +- .../execution/python/BatchEvalPythonUDTFExec.scala | 2 +- .../python/CoGroupedArrowPythonRunner.scala| 11 +- .../python/EvalPythonEvaluatorFactory.scala| 11 +- .../python/FlatMapCoGroupsInBatchExec.scala| 9 +- .../python/FlatMapGroupsInBatchExec.scala | 9 +- .../FlatMapGroupsInPandasWithStateExec.scala | 6 +- .../python/MapInBatchEvaluatorFactory.scala| 7 +- .../sql/execution/python/MapInBatchExec.scala | 5 +- .../sql/execution/python/PythonUDFRunner.scala | 52 +++-- .../python/WindowInPandasEvaluatorFactory.scala| 14 +- .../sql/execution/python/WindowInPandasExec.scala | 3 +- 36 files changed, 797 insertions(+), 119 deletions(-) diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala index 0ecdc4bdef96..41146e4ef688 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala +++
(spark) branch master updated: [SPARK-46746][SQL][AVRO] Attach codec extension to avro datasource files
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9a2f39318e3a [SPARK-46746][SQL][AVRO] Attach codec extension to avro datasource files 9a2f39318e3a is described below commit 9a2f39318e3af8b3817dc5e4baf52e548d82063c Author: Kent Yao AuthorDate: Wed Jan 17 07:20:32 2024 -0800 [SPARK-46746][SQL][AVRO] Attach codec extension to avro datasource files ### What changes were proposed in this pull request? This PR attaches codec extension to avro datasource files. ``` part-0-2d4a2c78-a62a-4f7d-a286-5572dcdefade-c000.zstandard.avro part-0-74c04de5-c991-4a40-8740-8d472f4ce2ec-c000.avro part-0-965d0e93-9f86-40f9-8544-d71d14cc9787-c000.xz.avro part-2-965d0e93-9f86-40f9-8544-d71d14cc9787-c000.snappy.avro ``` ### Why are the changes needed? Feature parity with parquet and orc file sources, which is useful to differentiate compression codecs of Avro files ### Does this PR introduce _any_ user-facing change? No, this more likely belong to underlying data storage layer ### How was this patch tested? new unit tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #44770 from yaooqinn/SPARK-46746. 
Authored-by: Kent Yao Signed-off-by: Dongjoon Hyun --- .../apache/spark/sql/avro/AvroOutputWriterFactory.scala | 10 +- .../scala/org/apache/spark/sql/avro/AvroCodecSuite.scala | 16 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala index 3eba013c1435..15c76ec358ed 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.avro import org.apache.avro.Schema +import org.apache.avro.mapreduce.AvroJob import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} @@ -38,7 +39,14 @@ private[sql] class AvroOutputWriterFactory( private lazy val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString) - override def getFileExtension(context: TaskAttemptContext): String = ".avro" + override def getFileExtension(context: TaskAttemptContext): String = { +val codec = context.getConfiguration.get(AvroJob.CONF_OUTPUT_CODEC) +if (codec == null || codec.equalsIgnoreCase("null")) { + ".avro" +} else { + s".$codec.avro" +} + } override def newInstance( path: String, diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala index 4e4942e1b2e2..ec3753b84a55 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala @@ -26,4 +26,20 @@ class AvroCodecSuite extends FileSourceCodecSuite { override val codecConfigName: String = SQLConf.AVRO_COMPRESSION_CODEC.key override protected def availableCodecs = 
AvroCompressionCodec.values().map(_.lowerCaseName()).iterator.to(Seq) + + availableCodecs.foreach { codec => +test(s"SPARK-46746: attach codec name to avro files - codec $codec") { + withTable("avro_t") { +sql( + s"""CREATE TABLE avro_t + | USING $format OPTIONS('compression'='$codec') + | AS SELECT 1 as id + | """.stripMargin) +spark.table("avro_t") + .inputFiles.foreach { file => +assert(file.endsWith(s"$codec.avro".stripPrefix("uncompressed"))) + } + } +} + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-46525][DOCKER][TESTS][FOLLOWUP] Fix docker-integration-tests on Apple Silicon for db2 and oracle with third-party docker environments
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e3fd64b96765 [SPARK-46525][DOCKER][TESTS][FOLLOWUP] Fix docker-integration-tests on Apple Silicon for db2 and oracle with third-party docker environments e3fd64b96765 is described below commit e3fd64b96765dc04994798e79161026bd1361230 Author: Kent Yao AuthorDate: Wed Jan 17 07:19:06 2024 -0800 [SPARK-46525][DOCKER][TESTS][FOLLOWUP] Fix docker-integration-tests on Apple Silicon for db2 and oracle with third-party docker environments ### What changes were proposed in this pull request? SPARK-46525 makes docker-integration-tests pass most tests on Apple Silicon except DB2 and Oracle. This PR modifies DockerJDBCIntegrationSuite to make it compatible with some of the third-party docker environments, such as the Colima docker environment. Developers can quickly bring up these tests for local testing after a simple pre-setup process. ### Why are the changes needed? Make it possible to test and debug locally for developers on Apple Silicon platforms. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? - Passed OracleIntegrationSuite on my Apple Silicon Mac locally ``` [info] OracleIntegrationSuite: [info] - SPARK-33034: ALTER TABLE ... add new columns (21 seconds, 839 milliseconds) [info] [info] Run completed in 3 minutes, 16 seconds. [info] Total number of tests run: 26 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 26, failed 0, canceled 0, ignored 11, pending 0 [info] All tests passed. 
[success] Total time: 326 s (05:26), completed Jan 5, 2024, 7:10:36 PM ``` - Containers ran normally on my Apple Silicon Mac locally during and after tests above ``` 5ea7cc54165b ibmcom/db2:11.5.6.0a "/var/db2_setup/lib/…" 36 minutes ago Up 36 minutes 22/tcp, 55000/tcp, 60006-60007/tcp, 0.0.0.0:57898->5/tcp, :::57898->5/tcp strange_ritchie d31122b8a504 gvenzl/oracle-free:23.3 "container-entrypoin…" About an hour ago Up About an hour 0.0.0.0:64193->1521/tcp, :::64193->1521/tcp priceless_wright 75f9943fd4b6 mariadb:10.5.12 "/docker-entrypoint/…" 2 hours ago Up 2 hours 0.0.0.0:55052->3306/tcp, :::55052->3306/tcp angry_ganguly ``` ### Was this patch authored or co-authored using generative AI tooling? no Closes #44612 from yaooqinn/SPARK-46525-F. Authored-by: Kent Yao Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala | 3 ++- .../test/scala/org/apache/spark/sql/jdbc/OracleDatabaseOnDocker.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala index fde228939dd4..4b5820c2302f 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala @@ -156,7 +156,7 @@ abstract class DockerJDBCIntegrationSuite .newHostConfig() .withNetworkMode("bridge") .withPrivileged(db.privileged) - .withPortBindings(PortBinding.parse(s"$dockerIp:$externalPort:${db.jdbcPort}")) +.withPortBindings(PortBinding.parse(s"$externalPort:${db.jdbcPort}")) if (db.usesIpc) { hostConfig.withIpcMode("host") @@ -197,6 +197,7 @@ abstract class DockerJDBCIntegrationSuite } } catch { case NonFatal(e) => +logError(s"Failed to initialize Docker container 
for ${this.getClass.getName}", e) try { afterAll() } finally { diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleDatabaseOnDocker.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleDatabaseOnDocker.scala index 98d57d6a7475..fa563ae54a96 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleDatabaseOnDocker.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleDatabaseOnDocker.scala @@ -57,7 +57,7 @@ class OracleDatabaseOnDocker extends DatabaseOnDocker with Logging { val newBind = new Bind( dir.getAbsolutePath, new Volume("/docker-entrypoint-initdb.d"), -AccessMode.ro) +
(spark) branch master updated: [SPARK-46739][SQL] Add the error class `UNSUPPORTED_CALL`
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new bccbdb72b004 [SPARK-46739][SQL] Add the error class `UNSUPPORTED_CALL` bccbdb72b004 is described below commit bccbdb72b004f90ed8605d5bcaaf8b4605d8d099 Author: Max Gekk AuthorDate: Wed Jan 17 15:41:01 2024 +0300 [SPARK-46739][SQL] Add the error class `UNSUPPORTED_CALL` ### What changes were proposed in this pull request? In the PR, I propose to add new error class for unsupported method calls, and remove similar legacy error classes. New `apply()` method of `SparkUnsupportedOperationException` extracts method and class name from stack traces automatically, and places them to error class parameters. ### Why are the changes needed? To improve code maintenance by avoid boilerplate code (extract class and method names automatically), and to clean up `error-classes.json`. ### Does this PR introduce _any_ user-facing change? Yes, it can if user's code depends on the error class or message format of `SparkUnsupportedOperationException`. ### How was this patch tested? By running new test: ``` $ build/sbt "test:testOnly *QueryCompilationErrorsSuite" ``` and the affected test suites: ``` $ build/sbt "core/testOnly *SparkThrowableSuite" $ build/sbt "test:testOnly *ShuffleSpecSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44757 from MaxGekk/unsupported_call-error-class. 
Authored-by: Max Gekk Signed-off-by: Max Gekk --- .../src/main/resources/error/error-classes.json| 181 + .../scala/org/apache/spark/SparkException.scala| 13 ++ docs/sql-error-conditions.md | 6 + .../sql/catalyst/expressions/UnsafeArrayData.java | 4 +- .../spark/sql/catalyst/expressions/UnsafeRow.java | 2 +- .../spark/sql/connector/write/WriteBuilder.java| 2 +- .../spark/sql/vectorized/ArrowColumnVector.java| 26 +-- .../apache/spark/sql/vectorized/ColumnarArray.java | 4 +- .../spark/sql/vectorized/ColumnarBatchRow.java | 6 +- .../apache/spark/sql/vectorized/ColumnarRow.java | 6 +- .../spark/sql/catalyst/ProjectingInternalRow.scala | 8 +- .../spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../sql/catalyst/analysis/FunctionRegistry.scala | 14 +- .../catalog/FunctionExpressionBuilder.scala| 2 +- .../sql/catalyst/catalog/functionResources.scala | 2 +- .../spark/sql/catalyst/expressions/grouping.scala | 6 +- .../catalyst/expressions/namedExpressions.scala| 18 +- .../spark/sql/catalyst/expressions/ordering.scala | 2 +- .../spark/sql/catalyst/plans/joinTypes.scala | 2 +- .../sql/catalyst/plans/physical/partitioning.scala | 4 +- .../catalyst/util/ResolveDefaultColumnsUtil.scala | 2 +- .../spark/sql/catalyst/ShuffleSpecSuite.scala | 13 +- .../sql/errors/QueryCompilationErrorsSuite.scala | 14 +- 23 files changed, 99 insertions(+), 240 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 19817ced3356..2fa86de3daa3 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -3592,6 +3592,12 @@ ], "sqlState" : "0A000" }, + "UNSUPPORTED_CALL" : { +"message" : [ + "Cannot call the method \"\" of the class \"\"." +], +"sqlState" : "0A000" + }, "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING" : { "message" : [ "The char/varchar type can't be used in the table schema.", @@ -7133,16 +7139,6 @@ "Cannot bind a V1 function." 
] }, - "_LEGACY_ERROR_TEMP_3111" : { -"message" : [ - "" -] - }, - "_LEGACY_ERROR_TEMP_3112" : { -"message" : [ - "Operation unsupported for " -] - }, "_LEGACY_ERROR_TEMP_3113" : { "message" : [ "UnresolvedTableSpec doesn't have a data type" @@ -7153,76 +7149,11 @@ "UnresolvedTableSpec doesn't have a data type" ] }, - "_LEGACY_ERROR_TEMP_3115" : { -"message" : [ - "" -] - }, - "_LEGACY_ERROR_TEMP_3116" : { -"message" : [ - "" -] - }, - "_LEGACY_ERROR_TEMP_3117" : { -"message" : [ - "Cannot modify " -] - }, - "_LEGACY_ERROR_TEMP_3118" : { -"message" : [ - "" -] - }, - "_LEGACY_ERROR_TEMP_3119" : { -"message" : [ - "" -] - }, - "_LEGACY_ERROR_TEMP_3120" : { -"message" : [ - "" -] - }, "_LEGACY_ERROR_TEMP_3121" : { "message" : [ "A HllSketch instance cannot be updates with a Spark type" ] }, - "_LEGACY_ERROR_TEMP_3122" : { -"message" : [ - "" -] - }, -