[spark] branch master updated: [SPARK-44078][CONNECT][CORE] Add support for classloader/resource isolation
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 9d031ba8c99 [SPARK-44078][CONNECT][CORE] Add support for classloader/resource isolation
9d031ba8c99 is described below

commit 9d031ba8c995286e5f8892764e5108aa60f49238
Author: vicennial
AuthorDate: Wed Jun 21 20:58:19 2023 -0400

[SPARK-44078][CONNECT][CORE] Add support for classloader/resource isolation

### What changes were proposed in this pull request?
This PR adds a `JobArtifactSet` which holds the jars/files/archives relevant to a particular Spark job. Using this "set", we can specify the visible/available resources for a job based on, for example, the SparkSession that the job belongs to. With resource specification support, we can further extend this to support classloader/resource isolation on the executors. The executors use the `uuid` from the `JobArtifactSet` to either create, or obtain from a cache, the [IsolatedSessionState](https://github.com/apache/spark/pull/41625/files#diff-d7a989c491f3cb77cca02c701496a9e2a3443f70af73b0d1ab0899239f3a789dR57) which holds the "state" (i.e. classloaders, files, jars, archives, etc.) for that particular `uuid`. Currently, the code defaults to copying over resources from the `SparkContext` (the current/default behaviour) to avoid any behaviour changes. A follow-up PR would use this mechanism in Spark Connect to isolate resources among Spark Connect sessions.

### Why are the changes needed?
A current limitation of Scala UDFs is that a Spark cluster can only support a single REPL at a time, because the classloaders of different Spark sessions (and therefore Spark Connect sessions) aren't isolated from each other.
Without isolation, REPL-generated class files and user-added JARs may conflict when there are multiple users of the cluster. Thus, we need a mechanism to support isolated sessions (i.e. isolated resources/classloaders) so that each REPL user does not conflict with other users on the same cluster.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests + new suite `JobArtifactSetSuite`.

Closes #41625 from vicennial/SPARK-44078.

Authored-by: vicennial
Signed-off-by: Herman van Hovell
---
.../scala/org/apache/spark/JobArtifactSet.scala| 123 + .../scala/org/apache/spark/executor/Executor.scala | 120 +--- .../org/apache/spark/scheduler/ActiveJob.scala | 3 + .../org/apache/spark/scheduler/DAGScheduler.scala | 37 --- .../apache/spark/scheduler/DAGSchedulerEvent.scala | 2 + .../org/apache/spark/scheduler/ResultTask.scala| 5 +- .../apache/spark/scheduler/ShuffleMapTask.scala| 9 +- .../scala/org/apache/spark/scheduler/Task.scala| 2 + .../apache/spark/scheduler/TaskDescription.scala | 61 ++ .../apache/spark/scheduler/TaskSetManager.scala| 9 +- .../org/apache/spark/JobArtifactSetSuite.scala | 87 +++ .../CoarseGrainedExecutorBackendSuite.scala| 7 +- .../org/apache/spark/executor/ExecutorSuite.scala | 21 ++-- .../CoarseGrainedSchedulerBackendSuite.scala | 12 +- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 9 +- .../org/apache/spark/scheduler/FakeTask.scala | 8 +- .../spark/scheduler/NotSerializableFakeTask.scala | 4 +- .../apache/spark/scheduler/TaskContextSuite.scala | 26 +++-- .../spark/scheduler/TaskDescriptionSuite.scala | 18 +-- .../spark/scheduler/TaskSchedulerImplSuite.scala | 4 +- .../spark/scheduler/TaskSetManagerSuite.scala | 12 +- .../MesosFineGrainedSchedulerBackendSuite.scala| 10 +- 22 files changed, 436 insertions(+), 153 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/JobArtifactSet.scala b/core/src/main/scala/org/apache/spark/JobArtifactSet.scala new file mode 100644 index 000..d87c25c0b7c ---
/dev/null +++ b/core/src/main/scala/org/apache/spark/JobArtifactSet.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
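The isolation mechanism described above — a per-job artifact set whose `uuid` keys a cache of per-session state on the executor — can be sketched in miniature. This is a hypothetical Python illustration, not the actual Scala implementation; the names mirror the PR's, but the per-session classloader is reduced to a plain object:

```python
from dataclasses import dataclass
from typing import Dict, Tuple

@dataclass(frozen=True)
class JobArtifactSet:
    # Stand-in for Spark's Scala JobArtifactSet: the jars/files/archives
    # visible to one job, keyed by the owning session's uuid.
    uuid: str
    jars: Tuple[str, ...] = ()
    files: Tuple[str, ...] = ()
    archives: Tuple[str, ...] = ()

class IsolatedSessionState:
    # On a real executor this would hold a per-session classloader and
    # downloaded resources; here it only records its source artifacts.
    def __init__(self, artifacts: JobArtifactSet) -> None:
        self.artifacts = artifacts

_state_cache: Dict[str, IsolatedSessionState] = {}

def get_or_create_state(artifacts: JobArtifactSet) -> IsolatedSessionState:
    # Executors reuse the cached state for a uuid, so every job from the
    # same session sees the same (isolated) classloader and resources.
    if artifacts.uuid not in _state_cache:
        _state_cache[artifacts.uuid] = IsolatedSessionState(artifacts)
    return _state_cache[artifacts.uuid]
```

Two jobs carrying the same `uuid` resolve to the same state object, while a different `uuid` gets a fresh, isolated one — which is the property the follow-up Spark Connect work relies on.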
[spark] branch master updated: [SPARK-44136][SS] Fixed an issue that StateManager may get materialized in executor instead of driver in FlatMapGroupsWithStateExec
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new fb4d6d9db27 [SPARK-44136][SS] Fixed an issue that StateManager may get materialized in executor instead of driver in FlatMapGroupsWithStateExec
fb4d6d9db27 is described below

commit fb4d6d9db27d0e9642de33d5b3f9915b334ee02c
Author: bogao007
AuthorDate: Thu Jun 22 09:26:32 2023 +0900

[SPARK-44136][SS] Fixed an issue that StateManager may get materialized in executor instead of driver in FlatMapGroupsWithStateExec

### What changes were proposed in this pull request?
Fixed an issue where the StateManager may get materialized on an executor instead of the driver in FlatMapGroupsWithStateExec. The ticket that surfaced this issue: https://issues.apache.org/jira/browse/SPARK-40411 The basic idea is to keep `stateManager` a `lazy val` but touch it early in `doExecute()` to force the lazy initialization on the driver.

### Why are the changes needed?
Without this change, the StateManager in FlatMapGroupsWithStateExec may get materialized on an executor instead of the driver, which would cause unexpected behavior.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
It's hard to write a unit test for this since it involves both the driver and an executor, which is hard to simulate in a unit test.

Closes #41693 from bogao007/SPARK-44136.
Authored-by: bogao007 Signed-off-by: Hyukjin Kwon --- .../spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index d30b9ad116f..3c3d55e6208 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -188,6 +188,7 @@ trait FlatMapGroupsWithStateExecBase } override protected def doExecute(): RDD[InternalRow] = { +stateManager // force lazy init at driver metrics // force lazy init at driver // Throw errors early if parameters are not as expected - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
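The `lazy val` trick in the one-line fix above can be illustrated outside of Spark. A hypothetical Python sketch (Python has no `lazy val`, so first-access initialization is simulated by hand, and the "location" argument records where that first access happened):

```python
class FlatMapGroupsWithStateSketch:
    # Hypothetical stand-in for the Scala operator. `_state_manager`
    # mimics a Scala `lazy val`: built once, on first access.
    def __init__(self) -> None:
        self._state_manager = None
        self.initialized_on = None

    def state_manager(self, location: str):
        if self._state_manager is None:      # first access wins
            self._state_manager = {"built": True}
            self.initialized_on = location
        return self._state_manager

    def do_execute(self) -> None:
        # The fix: touch the lazy value here, while still on the driver,
        # so executors only ever see an already-initialized operator.
        self.state_manager("driver")

    def run_task(self) -> None:
        # Without the forced init in do_execute(), the first access —
        # and therefore the materialization — would happen here.
        self.state_manager("executor")
```

With the forced touch in `do_execute()`, the state is always built on the driver; without it, the first task to run would build it on an executor.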
[spark] branch branch-3.4 updated: [SPARK-44136][SS] Fixed an issue that StateManager may get materialized in executor instead of driver in FlatMapGroupsWithStateExec
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new e4476c60782 [SPARK-44136][SS] Fixed an issue that StateManager may get materialized in executor instead of driver in FlatMapGroupsWithStateExec
e4476c60782 is described below

commit e4476c60782b153cc9639497e6653f51c39401b1
Author: bogao007
AuthorDate: Thu Jun 22 09:26:32 2023 +0900

[SPARK-44136][SS] Fixed an issue that StateManager may get materialized in executor instead of driver in FlatMapGroupsWithStateExec

### What changes were proposed in this pull request?
Fixed an issue where the StateManager may get materialized on an executor instead of the driver in FlatMapGroupsWithStateExec. The ticket that surfaced this issue: https://issues.apache.org/jira/browse/SPARK-40411 The basic idea is to keep `stateManager` a `lazy val` but touch it early in `doExecute()` to force the lazy initialization on the driver.

### Why are the changes needed?
Without this change, the StateManager in FlatMapGroupsWithStateExec may get materialized on an executor instead of the driver, which would cause unexpected behavior.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
It's hard to write a unit test for this since it involves both the driver and an executor, which is hard to simulate in a unit test.

Closes #41693 from bogao007/SPARK-44136.
Authored-by: bogao007 Signed-off-by: Hyukjin Kwon (cherry picked from commit fb4d6d9db27d0e9642de33d5b3f9915b334ee02c) Signed-off-by: Hyukjin Kwon --- .../spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index 760681e81c9..783226a8060 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -183,6 +183,7 @@ trait FlatMapGroupsWithStateExecBase } override protected def doExecute(): RDD[InternalRow] = { +stateManager // force lazy init at driver metrics // force lazy init at driver // Throw errors early if parameters are not as expected
[spark] branch master updated: [SPARK-43710][PS][CONNECT] Support `functions.date_part` for Spark Connect
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new bc20e85b0e1 [SPARK-43710][PS][CONNECT] Support `functions.date_part` for Spark Connect
bc20e85b0e1 is described below

commit bc20e85b0e1da510cc091dbd03f210ef9fc56b25
Author: Ruifeng Zheng
AuthorDate: Thu Jun 22 08:47:27 2023 +0900

[SPARK-43710][PS][CONNECT] Support `functions.date_part` for Spark Connect

### What changes were proposed in this pull request?
Switch to the [newly added `date_part` function](https://github.com/apache/spark/commit/8dc02863b926b9e0780b994f9ee6c5c1058d49a0).

### Why are the changes needed?
To support Spark Connect.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
Existing UT.

Closes #41691 from zhengruifeng/ps_date_part.

Authored-by: Ruifeng Zheng
Signed-off-by: Hyukjin Kwon
---
python/pyspark/pandas/indexes/timedelta.py | 6 +++--- python/pyspark/pandas/spark/functions.py | 32 ++ 2 files changed, 5 insertions(+), 33 deletions(-) diff --git a/python/pyspark/pandas/indexes/timedelta.py b/python/pyspark/pandas/indexes/timedelta.py index 564c484d968..e46d602e985 100644 --- a/python/pyspark/pandas/indexes/timedelta.py +++ b/python/pyspark/pandas/indexes/timedelta.py @@ -150,9 +150,9 @@ class TimedeltaIndex(Index): @no_type_check def get_seconds(scol): -hour_scol = SF.date_part("HOUR", scol) -minute_scol = SF.date_part("MINUTE", scol) -second_scol = SF.date_part("SECOND", scol) +hour_scol = F.date_part("HOUR", scol) +minute_scol = F.date_part("MINUTE", scol) +second_scol = F.date_part("SECOND", scol) return ( F.when( hour_scol < 0, diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py index a904071aee7..b33705263c7 100644 --- a/python/pyspark/pandas/spark/functions.py +++ b/python/pyspark/pandas/spark/functions.py @@ -17,15 +17,11
@@ """ Additional Spark functions used in pandas-on-Spark. """ -from typing import Union, no_type_check +from typing import Union from pyspark import SparkContext import pyspark.sql.functions as F -from pyspark.sql.column import ( -Column, -_to_java_column, -_create_column_from_literal, -) +from pyspark.sql.column import Column # For supporting Spark Connect from pyspark.sql.utils import is_remote @@ -145,27 +141,3 @@ def repeat(col: Column, n: Union[int, Column]) -> Column: """ _n = F.lit(n) if isinstance(n, int) else n return F.call_udf("repeat", col, _n) - - -def date_part(field: Union[str, Column], source: Column) -> Column: -""" -Extracts a part of the date/timestamp or interval source. -""" -sc = SparkContext._active_spark_context -field = ( -_to_java_column(field) if isinstance(field, Column) else _create_column_from_literal(field) -) -return _call_udf(sc, "date_part", field, _to_java_column(source)) - - -@no_type_check -def _call_udf(sc, name, *cols): -return Column(sc._jvm.functions.callUDF(name, _make_arguments(sc, *cols))) - - -@no_type_check -def _make_arguments(sc, *cols): -java_arr = sc._gateway.new_array(sc._jvm.Column, len(cols)) -for i, col in enumerate(cols): -java_arr[i] = col -return java_arr
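For illustration, the HOUR/MINUTE/SECOND recombination that `TimedeltaIndex.get_seconds` performs can be sketched with plain `datetime.timedelta` values. `date_part` here is a hypothetical stand-in for Spark's function on a day-time interval, reduced to signed integer components (Spark's SECOND field is actually fractional):

```python
from datetime import timedelta

def date_part(field: str, td: timedelta) -> int:
    # Stand-in for Spark's date_part on a day-time interval: extract one
    # signed component (HOUR within the day, MINUTE within the hour,
    # SECOND within the minute).
    total = int(td.total_seconds())
    sign = -1 if total < 0 else 1
    total = abs(total)
    if field == "HOUR":
        return sign * (total // 3600 % 24)
    if field == "MINUTE":
        return sign * (total // 60 % 60)
    if field == "SECOND":
        return sign * (total % 60)
    raise ValueError(f"unsupported field: {field}")

def get_seconds(td: timedelta) -> int:
    # Mirrors the TimedeltaIndex logic: recombine the HOUR/MINUTE/SECOND
    # parts into seconds within the day.
    return (date_part("HOUR", td) * 3600
            + date_part("MINUTE", td) * 60
            + date_part("SECOND", td))
```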
[spark] branch master updated: [SPARK-44133][PYTHON] Upgrade MyPy from 0.920 to 0.982
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 10751dc285c [SPARK-44133][PYTHON] Upgrade MyPy from 0.920 to 0.982
10751dc285c is described below

commit 10751dc285c5c639e3343a8abc26857407522822
Author: Hyukjin Kwon
AuthorDate: Wed Jun 21 12:56:57 2023 -0700

[SPARK-44133][PYTHON] Upgrade MyPy from 0.920 to 0.982

### What changes were proposed in this pull request?
This PR upgrades the MyPy version from 0.920 to 0.982.

### Why are the changes needed?
To detect type-related issues better through static analysis.

### Does this PR introduce _any_ user-facing change?
No, dev-only.

### How was this patch tested?
```bash
./dev/lint-python
```

Closes #41690 from HyukjinKwon/SPARK-44133.

Authored-by: Hyukjin Kwon
Signed-off-by: Dongjoon Hyun
---
.github/workflows/build_and_test.yml | 2 +- dev/requirements.txt | 2 +- python/pyspark/ml/base.py | 2 +- python/pyspark/ml/classification.py| 84 +-- python/pyspark/ml/clustering.py| 36 python/pyspark/ml/connect/base.py | 2 +- python/pyspark/ml/connect/classification.py| 2 +- python/pyspark/ml/feature.py | 44 +- python/pyspark/ml/fpm.py | 4 +- python/pyspark/ml/recommendation.py| 6 +- python/pyspark/ml/regression.py| 96 +++--- .../pyspark/ml/tests/typing/test_clustering.yaml | 6 +- python/pyspark/ml/tests/typing/test_evaluation.yml | 6 +- python/pyspark/ml/torch/distributor.py | 6 +- python/pyspark/ml/tree.py | 16 ++-- python/pyspark/ml/tuning.py| 2 +- python/pyspark/ml/util.py | 4 +- python/pyspark/ml/wrapper.py | 4 +- python/pyspark/mllib/classification.py | 6 +- python/pyspark/mllib/clustering.py | 18 ++-- python/pyspark/mllib/evaluation.py | 38 - python/pyspark/mllib/feature.py| 8 +- python/pyspark/mllib/linalg/__init__.py| 4 +- python/pyspark/mllib/linalg/distributed.py | 6 +- python/pyspark/mllib/recommendation.py | 2 +-
python/pyspark/mllib/regression.py | 4 +- python/pyspark/sql/observation.py | 2 +- python/pyspark/sql/tests/typing/test_dataframe.yml | 4 +- python/pyspark/sql/tests/typing/test_functions.yml | 32 python/pyspark/sql/tests/typing/test_session.yml | 7 +- python/pyspark/sql/types.py| 2 +- python/pyspark/streaming/context.py| 4 +- python/pyspark/tests/typing/test_rdd.yml | 4 +- 33 files changed, 235 insertions(+), 230 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index a03aa53dc88..47732a5c9f6 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -624,7 +624,7 @@ jobs: # See also https://github.com/sphinx-doc/sphinx/issues/7551. # Jinja2 3.0.0+ causes error when building with Sphinx. # See also https://issues.apache.org/jira/browse/SPARK-35375. -python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.920' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc 'jinja2<3.0.0' 'black==22.6.0' +python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.982' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc 'jinja2<3.0.0' 'black==22.6.0' python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython 'grpcio==1.48.1' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0' - name: Python linter run: PYTHON_EXECUTABLE=python3.9 ./dev/lint-python diff --git a/dev/requirements.txt b/dev/requirements.txt index 1af7256e0b3..72da5dbe163 100644 --- a/dev/requirements.txt +++ b/dev/requirements.txt @@ -20,7 +20,7 @@ openpyxl coverage # Linter -mypy==0.920 +mypy==0.982 pytest-mypy-plugins==1.9.3 flake8==3.9.0 # See SPARK-38680. 
diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py index 34c3aa9c62c..b94358d26fd 100644 --- a/python/pyspark/ml/base.py +++ b/python/pyspark/ml/base.py @@ -396,7 +396,7 @@ class PredictionModel(Model, _PredictorParams, Generic[T], metaclass=ABCMeta): """ return self._set(predictionCol=value) -@property # type: ignore[misc] +@property @abstractmethod @since("2.1.0") def numFeatures(self) -> int: diff --git a/python/pyspark/ml/classification.py
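The `# type: ignore[misc]` removal in the `base.py` hunk above concerns stacking `@property` on `@abstractmethod`, which newer MyPy understands without a suppression comment. A minimal sketch of the pattern (the concrete subclass is illustrative, not Spark's actual hierarchy):

```python
from abc import ABCMeta, abstractmethod

class PredictionModel(metaclass=ABCMeta):
    @property            # with mypy 0.982 this stacking no longer needs
    @abstractmethod      # a `# type: ignore[misc]` suppression comment
    def numFeatures(self) -> int:
        ...

class DenseModel(PredictionModel):
    # Hypothetical concrete subclass: implementing the abstract property
    # makes the class instantiable.
    @property
    def numFeatures(self) -> int:
        return 3
```

At runtime the behavior is unchanged either way: instantiating the abstract base still raises `TypeError`, and the subclass exposes a plain read-only property.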
[spark] branch master updated: [SPARK-43915][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2438-2445]
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new bbcc438e5b3 [SPARK-43915][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2438-2445]
bbcc438e5b3 is described below

commit bbcc438e5b3aef67bf430b6bb6e4f893d8e66d13
Author: Jiaan Geng
AuthorDate: Wed Jun 21 21:20:01 2023 +0300

[SPARK-43915][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2438-2445]

### What changes were proposed in this pull request?
The PR aims to assign names to the error class _LEGACY_ERROR_TEMP_[2438-2445].

### Why are the changes needed?
Improve the error framework.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing test cases updated.

Closes #41553 from beliefer/SPARK-43915.

Authored-by: Jiaan Geng
Signed-off-by: Max Gekk
---
core/src/main/resources/error/error-classes.json | 47 +- python/pyspark/sql/tests/test_udtf.py | 8 +++- .../spark/sql/catalyst/analysis/Analyzer.scala | 4 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 23 +-- .../sql/catalyst/analysis/AnalysisSuite.scala | 28 - .../analyzer-results/group-analytics.sql.out | 2 +- .../analyzer-results/join-lateral.sql.out | 4 +- .../udf/udf-group-analytics.sql.out| 2 +- .../sql-tests/results/group-analytics.sql.out | 2 +- .../sql-tests/results/join-lateral.sql.out | 4 +- .../results/udf/udf-group-analytics.sql.out| 2 +- .../spark/sql/DataFrameSetOperationsSuite.scala| 44 ++-- .../sql/connector/DataSourceV2FunctionSuite.scala | 13 +- .../sql/connector/DeleteFromTableSuiteBase.scala | 15 +-- .../connector/DeltaBasedDeleteFromTableSuite.scala | 20 + .../sql/connector/DeltaBasedUpdateTableSuite.scala | 21 ++ .../connector/GroupBasedDeleteFromTableSuite.scala | 22 +- .../sql/connector/GroupBasedUpdateTableSuite.scala | 23 ++- .../spark/sql/connector/UpdateTableSuiteBase.scala | 15 +-- 19
files changed, 195 insertions(+), 104 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 1d2f25b72f3..264d9b7c3a0 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -643,6 +643,11 @@ ], "sqlState" : "23505" }, + "DUPLICATED_METRICS_NAME" : { +"message" : [ + "The metric name is not unique: . The same name cannot be used for metrics with different results. However multiple instances of metrics with with same result and name are allowed (e.g. self-joins)." +] + }, "DUPLICATE_CLAUSES" : { "message" : [ "Found duplicate clauses: . Please, remove one of them." @@ -1237,6 +1242,11 @@ } } }, + "INVALID_NON_DETERMINISTIC_EXPRESSIONS" : { +"message" : [ + "The operator expects a deterministic expression, but the actual expression is ." +] + }, "INVALID_NUMERIC_LITERAL_RANGE" : { "message" : [ "Numeric literal is outside the valid range for with minimum value of and maximum value of . Please adjust the value accordingly." @@ -1512,6 +1522,11 @@ ], "sqlState" : "42604" }, + "INVALID_UDF_IMPLEMENTATION" : { +"message" : [ + "Function does not implement ScalarFunction or AggregateFunction." +] + }, "INVALID_URL" : { "message" : [ "The url is invalid: . If necessary set to \"false\" to bypass this error." @@ -2458,6 +2473,11 @@ " is a reserved namespace property, ." ] }, + "SET_OPERATION_ON_MAP_TYPE" : { +"message" : [ + "Cannot have MAP type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column is ." +] + }, "SET_PROPERTIES_AND_DBPROPERTIES" : { "message" : [ "set PROPERTIES and DBPROPERTIES at the same time." @@ -5659,33 +5679,6 @@ "Conflicting attributes: ." ] }, - "_LEGACY_ERROR_TEMP_2438" : { -"message" : [ - "Cannot have map type columns in DataFrame which calls set operations(intersect, except, etc.), but the type of column is ." 
-] - }, - "_LEGACY_ERROR_TEMP_2439" : { -"message" : [ - "nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window, found:", - "", - "in operator ." -] - }, - "_LEGACY_ERROR_TEMP_2443" : { -"message" : [ - "Multiple definitions of observed metrics named '': ." -] - }, - "_LEGACY_ERROR_TEMP_2444" : { -"message" : [ - "Function '' does not implement ScalarFunction or AggregateFunction." -
[spark] branch master updated: [SPARK-43205][SQL][FOLLOWUP] remove unnecessary abstraction for `withIdentClause`
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new a4fb7cceb44 [SPARK-43205][SQL][FOLLOWUP] remove unnecessary abstraction for `withIdentClause`
a4fb7cceb44 is described below

commit a4fb7cceb441ddd30ce6613a27ba9b62402911fd
Author: Wenchen Fan
AuthorDate: Wed Jun 21 10:32:36 2023 -0700

[SPARK-43205][SQL][FOLLOWUP] remove unnecessary abstraction for `withIdentClause`

### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/41385. This PR adds `withFuncIdentClause` for function identifiers, so that the related methods can be simpler.

### Why are the changes needed?
Code cleanup.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #41631 from cloud-fan/followup.

Authored-by: Wenchen Fan
Signed-off-by: Gengliang Wang
---
.../spark/sql/catalyst/parser/AstBuilder.scala | 70 -- .../analyzer-results/identifier-clause.sql.out | 32 +- .../sql-tests/inputs/identifier-clause.sql | 14 ++--- .../sql-tests/results/identifier-clause.sql.out| 14 ++--- 4 files changed, 70 insertions(+), 60 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index abfe64f72e7..07721424a86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -66,31 +66,44 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit protected def withIdentClause( ctx: IdentifierReferenceContext, builder: Seq[String] => LogicalPlan): LogicalPlan = { -withIdentClause( - ctx.expression, - () =>
visitMultipartIdentifier(ctx.multipartIdentifier), - builder) +val exprCtx = ctx.expression +if (exprCtx != null) { + PlanWithUnresolvedIdentifier(withOrigin(exprCtx) { expression(exprCtx) }, builder) +} else { + builder.apply(visitMultipartIdentifier(ctx.multipartIdentifier)) +} } protected def withIdentClause( - ctx: ExpressionContext, - getIdent: () => Seq[String], + ctx: IdentifierReferenceContext, + builder: Seq[String] => Expression): Expression = { +val exprCtx = ctx.expression +if (exprCtx != null) { + ExpressionWithUnresolvedIdentifier(withOrigin(exprCtx) { expression(exprCtx) }, builder) +} else { + builder.apply(visitMultipartIdentifier(ctx.multipartIdentifier)) +} + } + + protected def withFuncIdentClause( + ctx: FunctionNameContext, builder: Seq[String] => LogicalPlan): LogicalPlan = { -if (ctx != null) { - PlanWithUnresolvedIdentifier(withOrigin(ctx) { expression(ctx) }, builder) +val exprCtx = ctx.expression +if (exprCtx != null) { + PlanWithUnresolvedIdentifier(withOrigin(exprCtx) { expression(exprCtx) }, builder) } else { - builder.apply(getIdent()) + builder.apply(getFunctionMultiparts(ctx)) } } - protected def withIdentClause( - ctx: ExpressionContext, - getIdent: () => Seq[String], + protected def withFuncIdentClause( + ctx: FunctionNameContext, builder: Seq[String] => Expression): Expression = { -if (ctx != null) { - ExpressionWithUnresolvedIdentifier(withOrigin(ctx) { expression(ctx) }, builder) +val exprCtx = ctx.expression +if (exprCtx != null) { + ExpressionWithUnresolvedIdentifier(withOrigin(exprCtx) { expression(exprCtx) }, builder) } else { - builder.apply(getIdent()) + builder.apply(getFunctionMultiparts(ctx)) } } @@ -1538,21 +1551,17 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit Seq.empty } -withIdentClause( - func.functionName.expression, - () => getFunctionMultiparts(func.functionName), - name => { -if (name.length > 1) { - throw QueryParsingErrors.invalidTableValuedFunctionNameError(name, ctx) 
-} +withFuncIdentClause(func.functionName, ident => { + if (ident.length > 1) { +throw QueryParsingErrors.invalidTableValuedFunctionNameError(ident, ctx) + } -val tvf = UnresolvedTableValuedFunction(name, func.expression.asScala.map(expression).toSeq) + val tvf = UnresolvedTableValuedFunction(ident, func.expression.asScala.map(expression).toSeq) -val tvfAliases = if (aliases.nonEmpty) UnresolvedTVFAliases(name, tvf, aliases) else tvf + val tvfAliases = if (aliases.nonEmpty)
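The dispatch performed by the new `withFuncIdentClause` — build the plan directly when the function name is a literal multipart identifier, defer behind an unresolved-identifier wrapper when it comes from an `IDENTIFIER(<expr>)` clause — can be sketched as follows. This is a hypothetical Python reduction of the Scala helper, with plans modeled as strings:

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

@dataclass
class FunctionNameContext:
    # Hypothetical parse-tree node: either a literal multipart name,
    # or an IDENTIFIER(<expr>) clause resolved only at analysis time.
    parts: Optional[List[str]] = None
    expression: Optional[str] = None

def with_func_ident_clause(
        ctx: FunctionNameContext,
        builder: Callable[[List[str]], str]) -> str:
    # Mirrors the refactored Scala helper: defer when the name comes
    # from an expression, otherwise build the plan directly.
    if ctx.expression is not None:
        return f"PlanWithUnresolvedIdentifier({ctx.expression})"
    return builder(ctx.parts)
```

Collapsing the two code paths into one helper per identifier kind (table vs. function) is what lets the PR drop the extra `getIdent: () => Seq[String]` indirection.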
[spark] branch master updated: [SPARK-44056][SQL] Include UDF name in UDF execution failure error message when available
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 6165f316063 [SPARK-44056][SQL] Include UDF name in UDF execution failure error message when available
6165f316063 is described below

commit 6165f31606344efdf35f060d07cee46b85948e38
Author: Rob Reeves
AuthorDate: Wed Jun 21 18:00:36 2023 +0300

[SPARK-44056][SQL] Include UDF name in UDF execution failure error message when available

### What changes were proposed in this pull request?
This modifies the error message when a Scala UDF fails to execute by including the UDF name, if it is available.

### Why are the changes needed?
If there are multiple UDFs defined in the same location with the same method signature, it can be hard to identify which UDF caused the issue. The current function class alone does not give enough information. Adding the UDF name, when available, makes it easier to identify the exact problematic UDF. This is particularly helpful when the exception stack trace is not emitted due to a JVM performance optimization and codegen is enabled.
Example in 3.1.1:
```
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(UDFRegistration$$Lambda$666/1969461119: (bigint, string) => string)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.subExpr_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
    at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:249)
    at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:248)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:131)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:523)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1535)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:526)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
```

### Does this PR introduce _any_ user-facing change?
Yes, it adds the UDF name to the UDF failure error message.

Before this change:
> [FAILED_EXECUTE_UDF] Failed to execute user defined function (QueryExecutionErrorsSuite$$Lambda$970/181260145: (string, int) => string).

After this change:
> [FAILED_EXECUTE_UDF] Failed to execute user defined function (nextChar in QueryExecutionErrorsSuite$$Lambda$970/181260145: (string, int) => string).

### How was this patch tested?
Unit test added.

Closes #41599 from robreeves/roreeves/roreeves/udf_error.

Lead-authored-by: Rob Reeves
Co-authored-by: Rob Reeves
Signed-off-by: Max Gekk
---
.../spark/sql/catalyst/expressions/ScalaUDF.scala | 6 ++-- .../spark/sql/errors/QueryExecutionErrors.scala| 4 +-- .../sql/errors/QueryExecutionErrorsSuite.scala | 35 ++ .../spark/sql/hive/execution/HiveUDFSuite.scala| 6 ++-- 4 files changed, 39 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala index 137a8976a40..40274a83340 100644 ---
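The error-message change can be mimicked with a small wrapper. This is a hypothetical Python sketch, not Spark's actual `ScalaUDF` code path — it only shows how a registered name, when available, is prefixed to the implementing function in the failure message:

```python
import functools

def named_udf(name, f):
    # Hypothetical wrapper: on failure, report the registered UDF name
    # (when available) alongside the implementing function, mirroring the
    # "(nextChar in ...Lambda...)" format introduced by the PR.
    @functools.wraps(f)
    def wrapper(*args):
        try:
            return f(*args)
        except Exception as e:
            where = f"{name} in {f.__qualname__}" if name else f.__qualname__
            raise RuntimeError(
                "[FAILED_EXECUTE_UDF] Failed to execute "
                f"user defined function ({where})") from e
    return wrapper
```

With two UDFs sharing the same lambda class, the extra name is what disambiguates the failure.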
[spark] branch master updated: [SPARK-43742][SQL][FOLLOWUP] Do not use null literal as default value for non-nullable columns
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 9b1e124b0bd [SPARK-43742][SQL][FOLLOWUP] Do not use null literal as default value for non-nullable columns
9b1e124b0bd is described below

commit 9b1e124b0bd0082e7ee13de56c4380783f816834
Author: Wenchen Fan
AuthorDate: Wed Jun 21 07:34:32 2023 -0700

[SPARK-43742][SQL][FOLLOWUP] Do not use null literal as default value for non-nullable columns

### What changes were proposed in this pull request?
A followup of https://github.com/apache/spark/pull/41262 to fix a mistake. If a column has no default value and is not nullable, we should fail when people want to use its default value via the explicit `DEFAULT` name, and not fill missing columns in INSERT.

### Why are the changes needed?
Fix a wrong behavior.

### Does this PR introduce _any_ user-facing change?
Yes; otherwise the DML command would fail later, at runtime.

### How was this patch tested?
New tests.

Closes #41656 from cloud-fan/def-val.
Lead-authored-by: Wenchen Fan Co-authored-by: Wenchen Fan Signed-off-by: Dongjoon Hyun --- core/src/main/resources/error/error-classes.json | 6 + .../sql/catalyst/analysis/AssignmentUtils.scala| 3 +- .../catalyst/analysis/TableOutputResolver.scala| 4 +- .../catalyst/util/ResolveDefaultColumnsUtil.scala | 47 ++- .../execution/command/PlanResolutionSuite.scala| 370 - 5 files changed, 262 insertions(+), 168 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index e35adcfbb5a..1d2f25b72f3 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -1777,6 +1777,12 @@ }, "sqlState" : "46110" }, + "NO_DEFAULT_COLUMN_VALUE_AVAILABLE" : { +"message" : [ + "Can't determine the default value for since it is not nullable and it has no default value." +], +"sqlState" : "42608" + }, "NO_HANDLER_FOR_UDAF" : { "message" : [ "No handler for UDAF ''. Use sparkSession.udf.register(...) instead." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala index 069cef6b361..fa953c90532 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala @@ -104,7 +104,8 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { case assignment if assignment.key.semanticEquals(attr) => assignment } val resolvedValue = if (matchingAssignments.isEmpty) { -val defaultExpr = getDefaultValueExprOrNullLit(attr, conf) +val defaultExpr = getDefaultValueExprOrNullLit( + attr, conf.useNullsForMissingDefaultColumnValues) if (defaultExpr.isEmpty) { errors += s"No assignment for '${attr.name}'" } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 3b721cf5d0d..6718020685b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -67,7 +67,7 @@ object TableOutputResolver { val fillDefaultValue = supportColDefaultValue && actualExpectedCols.size > query.output.size val queryOutputCols = if (fillDefaultValue) { query.output ++ actualExpectedCols.drop(query.output.size).flatMap { expectedCol => - getDefaultValueExprOrNullLit(expectedCol, conf) + getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues) } } else { query.output @@ -185,7 +185,7 @@ object TableOutputResolver { val newColPath = colPath :+ expectedCol.name if (matched.isEmpty) { val defaultExpr = if (fillDefaultValue) { - getDefaultValueExprOrNullLit(expectedCol, conf) + getDefaultValueExprOrNullLit(expectedCol, 
conf.useNullsForMissingDefaultColumnValues) } else { None } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala index 2169137685d..26efa8c8df2 100644 ---
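The resolution rule this follow-up enforces can be modeled in a few lines. This is an illustrative sketch of the semantics only, not the Scala helper itself; the function and exception names are invented for the example, and `use_null_for_missing` stands in for the `conf.useNullsForMissingDefaultColumnValues` flag seen in the diff:

```python
from typing import Optional

class NoDefaultColumnValueError(Exception):
    """Stand-in for Spark's NO_DEFAULT_COLUMN_VALUE_AVAILABLE error condition."""

def default_value_or_null(col_name: str, nullable: bool,
                          explicit_default: Optional[str],
                          use_null_for_missing: bool) -> Optional[str]:
    # An explicitly declared column default always wins.
    if explicit_default is not None:
        return explicit_default
    # If the config disallows filling with NULL, report "no value" to the caller.
    if not use_null_for_missing:
        return None
    # A NULL literal is only a legal default for nullable columns...
    if nullable:
        return "NULL"
    # ...otherwise this is now an analysis-time error instead of a later runtime failure.
    raise NoDefaultColumnValueError(
        f"Can't determine the default value for {col_name} since it is "
        "not nullable and it has no default value.")
```

The key behavioral change is the last branch: before the fix, a non-nullable column with no default was silently filled with a NULL literal, which only surfaced as an error at runtime.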
[spark] branch master updated (ac1e2223105 -> 6a70f756dd1)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from ac1e2223105 [SPARK-43903][PYTHON][CONNECT] Improve ArrayType input support in Arrow Python UDF add 6a70f756dd1 [SPARK-44109][CORE] Remove duplicate preferred locations of each RDD partition No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/MapOutputTracker.scala | 4 +-- .../org/apache/spark/MapOutputTrackerSuite.scala | 33 ++ 2 files changed, 35 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
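SPARK-44109's actual change lives in the Scala `MapOutputTracker`, but the operation it performs is simple order-preserving de-duplication, which can be sketched in a couple of lines (the function name here is illustrative):

```python
def dedup_preferred_locations(locations):
    """Drop duplicate preferred locations for a partition while keeping
    first-seen order (dict preserves insertion order in Python 3.7+)."""
    return list(dict.fromkeys(locations))

# A partition whose map outputs report overlapping hosts:
assert dedup_preferred_locations(["hostA", "hostB", "hostA", "hostB"]) == ["hostA", "hostB"]
```

Keeping the list duplicate-free matters because the scheduler weighs preferred locations when placing tasks; repeated entries skew that placement for no benefit.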
[spark] branch master updated: [SPARK-43903][PYTHON][CONNECT] Improve ArrayType input support in Arrow Python UDF
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new ac1e2223105 [SPARK-43903][PYTHON][CONNECT] Improve ArrayType input support in Arrow Python UDF
ac1e2223105 is described below

commit ac1e22231055d7e59eec5dd8c6a807252aab8b7f
Author: Xinrong Meng
AuthorDate: Wed Jun 21 17:22:00 2023 +0800

[SPARK-43903][PYTHON][CONNECT] Improve ArrayType input support in Arrow Python UDF

### What changes were proposed in this pull request?
Improve ArrayType input support in Arrow Python UDF. Previously, ArrayType was mapped to an `np.array`; now it is mapped to a `list`, matching pickled Python UDFs.

### Why are the changes needed?
Reach parity with pickled Python UDFs.

### Does this PR introduce _any_ user-facing change?
Yes. FROM
```py
>>> df = spark.range(1).selectExpr("array(array(1, 2), array(3, 4)) as nested_array")
>>> df.select(udf(lambda x: str(x), returnType='string', useArrow=True)("nested_array")).first()
Row((nested_array)='[array([1, 2], dtype=int32) array([3, 4], dtype=int32)]')
```
TO
```py
>>> df = spark.range(1).selectExpr("array(array(1, 2), array(3, 4)) as nested_array")
>>> df.select(udf(lambda x: str(x), returnType='string', useArrow=True)("nested_array")).first()
Row((nested_array)='[[1, 2], [3, 4]]')
```

### How was this patch tested?
Unit tests.

Closes #41603 from xinrong-meng/ndarr.
Authored-by: Xinrong Meng Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/pandas/serializers.py | 9 ++-- python/pyspark/sql/pandas/types.py| 61 --- python/pyspark/sql/tests/test_arrow_python_udf.py | 17 +-- python/pyspark/worker.py | 2 + 4 files changed, 62 insertions(+), 27 deletions(-) diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py index 12d0bee88ad..307fcc33752 100644 --- a/python/pyspark/sql/pandas/serializers.py +++ b/python/pyspark/sql/pandas/serializers.py @@ -172,7 +172,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer): self._timezone = timezone self._safecheck = safecheck -def arrow_to_pandas(self, arrow_column, struct_in_pandas="dict"): +def arrow_to_pandas(self, arrow_column, struct_in_pandas="dict", ndarray_as_list=False): # If the given column is a date type column, creates a series of datetime.date directly # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by # datetime64[ns] type handling. 
@@ -186,6 +186,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer): timezone=self._timezone, struct_in_pandas=struct_in_pandas, error_on_duplicated_field_names=True, +ndarray_as_list=ndarray_as_list, ) return converter(s) @@ -317,11 +318,13 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer): assign_cols_by_name, df_for_struct=False, struct_in_pandas="dict", +ndarray_as_list=False, ): super(ArrowStreamPandasUDFSerializer, self).__init__(timezone, safecheck) self._assign_cols_by_name = assign_cols_by_name self._df_for_struct = df_for_struct self._struct_in_pandas = struct_in_pandas +self._ndarray_as_list = ndarray_as_list def arrow_to_pandas(self, arrow_column): import pyarrow.types as types @@ -331,14 +334,14 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer): series = [ super(ArrowStreamPandasUDFSerializer, self) -.arrow_to_pandas(column, self._struct_in_pandas) +.arrow_to_pandas(column, self._struct_in_pandas, self._ndarray_as_list) .rename(field.name) for column, field in zip(arrow_column.flatten(), arrow_column.type) ] s = pd.concat(series, axis=1) else: s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas( -arrow_column, self._struct_in_pandas +arrow_column, self._struct_in_pandas, self._ndarray_as_list ) return s diff --git a/python/pyspark/sql/pandas/types.py b/python/pyspark/sql/pandas/types.py index 757deff6130..53362047604 100644 --- a/python/pyspark/sql/pandas/types.py +++ b/python/pyspark/sql/pandas/types.py @@ -494,6 +494,7 @@ def _create_converter_to_pandas( struct_in_pandas: Optional[str] = None, error_on_duplicated_field_names: bool = True, timestamp_utc_localized: bool = True, +ndarray_as_list: bool = False, ) -> Callable[["pd.Series"], "pd.Series"]: """ Create a
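The `ndarray_as_list` flag threaded through the serializer above boils down to converting array-typed values into plain Python lists before they reach the UDF. A dependency-free sketch of that conversion idea, using an invented stand-in class instead of a real `numpy.ndarray` (which exposes the same `tolist()` method):

```python
def to_list_if_array(value, ndarray_as_list=False):
    """When the flag is set, convert anything exposing tolist() (as numpy
    arrays do) into a plain Python list; otherwise pass it through unchanged."""
    if ndarray_as_list and hasattr(value, "tolist"):
        return value.tolist()
    return value

class FakeNdarray:
    """Minimal stand-in for numpy.ndarray, to keep the sketch dependency-free."""
    def __init__(self, items):
        self._items = list(items)
    def tolist(self):
        # Recursively convert nested arrays, as ndarray.tolist() does.
        return [x.tolist() if hasattr(x, "tolist") else x for x in self._items]

nested = FakeNdarray([FakeNdarray([1, 2]), FakeNdarray([3, 4])])
# Mirrors the user-facing change: str(...) now shows nested lists.
assert str(to_list_if_array(nested, ndarray_as_list=True)) == "[[1, 2], [3, 4]]"
```

The flag defaults to off in the diff above, so existing Pandas-oriented code paths keep receiving arrays; only the Arrow Python UDF path opts in.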
[spark] branch master updated: [SPARK-44004][SQL] Assign name & improve error message for frequent LEGACY errors
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 94031ead786 [SPARK-44004][SQL] Assign name & improve error message for frequent LEGACY errors
94031ead786 is described below

commit 94031ead78682bd5c1adab8b87e61055968c8998
Author: itholic
AuthorDate: Wed Jun 21 10:36:04 2023 +0300

[SPARK-44004][SQL] Assign name & improve error message for frequent LEGACY errors

### What changes were proposed in this pull request?
This PR proposes to assign names & improve error messages for the most frequent LEGACY errors.

### Why are the changes needed?
To improve the errors that occur most frequently.

### Does this PR introduce _any_ user-facing change?
No API changes; it only affects error messages.

### How was this patch tested?
The existing CI should pass.

Closes #41504 from itholic/naming_top_error_class.

Authored-by: itholic
Signed-off-by: Max Gekk
---
 core/src/main/resources/error/error-classes.json   | 80 +++---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  4 +-
 .../catalyst/analysis/ResolveInlineTables.scala    |  5 +-
 .../spark/sql/catalyst/analysis/unresolved.scala   |  3 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  | 22 +++---
 .../spark/sql/errors/QueryParsingErrors.scala      |  2 +-
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala |  5 +-
 .../catalyst/analysis/ResolveSubquerySuite.scala   | 11 ++-
 .../catalyst/parser/ExpressionParserSuite.scala    | 10 +--
 .../analyzer-results/ansi/literals.sql.out         | 10 +--
 .../columnresolution-negative.sql.out              |  6 +-
 .../analyzer-results/join-lateral.sql.out          |  6 +-
 .../sql-tests/analyzer-results/literals.sql.out    | 10 +--
 .../analyzer-results/postgreSQL/boolean.sql.out    |  5 +-
 .../postgreSQL/window_part3.sql.out                |  5 +-
 .../postgreSQL/window_part4.sql.out                |  5 +-
 .../table-valued-functions.sql.out                 |  4 +-
 .../sql-tests/results/ansi/literals.sql.out        | 10 +--
.../results/columnresolution-negative.sql.out | 6 +- .../sql-tests/results/join-lateral.sql.out | 6 +- .../resources/sql-tests/results/literals.sql.out | 10 +-- .../sql-tests/results/postgreSQL/boolean.sql.out | 5 +- .../results/postgreSQL/window_part3.sql.out| 5 +- .../results/postgreSQL/window_part4.sql.out| 5 +- .../results/table-valued-functions.sql.out | 4 +- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 12 ++-- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 6 +- .../spark/sql/execution/SQLViewTestSuite.scala | 4 +- 28 files changed, 134 insertions(+), 132 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index d9e729effeb..e35adcfbb5a 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -157,6 +157,11 @@ ], "sqlState" : "22018" }, + "CANNOT_PARSE_INTERVAL" : { +"message" : [ + "Unable to parse . Please ensure that the value provided is in a valid format for defining an interval. You can reference the documentation for the correct format. If the issue persists, please double check that the input value is not null or empty and try again." +] + }, "CANNOT_PARSE_JSON_FIELD" : { "message" : [ "Cannot parse the field name and the value of the JSON token type to target Spark data type ." @@ -191,6 +196,11 @@ ], "sqlState" : "0AKD0" }, + "CANNOT_RESOLVE_STAR_EXPAND" : { +"message" : [ + "Cannot resolve .* given input columns . Please check that the specified table or struct exists and is accessible in the input columns." +] + }, "CANNOT_RESTORE_PERMISSIONS_FOR_PATH" : { "message" : [ "Failed to set permissions on created path back to ." @@ -689,6 +699,11 @@ ], "sqlState" : "42K04" }, + "FAILED_SQL_EXPRESSION_EVALUATION" : { +"message" : [ + "Failed to evaluate the SQL expression: . Please check your syntax and ensure all required tables and columns are available." 
+] + }, "FIELD_NOT_FOUND" : { "message" : [ "No such struct field in ." @@ -1222,6 +1237,11 @@ } } }, + "INVALID_NUMERIC_LITERAL_RANGE" : { +"message" : [ + "Numeric literal is outside the valid range for with minimum value of and maximum value of . Please adjust the value accordingly." +] + }, "INVALID_OPTIONS" : { "message" : [ "Invalid options:" @@ -1497,6 +1517,11 @@ "The url is invalid: . If necessary set to \"false\" to bypass this error." ] }, + "INVALID_VIEW_TEXT" : { +"message" : [ + "The view
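Entries in `error-classes.json` store each message as a list of lines with angle-bracket parameter placeholders (the extraction above stripped those placeholders, so the parameter name below is an assumption for illustration). A sketch of how such a template might be expanded into a user-facing message:

```python
import json

# Hypothetical excerpt in the error-classes.json shape; the <intervalString>
# parameter name is an assumption, not copied from the actual file.
ERROR_CLASSES = json.loads("""
{
  "CANNOT_PARSE_INTERVAL" : {
    "message" : [
      "Unable to parse <intervalString>. Please ensure that the value provided is in a valid format for defining an interval."
    ]
  }
}
""")

def format_error(error_class, **params):
    """Join the message lines and substitute <param> placeholders."""
    message = " ".join(ERROR_CLASSES[error_class]["message"])
    for key, value in params.items():
        message = message.replace(f"<{key}>", value)
    return f"[{error_class}] {message}"

print(format_error("CANNOT_PARSE_INTERVAL", intervalString="'1 fortnight'"))
```

Naming an error class this way is what lets tests and users match on a stable identifier instead of a free-form LEGACY message string.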
[spark] branch master updated: [SPARK-44106][PYTHON][CONNECT] Add `__repr__` for `GroupedData`
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5facaece4df [SPARK-44106][PYTHON][CONNECT] Add `__repr__` for `GroupedData`
5facaece4df is described below

commit 5facaece4dfa1fa45e8c8f7bd7d92f11e2c91fd8
Author: Ruifeng Zheng
AuthorDate: Wed Jun 21 14:48:51 2023 +0800

[SPARK-44106][PYTHON][CONNECT] Add `__repr__` for `GroupedData`

### What changes were proposed in this pull request?
Add `__repr__` for `GroupedData`.

### Why are the changes needed?
`GroupedData.__repr__` is missing.

### Does this PR introduce _any_ user-facing change?
Yes.

1. On the Scala side:
```
scala> val df = Seq(("414243", "4243")).toDF("e", "f")
df: org.apache.spark.sql.DataFrame = [e: string, f: string]

scala> df.groupBy("e")
res0: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [e: string], value: [e: string, f: string], type: GroupBy]

scala> df.groupBy(df.col("e"))
res1: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [e: string], value: [e: string, f: string], type: GroupBy]
```

2. On vanilla PySpark, before this PR:
```
In [1]: df = spark.createDataFrame([("414243", "4243",)], ["e", "f"])

In [2]: df
Out[2]: DataFrame[e: string, f: string]

In [3]: df.groupBy("e")
Out[3]:

In [4]: df.groupBy(df.e)
Out[4]:
```
after this PR:
```
In [1]: df = spark.createDataFrame([("414243", "4243",)], ["e", "f"])

In [2]: df
Out[2]: DataFrame[e: string, f: string]

In [3]: df.groupBy("e")
Out[3]: GroupedData[grouping expressions: [e], value: [e: string, f: string], type: GroupBy]

In [4]: df.groupBy(df.e)
Out[4]: GroupedData[grouping expressions: [e: string], value: [e: string, f: string], type: GroupBy]
```

3. On the Spark Connect Python client, before this PR:
```
In [1]: df = spark.createDataFrame([("414243", "4243",)], ["e", "f"])

In [2]: df
Out[2]: DataFrame[e: string, f: string]

In [3]: df.groupBy("e")
Out[3]:

In [4]: df.groupBy(df.e)
Out[4]:
```
after this PR:
```
In [1]: df = spark.createDataFrame([("414243", "4243",)], ["e", "f"])

In [2]: df
Out[2]: DataFrame[e: string, f: string]

In [3]: df.groupBy("e")
Out[3]: GroupedData[grouping expressions: [e], value: [e: string, f: string], type: GroupBy]

In [4]: df.groupBy(df.e)
Out[4]: GroupedData[grouping expressions: [e], value: [e: string, f: string], type: GroupBy]  // different from vanilla PySpark
```
Note that since the expressions in the Python client are not resolved, the string can differ from vanilla PySpark.

### How was this patch tested?
Added doctests.

Closes #41674 from zhengruifeng/group_repr.

Authored-by: Ruifeng Zheng
Signed-off-by: Ruifeng Zheng
---
 python/pyspark/sql/connect/group.py | 19 +++
 python/pyspark/sql/group.py         | 11 +++
 2 files changed, 30 insertions(+)

diff --git a/python/pyspark/sql/connect/group.py b/python/pyspark/sql/connect/group.py
index e75c8029ef2..a393d2cb37e 100644
--- a/python/pyspark/sql/connect/group.py
+++ b/python/pyspark/sql/connect/group.py
@@ -83,6 +83,25 @@ class GroupedData:
         self._pivot_col = pivot_col
         self._pivot_values = pivot_values
 
+    def __repr__(self) -> str:
+        # the expressions are not resolved here,
+        # so the string representation can be different from vanilla PySpark.
+        grouping_str = ", ".join(str(e._expr) for e in self._grouping_cols)
+        grouping_str = f"grouping expressions: [{grouping_str}]"
+
+        value_str = ", ".join("%s: %s" % c for c in self._df.dtypes)
+
+        if self._group_type == "groupby":
+            type_str = "GroupBy"
+        elif self._group_type == "rollup":
+            type_str = "RollUp"
+        elif self._group_type == "cube":
+            type_str = "Cube"
+        else:
+            type_str = "Pivot"
+
+        return f"GroupedData[{grouping_str}, value: [{value_str}], type: {type_str}]"
+
     @overload
     def agg(self, *exprs: Column) -> "DataFrame":
         ...

diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index e33e3d6ec5e..9568a971229 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -70,6 +70,14 @@ class GroupedData(PandasGroupedOpsMixin):
         self._df = df
         self.session: SparkSession = df.sparkSession
 
+    def __repr__(self) -> str:
+        index = 26  # index to truncate string from the JVM side
+        jvm_string = self._jgd.toString()
+
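The connect-side `__repr__` is plain string formatting and can be exercised standalone. A sketch mirroring its logic outside of any Spark session, with the grouping columns, dtypes, and group type passed in directly (the helper name is invented for the example):

```python
def grouped_data_repr(grouping_cols, dtypes, group_type):
    """Mirror of the connect-side __repr__: grouping expressions arrive
    unresolved, so only their string forms are shown."""
    grouping_str = ", ".join(str(c) for c in grouping_cols)
    value_str = ", ".join("%s: %s" % c for c in dtypes)
    type_str = {"groupby": "GroupBy", "rollup": "RollUp",
                "cube": "Cube"}.get(group_type, "Pivot")
    return (f"GroupedData[grouping expressions: [{grouping_str}], "
            f"value: [{value_str}], type: {type_str}]")

# Reproduces the doctest-style output shown in the PR description:
assert grouped_data_repr(["e"], [("e", "string"), ("f", "string")], "groupby") == \
    "GroupedData[grouping expressions: [e], value: [e: string, f: string], type: GroupBy]"
```

Note how `"%s: %s" % c` formats each `(name, dtype)` pair, which is why the value section reads `e: string, f: string`.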
[spark] branch master updated: [SPARK-43511][CONNECT][SS] Implemented MapGroupsWithState and FlatMapGroupsWithState APIs for Spark Connect
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 8a16aed9a17 [SPARK-43511][CONNECT][SS] Implemented MapGroupsWithState and FlatMapGroupsWithState APIs for Spark Connect
8a16aed9a17 is described below

commit 8a16aed9a17269b4c8111779229507e3c28ba945
Author: bogao007
AuthorDate: Wed Jun 21 15:35:34 2023 +0900

[SPARK-43511][CONNECT][SS] Implemented MapGroupsWithState and FlatMapGroupsWithState APIs for Spark Connect

### What changes were proposed in this pull request?
Implemented the MapGroupsWithState and FlatMapGroupsWithState APIs for Spark Connect.

### Why are the changes needed?
To support streaming state APIs in Spark Connect.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
Added unit test.

Closes #41558 from bogao007/sc-state-api.

Authored-by: bogao007
Signed-off-by: Hyukjin Kwon
---
 .../apache/spark/sql/KeyValueGroupedDataset.scala  | 398 +
 .../sql/KeyValueGroupedDatasetE2ETestSuite.scala   | 107 ++
 .../CheckConnectJvmClientCompatibility.scala       |   6 -
 .../FlatMapGroupsWithStateStreamingSuite.scala     | 224 
 .../function/FlatMapGroupsWithStateFunction.java   |  39 ++
 .../java/function/MapGroupsWithStateFunction.java  |  38 ++
 .../main/protobuf/spark/connect/relations.proto    |  16 +
 .../apache/spark/sql/connect/common/UdfUtils.scala |  26 ++
 .../apache/spark/sql/streaming/GroupState.scala    | 336 +
 .../sql/connect/planner/SparkConnectPlanner.scala  |  92 -
 python/pyspark/sql/connect/proto/relations_pb2.py  |  24 +-
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  84 -
 12 files changed, 1359 insertions(+), 31 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 7b2fa3b52be..20c130b83cb 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.ProductEncoder import org.apache.spark.sql.connect.common.UdfUtils import org.apache.spark.sql.expressions.ScalarUserDefinedFunction import org.apache.spark.sql.functions.col +import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode} /** * A [[Dataset]] has been logically grouped by a user specified grouping key. Users should not @@ -460,6 +461,356 @@ abstract class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable cogroupSorted(other)(thisSortExprs: _*)(otherSortExprs: _*)( UdfUtils.coGroupFunctionToScalaFunc(f))(encoder) } + + protected def flatMapGroupsWithStateHelper[S: Encoder, U: Encoder]( + outputMode: Option[OutputMode], + timeoutConf: GroupStateTimeout, + initialState: Option[KeyValueGroupedDataset[K, S]], + isMapGroupWithState: Boolean)( + func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U] = { +throw new UnsupportedOperationException + } + + /** + * (Scala-specific) Applies the given function to each group of data, while maintaining a + * user-defined per-group state. The result Dataset will represent the objects returned by the + * function. For a static batch Dataset, the function will be invoked once per group. For a + * streaming Dataset, the function will be invoked for each group repeatedly in every trigger, + * and updates to each group's state will be saved across invocations. See + * [[org.apache.spark.sql.streaming.GroupState]] for more details. + * + * @tparam S + * The type of the user-defined state. Must be encodable to Spark SQL types. + * @tparam U + * The type of the output objects. Must be encodable to Spark SQL types. + * @param func + * Function to be called on every group. 
+ * + * See [[Encoder]] for more details on what types are encodable to Spark SQL. + * @since 3.5.0 + */ + def mapGroupsWithState[S: Encoder, U: Encoder]( + func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = { +mapGroupsWithState(GroupStateTimeout.NoTimeout)(func) + } + + /** + * (Scala-specific) Applies the given function to each group of data, while maintaining a + * user-defined per-group state. The result Dataset will represent the objects returned by the + * function. For a static batch Dataset, the function will be invoked once per group. For a + * streaming Dataset, the function will be invoked for each group repeatedly in every trigger, + * and updates