[spark] branch master updated: [SPARK-40810][SQL] Use SparkIllegalArgumentException instead of IllegalArgumentException in CreateDatabaseCommand & AlterDatabaseSetLocationCommand
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0110b1ed88d [SPARK-40810][SQL] Use SparkIllegalArgumentException instead of IllegalArgumentException in CreateDatabaseCommand & AlterDatabaseSetLocationCommand 0110b1ed88d is described below commit 0110b1ed88d3c140e1bdf04888740070f1d9d992 Author: panbingkun AuthorDate: Wed Oct 19 10:33:22 2022 +0500 [SPARK-40810][SQL] Use SparkIllegalArgumentException instead of IllegalArgumentException in CreateDatabaseCommand & AlterDatabaseSetLocationCommand ### What changes were proposed in this pull request? This PR aims to use SparkIllegalArgumentException instead of IllegalArgumentException in CreateDatabaseCommand & AlterDatabaseSetLocationCommand. ### Why are the changes needed? While working on https://issues.apache.org/jira/browse/SPARK-40790, I found that when `location` is empty, the DDL commands (CreateDatabaseCommand & AlterDatabaseSetLocationCommand) throw IllegalArgumentException, which does not fit into the new error framework. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs. Closes #38274 from panbingkun/setNamespaceLocation_error. 
Authored-by: panbingkun Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 5 + .../apache/spark/sql/errors/QueryExecutionErrors.scala| 6 ++ .../sql/catalyst/analysis/ResolveSessionCatalog.scala | 10 +- .../execution/datasources/v2/DataSourceV2Strategy.scala | 9 + .../command/AlterNamespaceSetLocationSuiteBase.scala | 12 .../sql/execution/command/CreateNamespaceSuiteBase.scala | 15 --- 6 files changed, 45 insertions(+), 12 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 7f42d8acc53..0cfb6861c77 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -837,6 +837,11 @@ } } }, + "UNSUPPORTED_EMPTY_LOCATION" : { +"message" : [ + "Unsupported empty location." +] + }, "UNSUPPORTED_FEATURE" : { "message" : [ "The feature is not supported:" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 89c0cf5fafa..5edffc87b84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2657,4 +2657,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { "numElements" -> numElements.toString, "size" -> elementSize.toString)) } + + def unsupportedEmptyLocationError(): SparkIllegalArgumentException = { +new SparkIllegalArgumentException( + errorClass = "UNSUPPORTED_EMPTY_LOCATION", + messageParameters = Map.empty) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 56236f0d2ad..d00d07150b0 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.commons.lang3.StringUtils + import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils} @@ -27,7 +29,7 @@ import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDe import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1, DataSource} import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 @@ -131,6 +133,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) AlterDatabasePropertiesCommand(db, properties) case SetNamespaceLocation(DatabaseInSessionCatalog(db), location) if
[spark] branch master updated: [SPARK-40826][SS] Add additional checkpoint rename file check
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0aa877e2819 [SPARK-40826][SS] Add additional checkpoint rename file check 0aa877e2819 is described below commit 0aa877e2819d72204045876d7e916627d9b7f679 Author: Liang-Chi Hsieh AuthorDate: Tue Oct 18 22:05:53 2022 -0700 [SPARK-40826][SS] Add additional checkpoint rename file check ### What changes were proposed in this pull request? This adds an additional checkpoint rename file check. ### Why are the changes needed? We recently encountered an issue where one customer's structured streaming job failed to read a delta file. The temporary file exists but it was not successfully renamed to the final delta file path. We currently don't check whether the renamed file exists but assume the rename was successful. As a result, failing to read a delta file that was assumed to be committed in the last batch makes re-triggering the job impossible. We should be able to check the renamed checkpoint file to prevent such difficulty in advance. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. Closes #38291 from viirya/add_file_check. 
Authored-by: Liang-Chi Hsieh Signed-off-by: Dongjoon Hyun --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 10 ++ .../spark/sql/execution/streaming/CheckpointFileManager.scala | 7 +++ 2 files changed, 17 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2f96209222b..a99a795018d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1830,6 +1830,14 @@ object SQLConf { .stringConf .createWithDefault("lz4") + val CHECKPOINT_RENAMEDFILE_CHECK_ENABLED = +buildConf("spark.sql.streaming.checkpoint.renamedFileCheck.enabled") + .doc("When true, Spark will validate if renamed checkpoint file exists.") + .internal() + .version("3.4.0") + .booleanConf + .createWithDefault(false) + /** * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places should be updated * together. 
@@ -4234,6 +4242,8 @@ class SQLConf extends Serializable with Logging { def stateStoreCompressionCodec: String = getConf(STATE_STORE_COMPRESSION_CODEC) + def checkpointRenamedFileCheck: Boolean = getConf(CHECKPOINT_RENAMEDFILE_CHECK_ENABLED) + def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED) def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index cf5d54fd20a..013efd3c7ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -158,6 +158,13 @@ object CheckpointFileManager extends Logging { s"Failed to rename temp file $tempPath to $finalPath because file exists", fe) if (!overwriteIfPossible) throw fe } + +// Optionally, check if the renamed file exists +if (SQLConf.get.checkpointRenamedFileCheck && !fm.exists(finalPath)) { + throw new IllegalStateException(s"Renamed temp file $tempPath to $finalPath. " + +s"But $finalPath does not exist.") +} + logInfo(s"Renamed temp file $tempPath to $finalPath") } finally { terminated = true - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40655][PYTHON][PROTOBUF] PySpark support for from_protobuf and to_protobuf
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ed9db14e8c7 [SPARK-40655][PYTHON][PROTOBUF] PySpark support for from_protobuf and to_protobuf ed9db14e8c7 is described below commit ed9db14e8c79f32a9a3420d908e449f48b555120 Author: SandishKumarHN AuthorDate: Wed Oct 19 12:04:25 2022 +0900 [SPARK-40655][PYTHON][PROTOBUF] PySpark support for from_protobuf and to_protobuf From SandishKumarHN(sanysandishgmail.com) and Mohan Parthasarathy(mposdev21gmail.com) This PR follows main PR https://github.com/apache/spark/pull/37972 The following is an example of how to use from_protobuf and to_protobuf in Pyspark. ```python data = [("1", (2, "Alice", 109200))] ddl_schema = "key STRING, value STRUCT" df = spark.createDataFrame(data, ddl_schema) desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726' ... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61' ... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121' ... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363' ... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707' ... '26F746F33') import tempfile # Writing a protobuf description into a file, generated by using # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file with tempfile.TemporaryDirectory() as tmp_dir: ... desc_file_path = "%s/pyspark_test.desc" % tmp_dir ... with open(desc_file_path, "wb") as f: ... _ = f.write(bytearray.fromhex(desc_hex)) ... f.flush() ... message_name = 'SimpleMessage' ... proto_df = df.select(to_protobuf(df.value, ... desc_file_path, message_name).alias("value")) ... proto_df.show(truncate=False) ... proto_df = proto_df.select(from_protobuf(proto_df.value, ... 
desc_file_path, message_name).alias("value")) ... proto_df.show(truncate=False) ++ |value | ++ |[08 02 12 05 41 6C 69 63 65 18 90 D5 06]| ++ +--+ |value | +--+ |{2, Alice, 109200}| +--+ ``` ### Tests Covered - from_protobuf / to_protobuf (functions.py) Closes #38212 from SandishKumarHN/PYSPARK_PROTOBUF. Authored-by: SandishKumarHN Signed-off-by: Hyukjin Kwon --- .github/workflows/build_and_test.yml | 2 +- .../src/test/resources/protobuf/pyspark_test.proto | 32 +++ dev/sparktestsupport/modules.py| 14 +- dev/sparktestsupport/utils.py | 16 +- python/docs/source/reference/pyspark.sql/index.rst | 1 + .../pyspark.sql/{index.rst => protobuf.rst}| 30 +-- python/pyspark/sql/protobuf/__init__.py| 18 ++ python/pyspark/sql/protobuf/functions.py | 215 + 8 files changed, 296 insertions(+), 32 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index f6f5f026537..64dbe30012c 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -153,7 +153,7 @@ jobs: streaming, sql-kafka-0-10, streaming-kafka-0-10, mllib-local, mllib, yarn, mesos, kubernetes, hadoop-cloud, spark-ganglia-lgpl, -connect +connect, protobuf # Here, we split Hive and SQL tests into some of slow ones and the rest of them. included-tags: [""] excluded-tags: [""] diff --git a/connector/protobuf/src/test/resources/protobuf/pyspark_test.proto b/connector/protobuf/src/test/resources/protobuf/pyspark_test.proto new file mode 100644 index 000..8750371349a --- /dev/null +++ b/connector/protobuf/src/test/resources/protobuf/pyspark_test.proto @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the
[spark] branch master updated (646d7160bcf -> 14d8604eacb)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 646d7160bcf [SPARK-40816][CONNECT][PYTHON] Rename LogicalPlan.collect to LogicalPlan.to_proto add 14d8604eacb [SPARK-40823][CONNECT] Connect Proto should carry unparsed identifiers No new revisions were added by this update. Summary of changes: .../main/protobuf/spark/connect/expressions.proto | 2 +- .../main/protobuf/spark/connect/relations.proto| 2 +- .../org/apache/spark/sql/connect/dsl/package.scala | 6 +- .../sql/connect/planner/SparkConnectPlanner.scala | 7 ++- .../connect/planner/SparkConnectPlannerSuite.scala | 19 +++--- python/pyspark/sql/connect/column.py | 14 ++--- python/pyspark/sql/connect/plan.py | 10 ++-- .../pyspark/sql/connect/proto/expressions_pb2.py | 26 .../pyspark/sql/connect/proto/expressions_pb2.pyi | 14 ++--- python/pyspark/sql/connect/proto/relations_pb2.py | 70 +++--- python/pyspark/sql/connect/proto/relations_pb2.pyi | 14 ++--- 11 files changed, 91 insertions(+), 93 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark-docker] branch master updated: [SPARK-40833] Cleanup apt lists cache
This is an automated email from the ASF dual-hosted git repository. yikun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark-docker.git The following commit(s) were added to refs/heads/master by this push: new 95f5a1f [SPARK-40833] Cleanup apt lists cache 95f5a1f is described below commit 95f5a1f3e846ad3b6550e151fa76b70f6fe0b946 Author: Yikun Jiang AuthorDate: Wed Oct 19 10:17:58 2022 +0800 [SPARK-40833] Cleanup apt lists cache ### What changes were proposed in this pull request? Remove the unused apt lists cache and apply `./add-dockerfiles.sh 3.3.0` ### Why are the changes needed? Clean the cache to reduce the Docker image size. This is also [recommended](https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#run) by the Docker community: ``` $ docker run --user 0:0 -ti apache/spark bash root@5d1ca347279e:/opt/spark/work-dir# ls /var/lib/apt/lists/ auxfiles lock deb.debian.org_debian_dists_bullseye-updates_InRelease partial deb.debian.org_debian_dists_bullseye-updates_main_binary-arm64_Packages.lz4 security.debian.org_debian-security_dists_bullseye-security_InRelease deb.debian.org_debian_dists_bullseye_InRelease security.debian.org_debian-security_dists_bullseye-security_main_binary-arm64_Packages.lz4 deb.debian.org_debian_dists_bullseye_main_binary-arm64_Packages.lz4 root@5d1ca347279e:/opt/spark/work-dir# du --max-depth=1 -h /var/lib/apt/lists/ 4.0K/var/lib/apt/lists/partial 4.0K/var/lib/apt/lists/auxfiles 17M /var/lib/apt/lists/ ``` ### Does this PR introduce _any_ user-facing change? Yes, to some extent: the image size is reduced. ### How was this patch tested? K8s CI passed Closes #14 from Yikun/clean-apt-list. 
Authored-by: Yikun Jiang Signed-off-by: Yikun Jiang --- 3.3.0/scala2.12-java11-python3-r-ubuntu/Dockerfile | 3 ++- 3.3.0/scala2.12-java11-python3-ubuntu/Dockerfile | 3 ++- 3.3.0/scala2.12-java11-r-ubuntu/Dockerfile | 3 ++- 3.3.0/scala2.12-java11-ubuntu/Dockerfile | 3 ++- Dockerfile.template| 3 ++- 5 files changed, 10 insertions(+), 5 deletions(-) diff --git a/3.3.0/scala2.12-java11-python3-r-ubuntu/Dockerfile b/3.3.0/scala2.12-java11-python3-r-ubuntu/Dockerfile index 5dbc973..be9cbb0 100644 --- a/3.3.0/scala2.12-java11-python3-r-ubuntu/Dockerfile +++ b/3.3.0/scala2.12-java11-python3-r-ubuntu/Dockerfile @@ -38,7 +38,8 @@ RUN set -ex && \ ln -sv /bin/bash /bin/sh && \ echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \ chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \ -rm -rf /var/cache/apt/* +rm -rf /var/cache/apt/* && \ +rm -rf /var/lib/apt/lists/* # Install Apache Spark # https://downloads.apache.org/spark/KEYS diff --git a/3.3.0/scala2.12-java11-python3-ubuntu/Dockerfile b/3.3.0/scala2.12-java11-python3-ubuntu/Dockerfile index 85e06ce..096c7eb 100644 --- a/3.3.0/scala2.12-java11-python3-ubuntu/Dockerfile +++ b/3.3.0/scala2.12-java11-python3-ubuntu/Dockerfile @@ -37,7 +37,8 @@ RUN set -ex && \ ln -sv /bin/bash /bin/sh && \ echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \ chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \ -rm -rf /var/cache/apt/* +rm -rf /var/cache/apt/* && \ +rm -rf /var/lib/apt/lists/* # Install Apache Spark # https://downloads.apache.org/spark/KEYS diff --git a/3.3.0/scala2.12-java11-r-ubuntu/Dockerfile b/3.3.0/scala2.12-java11-r-ubuntu/Dockerfile index 753d585..2e085a2 100644 --- a/3.3.0/scala2.12-java11-r-ubuntu/Dockerfile +++ b/3.3.0/scala2.12-java11-r-ubuntu/Dockerfile @@ -35,7 +35,8 @@ RUN set -ex && \ ln -sv /bin/bash /bin/sh && \ echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \ chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \ -rm -rf /var/cache/apt/* +rm -rf /var/cache/apt/* && \ 
+rm -rf /var/lib/apt/lists/* # Install Apache Spark # https://downloads.apache.org/spark/KEYS diff --git a/3.3.0/scala2.12-java11-ubuntu/Dockerfile b/3.3.0/scala2.12-java11-ubuntu/Dockerfile index 1e4c604..5858e2d 100644 --- a/3.3.0/scala2.12-java11-ubuntu/Dockerfile +++ b/3.3.0/scala2.12-java11-ubuntu/Dockerfile @@ -34,7 +34,8 @@ RUN set -ex && \ ln -sv /bin/bash /bin/sh && \ echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \ chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \ -rm -rf /var/cache/apt/* +rm -rf /var/cache/apt/* && \ +rm -rf /var/lib/apt/lists/* # Install Apache Spark # https://downloads.apache.org/spark/KEYS diff --git a/Dockerfile.template b/Dockerfile.template index 2001281..a031b16 100644 --- a/Dockerfile.template +++ b/Dockerfile.template @@ -44,7 +44,8 @@ RUN set -ex && \ ln -sv
[spark-docker] branch master updated: [SPARK-40832][DOCS] Add README for spark-docker
This is an automated email from the ASF dual-hosted git repository. yikun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark-docker.git The following commit(s) were added to refs/heads/master by this push: new c1353a3 [SPARK-40832][DOCS] Add README for spark-docker c1353a3 is described below commit c1353a377176d9f2a84641323840130bd160e436 Author: Yikun Jiang AuthorDate: Wed Oct 19 10:16:41 2022 +0800 [SPARK-40832][DOCS] Add README for spark-docker ### What changes were proposed in this pull request? Add README for spark-docker ### Why are the changes needed? Although the PR of DOI has not been merged yet, but we'd better to briefly explain what this repository does. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Preview manually: https://user-images.githubusercontent.com/1736354/196381318-cb3d72e1-1ba7-479c-82cb-4412dde91179.png;> Closes #13 from Yikun/readme. Authored-by: Yikun Jiang Signed-off-by: Yikun Jiang --- README.md | 18 ++ 1 file changed, 18 insertions(+) diff --git a/README.md b/README.md new file mode 100644 index 000..87286dc --- /dev/null +++ b/README.md @@ -0,0 +1,18 @@ +# Apache Spark Official Dockerfiles + +## What is Apache Spark? + +Spark is a unified analytics engine for large-scale data processing. It provides +high-level APIs in Scala, Java, Python, and R, and an optimized engine that +supports general computation graphs for data analysis. It also supports a +rich set of higher-level tools including Spark SQL for SQL and DataFrames, +pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing, +and Structured Streaming for stream processing. + +https://spark.apache.org/ + +## About this repository + +This repository contains the Dockerfiles used to build the Apache Spark Docker Image. + +See more in [SPARK-40513: SPIP: Support Docker Official Image for Spark](https://issues.apache.org/jira/browse/SPARK-40513). 
- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40816][CONNECT][PYTHON] Rename LogicalPlan.collect to LogicalPlan.to_proto
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 646d7160bcf [SPARK-40816][CONNECT][PYTHON] Rename LogicalPlan.collect to LogicalPlan.to_proto 646d7160bcf is described below commit 646d7160bcf4f1f89fd3bf632cb49bcd58199f58 Author: Rui Wang AuthorDate: Wed Oct 19 10:13:10 2022 +0900 [SPARK-40816][CONNECT][PYTHON] Rename LogicalPlan.collect to LogicalPlan.to_proto ### What changes were proposed in this pull request? The `collect` method in `class LogicalPlan` is really to generate connect proto plan. It's confusing to use `collect` which overlaps with `collect` in dataframe API that returns materialized data. This PR proposes to rename this method to `to_proto` to match its implementation. ### Why are the changes needed? Improve codebase readability. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38279 from amaliujia/rename_logical_plan_collect2. 
Authored-by: Rui Wang Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/dataframe.py | 4 ++-- python/pyspark/sql/connect/plan.py | 12 +++- .../sql/tests/connect/test_connect_column_expressions.py | 2 +- python/pyspark/sql/tests/connect/test_connect_plan_only.py | 8 python/pyspark/sql/tests/connect/test_connect_select_ops.py | 4 ++-- 5 files changed, 20 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 31215b4da79..8e34419fc11 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -247,12 +247,12 @@ class DataFrame(object): raise Exception("Cannot collect on empty plan.") if self._session is None: raise Exception("Cannot collect on empty session.") -query = self._plan.collect(self._session) +query = self._plan.to_proto(self._session) return self._session._to_pandas(query) def explain(self) -> str: if self._plan is not None: -query = self._plan.collect(self._session) +query = self._plan.to_proto(self._session) if self._session is None: raise Exception("Cannot analyze without RemoteSparkSession.") return self._session.analyze(query).explain_string diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index da7c5cf5698..9351998c195 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -80,9 +80,19 @@ class LogicalPlan(object): return test_plan == plan -def collect( +def to_proto( self, session: Optional["RemoteSparkSession"] = None, debug: bool = False ) -> proto.Plan: +""" +Generates connect proto plan based on this LogicalPlan. + +Parameters +-- +session : :class:`RemoteSparkSession`, optional. +a session that connects remote spark cluster. +debug: bool +if enabled, the proto plan will be printed. 
+""" plan = proto.Plan() plan.root.CopyFrom(self.plan(session)) diff --git a/python/pyspark/sql/tests/connect/test_connect_column_expressions.py b/python/pyspark/sql/tests/connect/test_connect_column_expressions.py index 74f5343a9c1..ca2cc216ff2 100644 --- a/python/pyspark/sql/tests/connect/test_connect_column_expressions.py +++ b/python/pyspark/sql/tests/connect/test_connect_column_expressions.py @@ -46,7 +46,7 @@ class SparkConnectColumnExpressionSuite(PlanOnlyTestFixture): def test_column_literals(self): df = c.DataFrame.withPlan(p.Read("table")) lit_df = df.select(fun.lit(10)) -self.assertIsNotNone(lit_df._plan.collect(None)) +self.assertIsNotNone(lit_df._plan.to_proto(None)) self.assertIsNotNone(fun.lit(10).to_plan(None)) plan = fun.lit(10).to_plan(None) diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py index 03cedd56de5..c547000bdcf 100644 --- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py +++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py @@ -27,13 +27,13 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture): generation but do not call Spark.""" def test_simple_project(self): -plan = self.connect.readTable(table_name=self.tbl_name)._plan.collect(self.connect) +plan = self.connect.readTable(table_name=self.tbl_name)._plan.to_proto(self.connect) self.assertIsNotNone(plan.root, "Root relation must be set")
[spark] branch master updated (4ad29829bf5 -> 7af39b61ff3)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 4ad29829bf5 [SPARK-40829][SQL] STORED AS serde in CREATE TABLE LIKE view does not work add 7af39b61ff3 [SPARK-40828][CONNECT][PYTHON][TESTING] Drop Python test tables before and after unit tests No new revisions were added by this update. Summary of changes: python/pyspark/sql/tests/connect/test_connect_basic.py | 18 ++ python/pyspark/testing/connectutils.py | 3 +-- 2 files changed, 15 insertions(+), 6 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.2 updated: [SPARK-40829][SQL] STORED AS serde in CREATE TABLE LIKE view does not work
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new db2974bea86 [SPARK-40829][SQL] STORED AS serde in CREATE TABLE LIKE view does not work db2974bea86 is described below commit db2974bea86b9f01bde8e2b1507b639adc8b9660 Author: zhangbutao AuthorDate: Tue Oct 18 14:54:39 2022 -0700 [SPARK-40829][SQL] STORED AS serde in CREATE TABLE LIKE view does not work ### What changes were proposed in this pull request? After [SPARK-29839](https://issues.apache.org/jira/browse/SPARK-29839), we could create a table with a specified serde based on an existing view, but the serde of the created table is always parquet. However, if we use the USING syntax ([SPARK-29421](https://issues.apache.org/jira/browse/SPARK-29421)) to create a table with a specified serde based on a view, we can get the correct serde. ### Why are the changes needed? We should add the specified serde for the created table when using the `create table like view stored as` syntax. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit Test Closes #38295 from zhangbutao/SPARK-40829. 
Authored-by: zhangbutao Signed-off-by: Dongjoon Hyun (cherry picked from commit 4ad29829bf53fff26172845312b334008bc4cb68) Signed-off-by: Dongjoon Hyun --- .../apache/spark/sql/execution/command/tables.scala| 4 ++-- .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 18 ++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 7b8216eb757..1047a042a50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -94,10 +94,10 @@ case class CreateTableLikeCommand( DataSource.lookupDataSource(provider.get, sparkSession.sessionState.conf) } provider -} else if (sourceTableDesc.tableType == CatalogTableType.VIEW) { - Some(sparkSession.sessionState.conf.defaultDataSourceName) } else if (fileFormat.inputFormat.isDefined) { Some(DDLUtils.HIVE_PROVIDER) +} else if (sourceTableDesc.tableType == CatalogTableType.VIEW) { + Some(sparkSession.sessionState.conf.defaultDataSourceName) } else { sourceTableDesc.provider } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 4926d251bc5..def88431d0c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -2750,6 +2750,24 @@ class HiveDDLSuite } } + test("Create Table LIKE VIEW STORED AS Hive Format") { +val catalog = spark.sessionState.catalog +withView("v") { + sql("CREATE TEMPORARY VIEW v AS SELECT 1 AS A, 1 AS B;") + hiveFormats.foreach { tableType => +val expectedSerde = HiveSerDe.sourceToSerDe(tableType) +withTable("t") { + sql(s"CREATE TABLE t LIKE v STORED AS $tableType") + val table = 
catalog.getTableMetadata(TableIdentifier("t")) + assert(table.provider == Some("hive")) + assert(table.storage.serde == expectedSerde.get.serde) + assert(table.storage.inputFormat == expectedSerde.get.inputFormat) + assert(table.storage.outputFormat == expectedSerde.get.outputFormat) +} + } +} + } + test("Create Table LIKE with specified TBLPROPERTIES") { val catalog = spark.sessionState.catalog withTable("s", "t") { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (2acfc1dbca9 -> 4ad29829bf5)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 2acfc1dbca9 [SPARK-40369][CORE][SQL] Migrate the type check failures of calls via reflection onto error classes add 4ad29829bf5 [SPARK-40829][SQL] STORED AS serde in CREATE TABLE LIKE view does not work No new revisions were added by this update. Summary of changes: .../apache/spark/sql/execution/command/tables.scala| 4 ++-- .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 18 ++ 2 files changed, 20 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.3 updated: [SPARK-40829][SQL] STORED AS serde in CREATE TABLE LIKE view does not work
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 0487e8130d5 [SPARK-40829][SQL] STORED AS serde in CREATE TABLE LIKE view does not work 0487e8130d5 is described below commit 0487e8130d502f62288721c00731511525868976 Author: zhangbutao AuthorDate: Tue Oct 18 14:54:39 2022 -0700 [SPARK-40829][SQL] STORED AS serde in CREATE TABLE LIKE view does not work ### What changes were proposed in this pull request? After [SPARK-29839](https://issues.apache.org/jira/browse/SPARK-29839), we could create a table with a specified serde based on an existing view, but the serde of the created table is always parquet. However, if we use the USING syntax ([SPARK-29421](https://issues.apache.org/jira/browse/SPARK-29421)) to create a table with a specified serde based on a view, we can get the correct serde. ### Why are the changes needed? We should add the specified serde for the created table when using the `create table like view stored as` syntax. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit Test Closes #38295 from zhangbutao/SPARK-40829. 
Authored-by: zhangbutao Signed-off-by: Dongjoon Hyun (cherry picked from commit 4ad29829bf53fff26172845312b334008bc4cb68) Signed-off-by: Dongjoon Hyun --- .../apache/spark/sql/execution/command/tables.scala| 4 ++-- .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 18 ++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index ac4bb8395a3..60f43ca0ff4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -95,10 +95,10 @@ case class CreateTableLikeCommand( DataSource.lookupDataSource(provider.get, sparkSession.sessionState.conf) } provider -} else if (sourceTableDesc.tableType == CatalogTableType.VIEW) { - Some(sparkSession.sessionState.conf.defaultDataSourceName) } else if (fileFormat.inputFormat.isDefined) { Some(DDLUtils.HIVE_PROVIDER) +} else if (sourceTableDesc.tableType == CatalogTableType.VIEW) { + Some(sparkSession.sessionState.conf.defaultDataSourceName) } else { sourceTableDesc.provider } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index c4cef44b6cc..c3e16e02941 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -2750,6 +2750,24 @@ class HiveDDLSuite } } + test("Create Table LIKE VIEW STORED AS Hive Format") { +val catalog = spark.sessionState.catalog +withView("v") { + sql("CREATE TEMPORARY VIEW v AS SELECT 1 AS A, 1 AS B;") + hiveFormats.foreach { tableType => +val expectedSerde = HiveSerDe.sourceToSerDe(tableType) +withTable("t") { + sql(s"CREATE TABLE t LIKE v STORED AS $tableType") + val table = 
catalog.getTableMetadata(TableIdentifier("t")) + assert(table.provider == Some("hive")) + assert(table.storage.serde == expectedSerde.get.serde) + assert(table.storage.inputFormat == expectedSerde.get.inputFormat) + assert(table.storage.outputFormat == expectedSerde.get.outputFormat) +} + } +} + } + test("Create Table LIKE with specified TBLPROPERTIES") { val catalog = spark.sessionState.catalog withTable("s", "t") { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40369][CORE][SQL] Migrate the type check failures of calls via reflection onto error classes
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2acfc1dbca9 [SPARK-40369][CORE][SQL] Migrate the type check failures of calls via reflection onto error classes 2acfc1dbca9 is described below commit 2acfc1dbca975a2a4a38124fe8ebe464aa1663a9 Author: yangjie01 AuthorDate: Tue Oct 18 20:04:56 2022 +0500 [SPARK-40369][CORE][SQL] Migrate the type check failures of calls via reflection onto error classes ### What changes were proposed in this pull request? This pr replace `TypeCheckFailure` by `DataTypeMismatch` in `CallMethodViaReflection`. ### Why are the changes needed? Migration onto error classes unifies Spark SQL error messages. ### Does this PR introduce _any_ user-facing change? Yes. The PR changes user-facing error messages. ### How was this patch tested? - Pass GitHub Actions Closes #38294 from LuciferYang/SPARK-40369. Authored-by: yangjie01 Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 10 +++ .../expressions/CallMethodViaReflection.scala | 72 +- .../expressions/CallMethodViaReflectionSuite.scala | 53 3 files changed, 105 insertions(+), 30 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 3e97029b154..7f42d8acc53 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -233,6 +233,11 @@ "The lower bound of a window frame must be to the upper bound." ] }, + "UNEXPECTED_CLASS_TYPE" : { +"message" : [ + "class not found" +] + }, "UNEXPECTED_INPUT_TYPE" : { "message" : [ "parameter requires type, however, is of type." 
@@ -243,6 +248,11 @@ "The must not be null" ] }, + "UNEXPECTED_STATIC_METHOD" : { +"message" : [ + "cannot find a static method that matches the argument types in " +] + }, "UNSPECIFIED_FRAME" : { "message" : [ "Cannot use an UnspecifiedFrame. This should have been converted during analysis." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala index 7cb830d1156..db2053707b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala @@ -21,7 +21,8 @@ import java.lang.reflect.{Method, Modifier} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TypeCheckResult} -import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLExpr, toSQLType} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -61,20 +62,56 @@ case class CallMethodViaReflection(children: Seq[Expression]) override def checkInputDataTypes(): TypeCheckResult = { if (children.size < 2) { - TypeCheckFailure("requires at least two arguments") -} else if (!children.take(2).forall(e => e.dataType == StringType && e.foldable)) { - // The first two arguments must be string type. 
- TypeCheckFailure("first two arguments should be string literals") -} else if (!classExists) { - TypeCheckFailure(s"class $className not found") -} else if (children.slice(2, children.length) -.exists(e => !CallMethodViaReflection.typeMapping.contains(e.dataType))) { - TypeCheckFailure("arguments from the third require boolean, byte, short, " + -"integer, long, float, double or string expressions") -} else if (method == null) { - TypeCheckFailure(s"cannot find a static method that matches the argument types in $className") + DataTypeMismatch( +errorSubClass = "WRONG_NUM_PARAMS", +messageParameters = Map("actualNum" -> children.length.toString)) } else { - TypeCheckSuccess + val unexpectedParameter = children.zipWithIndex.collectFirst { +case (e, 0) if !(e.dataType == StringType && e.foldable) => + DataTypeMismatch( +errorSubClass = "NON_FOLDABLE_INPUT", +messageParameters = Map( + "inputName" -> "class", +
[spark] branch master updated (11bc5ce19ef -> 480ca17dd36)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 11bc5ce19ef [SPARK-40368][SQL] Migrate Bloom Filter type check failures onto error classes add 480ca17dd36 [SPARK-40615][SQL] Check unsupported data types when decorrelating subqueries No new revisions were added by this update. Summary of changes: core/src/main/resources/error/error-classes.json | 5 + .../catalyst/optimizer/DecorrelateInnerQuery.scala | 18 +-- .../spark/sql/errors/QueryCompilationErrors.scala | 11 + .../scala/org/apache/spark/sql/SubquerySuite.scala | 26 ++ 4 files changed, 58 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40368][SQL] Migrate Bloom Filter type check failures onto error classes
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 11bc5ce19ef [SPARK-40368][SQL] Migrate Bloom Filter type check failures onto error classes 11bc5ce19ef is described below commit 11bc5ce19ef5db9e4c9c6925a31a85070a0c0495 Author: lvshaokang AuthorDate: Tue Oct 18 18:19:12 2022 +0500 [SPARK-40368][SQL] Migrate Bloom Filter type check failures onto error classes ### What changes were proposed in this pull request? In the PR, I propose to use error classes in the case of type check failure in Bloom Filter expressions. ### Why are the changes needed? Migration onto error classes unifies Spark SQL error messages. ### Does this PR introduce _any_ user-facing change? Yes. The PR changes user-facing error messages. ### How was this patch tested? ``` build/sbt "sql/testOnly *SQLQueryTestSuite" build/sbt "test:testOnly org.apache.spark.SparkThrowableSuite" build/sbt "test:testOnly *BloomFilterAggregateQuerySuite" ``` Closes #38251 from lvshaokang/SPARK-40368. Authored-by: lvshaokang Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 10 +++ .../expressions/BloomFilterMightContain.scala | 25 ++-- .../spark/sql/BloomFilterAggregateQuerySuite.scala | 72 +++--- 3 files changed, 92 insertions(+), 15 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 1474d800f72..3ffbedff4c9 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -101,6 +101,16 @@ "the binary operator requires the input type , not ." ] }, + "BLOOM_FILTER_BINARY_OP_WRONG_TYPE" : { +"message" : [ + "The Bloom filter binary input to should be either a constant value or a scalar subquery expression, but it's ." 
+] + }, + "BLOOM_FILTER_WRONG_TYPE" : { +"message" : [ + "Input to function should have been followed by a value with , but it's [, ]." +] + }, "CANNOT_CONVERT_TO_JSON" : { "message" : [ "Unable to convert column of type to JSON." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala index 24d1dd69d9d..5cb19d36b80 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala @@ -21,6 +21,8 @@ import java.io.ByteArrayInputStream import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch +import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLExpr, toSQLId, toSQLType} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, JavaCode, TrueLiteral} import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE @@ -59,12 +61,25 @@ case class BloomFilterMightContain( if !subquery.containsPattern(OUTER_REFERENCE) => TypeCheckResult.TypeCheckSuccess case _ => -TypeCheckResult.TypeCheckFailure(s"The Bloom filter binary input to $prettyName " + - "should be either a constant value or a scalar subquery expression") +DataTypeMismatch( + errorSubClass = "BLOOM_FILTER_BINARY_OP_WRONG_TYPE", + messageParameters = Map( +"functionName" -> toSQLId(prettyName), +"actual" -> toSQLExpr(bloomFilterExpression) + ) +) } - case _ => TypeCheckResult.TypeCheckFailure(s"Input to function $prettyName should have " + -s"been ${BinaryType.simpleString} followed by a value with ${LongType.simpleString}, " + -s"but it's 
[${left.dataType.catalogString}, ${right.dataType.catalogString}].") + case _ => +DataTypeMismatch( + errorSubClass = "BLOOM_FILTER_WRONG_TYPE", + messageParameters = Map( +"functionName" -> toSQLId(prettyName), +"expectedLeft" -> toSQLType(BinaryType), +"expectedRight" -> toSQLType(LongType), +"actualLeft" -> toSQLType(left.dataType), +"actualRight" -> toSQLType(right.dataType) + ) +) } } diff --git
[spark] branch master updated (a9da92498f0 -> 740d52bf9e9)
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from a9da92498f0 [SPARK-40538][CONNECT] Improve built-in function support for Python client add 740d52bf9e9 [MINOR][CORE][SQL] Add missing s prefix to enable string interpolation No new revisions were added by this update. Summary of changes: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- .../apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala| 2 +- .../sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala| 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40538][CONNECT] Improve built-in function support for Python client
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a9da92498f0 [SPARK-40538][CONNECT] Improve built-in function support for Python client a9da92498f0 is described below commit a9da92498f0968eab21590845abbf1987ee9f1cd Author: Martin Grund AuthorDate: Tue Oct 18 20:08:36 2022 +0900 [SPARK-40538][CONNECT] Improve built-in function support for Python client ### What changes were proposed in this pull request? This patch changes the way simple scalar built-in functions are resolved in the Python Spark Connect client. Previously, it was trying to manually load specific functions. With the changes in this patch, the trivial binary operators like `<`, `+`, ... are mapped to their name equivalents in Spark so that the dynamic function lookup works. In addition, it cleans up the Scala planner side to remove the now unnecessary code translating the trivial binary expressions into their equivalent functions. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT, E2E Closes #38270 from grundprinzip/spark-40538. 
Authored-by: Martin Grund Signed-off-by: Hyukjin Kwon --- .../org/apache/spark/sql/connect/dsl/package.scala | 38 ++ .../sql/connect/planner/SparkConnectPlanner.scala | 25 ++- .../connect/planner/SparkConnectPlannerSuite.scala | 2 +- .../connect/planner/SparkConnectProtoSuite.scala | 28 python/pyspark/sql/connect/column.py | 80 +++--- .../sql/tests/connect/test_connect_basic.py| 12 .../connect/test_connect_column_expressions.py | 29 .../sql/tests/connect/test_connect_plan_only.py| 2 +- 8 files changed, 156 insertions(+), 60 deletions(-) diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index 579f190156f..0c392130562 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -92,6 +92,44 @@ package object dsl { .build() } +/** + * Create an unresolved function from name parts. + * + * @param nameParts + * @param args + * @return + * Expression wrapping the unresolved function. + */ +def callFunction(nameParts: Seq[String], args: Seq[proto.Expression]): proto.Expression = { + proto.Expression +.newBuilder() +.setUnresolvedFunction( + proto.Expression.UnresolvedFunction +.newBuilder() +.addAllParts(nameParts.asJava) +.addAllArguments(args.asJava)) +.build() +} + +/** + * Creates an UnresolvedFunction from a single identifier. + * + * @param name + * @param args + * @return + * Expression wrapping the unresolved function. 
+ */ +def callFunction(name: String, args: Seq[proto.Expression]): proto.Expression = { + proto.Expression +.newBuilder() +.setUnresolvedFunction( + proto.Expression.UnresolvedFunction +.newBuilder() +.addParts(name) +.addAllArguments(args.asJava)) +.build() +} + implicit def intToLiteral(i: Int): proto.Expression = proto.Expression .newBuilder() diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 61352c17a23..7ffce908221 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -197,10 +197,6 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { limitExpr = expressions.Literal(limit.getLimit, IntegerType)) } - private def lookupFunction(name: String, args: Seq[Expression]): Expression = { -UnresolvedFunction(Seq(name), args, isDistinct = false) - } - /** * Translates a scalar function from proto to the Catalyst expression. * @@ -211,21 +207,14 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { * @return */ private def transformScalarFunction(fun: proto.Expression.UnresolvedFunction): Expression = { -val funName = fun.getPartsList.asScala.mkString(".") -funName match { - case "gt" => -assert(fun.getArgumentsCount == 2, "`gt` function must have two arguments.") -expressions.GreaterThan( -
[spark] branch master updated: [SPARK-39783][SQL] Quote qualifiedName to fix backticks for column candidates in error messages
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fc4643be31a [SPARK-39783][SQL] Quote qualifiedName to fix backticks for column candidates in error messages fc4643be31a is described below commit fc4643be31a463cc1096d37f71548f39e99ace32 Author: Enrico Minack AuthorDate: Tue Oct 18 15:04:55 2022 +0500 [SPARK-39783][SQL] Quote qualifiedName to fix backticks for column candidates in error messages ### What changes were proposed in this pull request? The `NamedExpression.qualifiedName` is a concatenation of qualifiers and the name, joined by `dots`. If those contain `dots`, the result `qualifiedName` is ambiguous. Quoting those if they contain `dots` fixes this, while this also fixes quoting column candidates in the error messages `UNRESOLVED_COLUMN` and `UNRESOLVED_MAP_KEY`: `UNRESOLVED_COLUMN`: ``` Seq((0)).toDF("the.id").select("the.id").show() ``` The error message should read org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN] A column or function parameter with name `the`.`id` cannot be resolved. Did you mean one of the following? [`the.id`]; while it was: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN] A column or function parameter with name `the`.`id` cannot be resolved. Did you mean one of the following? [`the`.`id`]; `UNRESOLVED_MAP_KEY`: ``` Seq((0)).toDF("id") .select(map(lit("key"), lit(1)).as("map"), lit(2).as("other.column")) .select($"`map`"($"nonexisting")).show() ``` The error message should read Cannot resolve column `nonexisting` as a map key. If the key is a string literal, please add single quotes around it. Otherwise did you mean one of the following column(s)? [`map`, `other.column`]; while it was: Cannot resolve column `nonexisting` as a map key. If the key is a string literal, please add single quotes around it. 
Otherwise did you mean one of the following column(s)? [`map`, `other`.`column`]; ### Why are the changes needed? The current quoting is wrong and `qualifiedName` is ambiguous if `name` or `qualifiers` contain `dots`. ### Does this PR introduce _any_ user-facing change? It corrects the error message. ### How was this patch tested? This is tested in `AnalysisErrorSuite`, `DatasetSuite` and `QueryCompilationErrorsSuite.scala`. Closes #38256 from EnricoMi/branch-correct-backticks-error-message. Authored-by: Enrico Minack Signed-off-by: Max Gekk --- .../spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../catalyst/expressions/namedExpressions.scala| 8 ++-- .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 20 ++ .../sql/catalyst/analysis/TestRelations.scala | 2 + .../scala/org/apache/spark/sql/DatasetSuite.scala | 45 ++ .../org/apache/spark/sql/DatasetUnpivotSuite.scala | 3 +- .../sql/errors/QueryCompilationErrorsSuite.scala | 16 7 files changed, 89 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 441e696bfb8..b185b38797b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3479,7 +3479,7 @@ class Analyzer(override val catalogManager: CatalogManager) i.userSpecifiedCols.map { col => i.table.resolve(Seq(col), resolver).getOrElse { - val candidates = i.table.output.map(_.name) + val candidates = i.table.output.map(_.qualifiedName) val orderedCandidates = StringUtils.orderStringsBySimilarity(col, candidates) throw QueryCompilationErrors .unresolvedAttributeError("UNRESOLVED_COLUMN", col, orderedCandidates, i.origin) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 4181edcb8c6..99e5f411bdb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -71,11 +71,11 @@ trait NamedExpression extends Expression { def exprId: ExprId /** - * Returns a dot separated fully qualified name for this attribute. Given that there can be - * multiple qualifiers, it is possible that there are other possible way to refer to this - * attribute. + *