[spark] branch master updated: [SPARK-44078][CONNECT][CORE] Add support for classloader/resource isolation

2023-06-21 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9d031ba8c99 [SPARK-44078][CONNECT][CORE] Add support for 
classloader/resource isolation
9d031ba8c99 is described below

commit 9d031ba8c995286e5f8892764e5108aa60f49238
Author: vicennial 
AuthorDate: Wed Jun 21 20:58:19 2023 -0400

[SPARK-44078][CONNECT][CORE] Add support for classloader/resource isolation

### What changes were proposed in this pull request?

This PR adds a `JobArtifactSet` that holds the jars/files/archives relevant to a particular Spark job. With this "set", we can specify which resources are visible/available to a job based on, for example, the SparkSession that the job belongs to.

With resource specification in place, we can further extend this to support classloader/resource isolation on the executors. Each executor uses the `uuid` from the `JobArtifactSet` to either create, or obtain from a cache, the
[IsolatedSessionState](https://github.com/apache/spark/pull/41625/files#diff-d7a989c491f3cb77cca02c701496a9e2a3443f70af73b0d1ab0899239f3a789dR57)
which holds the "state" (i.e. classloaders, files, jars, archives, etc.) for that particular `uuid`.

Currently, the code defaults to copying over resources from the `SparkContext` (the current behaviour) to avoid any behaviour changes. A follow-up PR will use this mechanism in Spark Connect to isolate resources among Spark Connect sessions.
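
For illustration only, a minimal Scala sketch of the create-or-reuse pattern described above; `SessionState` and `SessionStateCache` are hypothetical stand-ins for the executor-side cache of `IsolatedSessionState`, not Spark's actual implementation:

```scala
import java.net.{URL, URLClassLoader}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for IsolatedSessionState: one classloader (and one set
// of jars/files/archives) per session uuid.
final class SessionState(val uuid: String, jarUrls: Seq[URL], parent: ClassLoader) {
  val classLoader: ClassLoader = new URLClassLoader(jarUrls.toArray, parent)
}

object SessionStateCache {
  private val states = new ConcurrentHashMap[String, SessionState]()

  // Create-or-reuse keyed by the JobArtifactSet uuid: tasks from the same session
  // share one state, while other sessions keep isolated classloaders/resources.
  def getOrCreate(uuid: String, jarUrls: Seq[URL], parent: ClassLoader): SessionState =
    states.computeIfAbsent(uuid, _ => new SessionState(uuid, jarUrls, parent))
}
```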

### Why are the changes needed?

A current limitation of Scala UDFs is that a Spark cluster can only support a single REPL at a time, because the classloaders of different Spark Sessions (and therefore of Spark Connect sessions) aren't isolated from each other. Without isolation, REPL-generated class files and user-added JARs may conflict when multiple users share the cluster.

Thus, we need a mechanism for isolated sessions (i.e. isolated resources/classloaders) so that each REPL user does not conflict with other users on the same cluster.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests + new suite `JobArtifactSetSuite`.

Closes #41625 from vicennial/SPARK-44078.

Authored-by: vicennial 
Signed-off-by: Herman van Hovell 
---
 .../scala/org/apache/spark/JobArtifactSet.scala| 123 +
 .../scala/org/apache/spark/executor/Executor.scala | 120 +---
 .../org/apache/spark/scheduler/ActiveJob.scala |   3 +
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  37 ---
 .../apache/spark/scheduler/DAGSchedulerEvent.scala |   2 +
 .../org/apache/spark/scheduler/ResultTask.scala|   5 +-
 .../apache/spark/scheduler/ShuffleMapTask.scala|   9 +-
 .../scala/org/apache/spark/scheduler/Task.scala|   2 +
 .../apache/spark/scheduler/TaskDescription.scala   |  61 ++
 .../apache/spark/scheduler/TaskSetManager.scala|   9 +-
 .../org/apache/spark/JobArtifactSetSuite.scala |  87 +++
 .../CoarseGrainedExecutorBackendSuite.scala|   7 +-
 .../org/apache/spark/executor/ExecutorSuite.scala  |  21 ++--
 .../CoarseGrainedSchedulerBackendSuite.scala   |  12 +-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |   9 +-
 .../org/apache/spark/scheduler/FakeTask.scala  |   8 +-
 .../spark/scheduler/NotSerializableFakeTask.scala  |   4 +-
 .../apache/spark/scheduler/TaskContextSuite.scala  |  26 +++--
 .../spark/scheduler/TaskDescriptionSuite.scala |  18 +--
 .../spark/scheduler/TaskSchedulerImplSuite.scala   |   4 +-
 .../spark/scheduler/TaskSetManagerSuite.scala  |  12 +-
 .../MesosFineGrainedSchedulerBackendSuite.scala|  10 +-
 22 files changed, 436 insertions(+), 153 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/JobArtifactSet.scala 
b/core/src/main/scala/org/apache/spark/JobArtifactSet.scala
new file mode 100644
index 000..d87c25c0b7c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/JobArtifactSet.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 

[spark] branch master updated: [SPARK-44136][SS] Fixed an issue that StateManager may get materialized in executor instead of driver in FlatMapGroupsWithStateExec

2023-06-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fb4d6d9db27 [SPARK-44136][SS] Fixed an issue that StateManager may get 
materialized in executor instead of driver in FlatMapGroupsWithStateExec
fb4d6d9db27 is described below

commit fb4d6d9db27d0e9642de33d5b3f9915b334ee02c
Author: bogao007 
AuthorDate: Thu Jun 22 09:26:32 2023 +0900

[SPARK-44136][SS] Fixed an issue that StateManager may get materialized in 
executor instead of driver in FlatMapGroupsWithStateExec

### What changes were proposed in this pull request?

Fixed an issue where the StateManager in FlatMapGroupsWithStateExec may get materialized on an executor instead of on the driver. The ticket that surfaced this issue: https://issues.apache.org/jira/browse/SPARK-40411

The basic idea is to keep `stateManager` as a `lazy val` but reference it early in `doExecute()` to force its lazy initialization on the driver.
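
As a hedged illustration of the pattern (simplified, not the actual FlatMapGroupsWithStateExec code): touching the `lazy val` inside `doExecute()` makes it initialize on the driver, before the per-partition closure is serialized and shipped to executors.

```scala
// Simplified sketch; identifiers are illustrative.
class ExampleStatefulExec(settings: Map[String, String]) extends Serializable {
  // Imagine this needs driver-side context to be built correctly.
  lazy val stateManager: Map[String, String] =
    settings.map { case (k, v) => k -> v.trim }

  // Runs on the driver and returns the per-partition function executors will run.
  def doExecute(): Iterator[Int] => Iterator[String] = {
    stateManager // force lazy init at driver, mirroring the one-line fix below
    iter => iter.map(i => s"row=$i, settings=${stateManager.size}")
  }
}
```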

### Why are the changes needed?

Without this change, the StateManager in FlatMapGroupsWithStateExec may get materialized on an executor instead of on the driver, which can cause unexpected behavior.

### Does this PR introduce _any_ user-facing change?

Yes

### How was this patch tested?

It is hard to write a unit test for this, since the behavior involves both the driver and the executor, which is difficult to simulate in a unit test.

Closes #41693 from bogao007/SPARK-44136.

Authored-by: bogao007 
Signed-off-by: Hyukjin Kwon 
---
 .../spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala   | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index d30b9ad116f..3c3d55e6208 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -188,6 +188,7 @@ trait FlatMapGroupsWithStateExecBase
   }
 
   override protected def doExecute(): RDD[InternalRow] = {
+stateManager // force lazy init at driver
 metrics // force lazy init at driver
 
 // Throw errors early if parameters are not as expected





[spark] branch branch-3.4 updated: [SPARK-44136][SS] Fixed an issue that StateManager may get materialized in executor instead of driver in FlatMapGroupsWithStateExec

2023-06-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new e4476c60782 [SPARK-44136][SS] Fixed an issue that StateManager may get 
materialized in executor instead of driver in FlatMapGroupsWithStateExec
e4476c60782 is described below

commit e4476c60782b153cc9639497e6653f51c39401b1
Author: bogao007 
AuthorDate: Thu Jun 22 09:26:32 2023 +0900

[SPARK-44136][SS] Fixed an issue that StateManager may get materialized in 
executor instead of driver in FlatMapGroupsWithStateExec

### What changes were proposed in this pull request?

Fixed an issue where the StateManager in FlatMapGroupsWithStateExec may get materialized on an executor instead of on the driver. The ticket that surfaced this issue: https://issues.apache.org/jira/browse/SPARK-40411

The basic idea is to keep `stateManager` as a `lazy val` but reference it early in `doExecute()` to force its lazy initialization on the driver.

### Why are the changes needed?

Without this change, the StateManager in FlatMapGroupsWithStateExec may get materialized on an executor instead of on the driver, which can cause unexpected behavior.

### Does this PR introduce _any_ user-facing change?

Yes

### How was this patch tested?

It is hard to write a unit test for this, since the behavior involves both the driver and the executor, which is difficult to simulate in a unit test.

Closes #41693 from bogao007/SPARK-44136.

Authored-by: bogao007 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit fb4d6d9db27d0e9642de33d5b3f9915b334ee02c)
Signed-off-by: Hyukjin Kwon 
---
 .../spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala   | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 760681e81c9..783226a8060 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -183,6 +183,7 @@ trait FlatMapGroupsWithStateExecBase
   }
 
   override protected def doExecute(): RDD[InternalRow] = {
+stateManager // force lazy init at driver
 metrics // force lazy init at driver
 
 // Throw errors early if parameters are not as expected





[spark] branch master updated: [SPARK-43710][PS][CONNECT] Support `functions.date_part` for Spark Connect

2023-06-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bc20e85b0e1 [SPARK-43710][PS][CONNECT] Support `functions.date_part` 
for Spark Connect
bc20e85b0e1 is described below

commit bc20e85b0e1da510cc091dbd03f210ef9fc56b25
Author: Ruifeng Zheng 
AuthorDate: Thu Jun 22 08:47:27 2023 +0900

[SPARK-43710][PS][CONNECT] Support `functions.date_part` for Spark Connect

### What changes were proposed in this pull request?
Switch to the [newly added `date_part` function](https://github.com/apache/spark/commit/8dc02863b926b9e0780b994f9ee6c5c1058d49a0) and drop the legacy JVM-backed `date_part` helper in `pyspark.pandas.spark.functions`.
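
For reference, a hedged illustration of `date_part` semantics via SQL (run in spark-shell, where `spark` is predefined; the change itself only rewires pandas-on-Spark to the new function):

```scala
// date_part(field, source) extracts a field from a date/timestamp/interval.
spark.sql(
  "SELECT date_part('HOUR', TIMESTAMP '2023-06-21 20:58:19') AS h, " +
  "date_part('MINUTE', TIMESTAMP '2023-06-21 20:58:19') AS m").show()
// h = 20, m = 58
```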

### Why are the changes needed?
To support Spark Connect.

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
existing UT

Closes #41691 from zhengruifeng/ps_date_part.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/indexes/timedelta.py |  6 +++---
 python/pyspark/pandas/spark/functions.py   | 32 ++
 2 files changed, 5 insertions(+), 33 deletions(-)

diff --git a/python/pyspark/pandas/indexes/timedelta.py 
b/python/pyspark/pandas/indexes/timedelta.py
index 564c484d968..e46d602e985 100644
--- a/python/pyspark/pandas/indexes/timedelta.py
+++ b/python/pyspark/pandas/indexes/timedelta.py
@@ -150,9 +150,9 @@ class TimedeltaIndex(Index):
 
 @no_type_check
 def get_seconds(scol):
-hour_scol = SF.date_part("HOUR", scol)
-minute_scol = SF.date_part("MINUTE", scol)
-second_scol = SF.date_part("SECOND", scol)
+hour_scol = F.date_part("HOUR", scol)
+minute_scol = F.date_part("MINUTE", scol)
+second_scol = F.date_part("SECOND", scol)
 return (
 F.when(
 hour_scol < 0,
diff --git a/python/pyspark/pandas/spark/functions.py 
b/python/pyspark/pandas/spark/functions.py
index a904071aee7..b33705263c7 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -17,15 +17,11 @@
 """
 Additional Spark functions used in pandas-on-Spark.
 """
-from typing import Union, no_type_check
+from typing import Union
 
 from pyspark import SparkContext
 import pyspark.sql.functions as F
-from pyspark.sql.column import (
-Column,
-_to_java_column,
-_create_column_from_literal,
-)
+from pyspark.sql.column import Column
 
 # For supporting Spark Connect
 from pyspark.sql.utils import is_remote
@@ -145,27 +141,3 @@ def repeat(col: Column, n: Union[int, Column]) -> Column:
 """
 _n = F.lit(n) if isinstance(n, int) else n
 return F.call_udf("repeat", col, _n)
-
-
-def date_part(field: Union[str, Column], source: Column) -> Column:
-"""
-Extracts a part of the date/timestamp or interval source.
-"""
-sc = SparkContext._active_spark_context
-field = (
-_to_java_column(field) if isinstance(field, Column) else 
_create_column_from_literal(field)
-)
-return _call_udf(sc, "date_part", field, _to_java_column(source))
-
-
-@no_type_check
-def _call_udf(sc, name, *cols):
-return Column(sc._jvm.functions.callUDF(name, _make_arguments(sc, *cols)))
-
-
-@no_type_check
-def _make_arguments(sc, *cols):
-java_arr = sc._gateway.new_array(sc._jvm.Column, len(cols))
-for i, col in enumerate(cols):
-java_arr[i] = col
-return java_arr





[spark] branch master updated: [SPARK-44133][PYTHON] Upgrade MyPy from 0.920 to 0.982

2023-06-21 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 10751dc285c [SPARK-44133][PYTHON] Upgrade MyPy from 0.920 to 0.982
10751dc285c is described below

commit 10751dc285c5c639e3343a8abc26857407522822
Author: Hyukjin Kwon 
AuthorDate: Wed Jun 21 12:56:57 2023 -0700

[SPARK-44133][PYTHON] Upgrade MyPy from 0.920 to 0.982

### What changes were proposed in this pull request?

This PR upgrades the MyPy version from 0.920 to 0.982.

### Why are the changes needed?

To better detect type-related issues through static analysis.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

```bash
./dev/linter-python
```

Closes #41690 from HyukjinKwon/SPARK-44133.

Authored-by: Hyukjin Kwon 
Signed-off-by: Dongjoon Hyun 
---
 .github/workflows/build_and_test.yml   |  2 +-
 dev/requirements.txt   |  2 +-
 python/pyspark/ml/base.py  |  2 +-
 python/pyspark/ml/classification.py| 84 +--
 python/pyspark/ml/clustering.py| 36 
 python/pyspark/ml/connect/base.py  |  2 +-
 python/pyspark/ml/connect/classification.py|  2 +-
 python/pyspark/ml/feature.py   | 44 +-
 python/pyspark/ml/fpm.py   |  4 +-
 python/pyspark/ml/recommendation.py|  6 +-
 python/pyspark/ml/regression.py| 96 +++---
 .../pyspark/ml/tests/typing/test_clustering.yaml   |  6 +-
 python/pyspark/ml/tests/typing/test_evaluation.yml |  6 +-
 python/pyspark/ml/torch/distributor.py |  6 +-
 python/pyspark/ml/tree.py  | 16 ++--
 python/pyspark/ml/tuning.py|  2 +-
 python/pyspark/ml/util.py  |  4 +-
 python/pyspark/ml/wrapper.py   |  4 +-
 python/pyspark/mllib/classification.py |  6 +-
 python/pyspark/mllib/clustering.py | 18 ++--
 python/pyspark/mllib/evaluation.py | 38 -
 python/pyspark/mllib/feature.py|  8 +-
 python/pyspark/mllib/linalg/__init__.py|  4 +-
 python/pyspark/mllib/linalg/distributed.py |  6 +-
 python/pyspark/mllib/recommendation.py |  2 +-
 python/pyspark/mllib/regression.py |  4 +-
 python/pyspark/sql/observation.py  |  2 +-
 python/pyspark/sql/tests/typing/test_dataframe.yml |  4 +-
 python/pyspark/sql/tests/typing/test_functions.yml | 32 
 python/pyspark/sql/tests/typing/test_session.yml   |  7 +-
 python/pyspark/sql/types.py|  2 +-
 python/pyspark/streaming/context.py|  4 +-
 python/pyspark/tests/typing/test_rdd.yml   |  4 +-
 33 files changed, 235 insertions(+), 230 deletions(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index a03aa53dc88..47732a5c9f6 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -624,7 +624,7 @@ jobs:
 #   See also https://github.com/sphinx-doc/sphinx/issues/7551.
 # Jinja2 3.0.0+ causes error when building with Sphinx.
 #   See also https://issues.apache.org/jira/browse/SPARK-35375.
-python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 
'mypy==0.920' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc 
'jinja2<3.0.0' 'black==22.6.0'
+python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 
'mypy==0.982' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc 
'jinja2<3.0.0' 'black==22.6.0'
 python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython 
'grpcio==1.48.1' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0'
 - name: Python linter
   run: PYTHON_EXECUTABLE=python3.9 ./dev/lint-python
diff --git a/dev/requirements.txt b/dev/requirements.txt
index 1af7256e0b3..72da5dbe163 100644
--- a/dev/requirements.txt
+++ b/dev/requirements.txt
@@ -20,7 +20,7 @@ openpyxl
 coverage
 
 # Linter
-mypy==0.920
+mypy==0.982
 pytest-mypy-plugins==1.9.3
 flake8==3.9.0
 # See SPARK-38680.
diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py
index 34c3aa9c62c..b94358d26fd 100644
--- a/python/pyspark/ml/base.py
+++ b/python/pyspark/ml/base.py
@@ -396,7 +396,7 @@ class PredictionModel(Model, _PredictorParams, Generic[T], 
metaclass=ABCMeta):
 """
 return self._set(predictionCol=value)
 
-@property  # type: ignore[misc]
+@property
 @abstractmethod
 @since("2.1.0")
 def numFeatures(self) -> int:
diff --git a/python/pyspark/ml/classification.py 

[spark] branch master updated: [SPARK-43915][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2438-2445]

2023-06-21 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bbcc438e5b3 [SPARK-43915][SQL] Assign names to the error class 
_LEGACY_ERROR_TEMP_[2438-2445]
bbcc438e5b3 is described below

commit bbcc438e5b3aef67bf430b6bb6e4f893d8e66d13
Author: Jiaan Geng 
AuthorDate: Wed Jun 21 21:20:01 2023 +0300

[SPARK-43915][SQL] Assign names to the error class 
_LEGACY_ERROR_TEMP_[2438-2445]

### What changes were proposed in this pull request?
This PR aims to assign names to the error classes 
_LEGACY_ERROR_TEMP_[2438-2445].

### Why are the changes needed?
Improve the error framework.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Existing test cases updated.
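
For example, a hedged sketch of how one of the newly named error classes can be asserted (inside a ScalaTest suite; `df1`/`df2` are assumed to contain a MAP-typed column):

```scala
import org.apache.spark.sql.AnalysisException

// Set operations on MAP-typed columns are rejected during analysis; the error
// now carries the name SET_OPERATION_ON_MAP_TYPE instead of _LEGACY_ERROR_TEMP_2438.
val e = intercept[AnalysisException] {
  df1.intersect(df2)
}
assert(e.getErrorClass == "SET_OPERATION_ON_MAP_TYPE")
```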

Closes #41553 from beliefer/SPARK-43915.

Authored-by: Jiaan Geng 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   | 47 +-
 python/pyspark/sql/tests/test_udtf.py  |  8 +++-
 .../spark/sql/catalyst/analysis/Analyzer.scala |  4 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala  | 23 +--
 .../sql/catalyst/analysis/AnalysisSuite.scala  | 28 -
 .../analyzer-results/group-analytics.sql.out   |  2 +-
 .../analyzer-results/join-lateral.sql.out  |  4 +-
 .../udf/udf-group-analytics.sql.out|  2 +-
 .../sql-tests/results/group-analytics.sql.out  |  2 +-
 .../sql-tests/results/join-lateral.sql.out |  4 +-
 .../results/udf/udf-group-analytics.sql.out|  2 +-
 .../spark/sql/DataFrameSetOperationsSuite.scala| 44 ++--
 .../sql/connector/DataSourceV2FunctionSuite.scala  | 13 +-
 .../sql/connector/DeleteFromTableSuiteBase.scala   | 15 +--
 .../connector/DeltaBasedDeleteFromTableSuite.scala | 20 +
 .../sql/connector/DeltaBasedUpdateTableSuite.scala | 21 ++
 .../connector/GroupBasedDeleteFromTableSuite.scala | 22 +-
 .../sql/connector/GroupBasedUpdateTableSuite.scala | 23 ++-
 .../spark/sql/connector/UpdateTableSuiteBase.scala | 15 +--
 19 files changed, 195 insertions(+), 104 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 1d2f25b72f3..264d9b7c3a0 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -643,6 +643,11 @@
 ],
 "sqlState" : "23505"
   },
+  "DUPLICATED_METRICS_NAME" : {
+"message" : [
+  "The metric name is not unique: . The same name cannot be 
used for metrics with different results. However multiple instances of metrics 
with with same result and name are allowed (e.g. self-joins)."
+]
+  },
   "DUPLICATE_CLAUSES" : {
 "message" : [
   "Found duplicate clauses: . Please, remove one of them."
@@ -1237,6 +1242,11 @@
   }
 }
   },
+  "INVALID_NON_DETERMINISTIC_EXPRESSIONS" : {
+"message" : [
+  "The operator expects a deterministic expression, but the actual 
expression is ."
+]
+  },
   "INVALID_NUMERIC_LITERAL_RANGE" : {
 "message" : [
   "Numeric literal  is outside the valid range for 
 with minimum value of  and maximum value of . 
Please adjust the value accordingly."
@@ -1512,6 +1522,11 @@
 ],
 "sqlState" : "42604"
   },
+  "INVALID_UDF_IMPLEMENTATION" : {
+"message" : [
+  "Function  does not implement ScalarFunction or 
AggregateFunction."
+]
+  },
   "INVALID_URL" : {
 "message" : [
   "The url is invalid: . If necessary set  to \"false\" 
to bypass this error."
@@ -2458,6 +2473,11 @@
   " is a reserved namespace property, ."
 ]
   },
+  "SET_OPERATION_ON_MAP_TYPE" : {
+"message" : [
+  "Cannot have MAP type columns in DataFrame which calls set 
operations (INTERSECT, EXCEPT, etc.), but the type of column  is 
."
+]
+  },
   "SET_PROPERTIES_AND_DBPROPERTIES" : {
 "message" : [
   "set PROPERTIES and DBPROPERTIES at the same time."
@@ -5659,33 +5679,6 @@
   "Conflicting attributes: ."
 ]
   },
-  "_LEGACY_ERROR_TEMP_2438" : {
-"message" : [
-  "Cannot have map type columns in DataFrame which calls set 
operations(intersect, except, etc.), but the type of column  is 
."
-]
-  },
-  "_LEGACY_ERROR_TEMP_2439" : {
-"message" : [
-  "nondeterministic expressions are only allowed in Project, Filter, 
Aggregate or Window, found:",
-  "",
-  "in operator ."
-]
-  },
-  "_LEGACY_ERROR_TEMP_2443" : {
-"message" : [
-  "Multiple definitions of observed metrics named '': ."
-]
-  },
-  "_LEGACY_ERROR_TEMP_2444" : {
-"message" : [
-  "Function '' does not implement ScalarFunction or 
AggregateFunction."
-

[spark] branch master updated: [SPARK-43205][SQL][FOLLOWUP] remove unnecessary abstraction for `withIdentClause`

2023-06-21 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a4fb7cceb44 [SPARK-43205][SQL][FOLLOWUP] remove unnecessary 
abstraction for `withIdentClause`
a4fb7cceb44 is described below

commit a4fb7cceb441ddd30ce6613a27ba9b62402911fd
Author: Wenchen Fan 
AuthorDate: Wed Jun 21 10:32:36 2023 -0700

[SPARK-43205][SQL][FOLLOWUP] remove unnecessary abstraction for 
`withIdentClause`

### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/41385. This PR adds `withFuncIdentClause` for function identifiers, so that the related methods can be simpler.
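
For context, a hedged example of the IDENTIFIER clause these parser helpers serve (the exact SQL shown is an assumption based on the SPARK-43205 feature, run via spark-shell):

```scala
// withIdentClause handles table identifiers; withFuncIdentClause handles function names.
spark.sql("SELECT * FROM IDENTIFIER('my_' || 'table')")          // table-name form
spark.sql("SELECT IDENTIFIER('coal' || 'esce')(NULL, 1)").show() // function-name form
```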

### Why are the changes needed?

code cleanup

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #41631 from cloud-fan/followup.

Authored-by: Wenchen Fan 
Signed-off-by: Gengliang Wang 
---
 .../spark/sql/catalyst/parser/AstBuilder.scala | 70 --
 .../analyzer-results/identifier-clause.sql.out | 32 +-
 .../sql-tests/inputs/identifier-clause.sql | 14 ++---
 .../sql-tests/results/identifier-clause.sql.out| 14 ++---
 4 files changed, 70 insertions(+), 60 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index abfe64f72e7..07721424a86 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -66,31 +66,44 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] 
with SQLConfHelper wit
   protected def withIdentClause(
   ctx: IdentifierReferenceContext,
   builder: Seq[String] => LogicalPlan): LogicalPlan = {
-withIdentClause(
-  ctx.expression,
-  () => visitMultipartIdentifier(ctx.multipartIdentifier),
-  builder)
+val exprCtx = ctx.expression
+if (exprCtx != null) {
+  PlanWithUnresolvedIdentifier(withOrigin(exprCtx) { expression(exprCtx) 
}, builder)
+} else {
+  builder.apply(visitMultipartIdentifier(ctx.multipartIdentifier))
+}
   }
 
   protected def withIdentClause(
-  ctx: ExpressionContext,
-  getIdent: () => Seq[String],
+  ctx: IdentifierReferenceContext,
+  builder: Seq[String] => Expression): Expression = {
+val exprCtx = ctx.expression
+if (exprCtx != null) {
+  ExpressionWithUnresolvedIdentifier(withOrigin(exprCtx) { 
expression(exprCtx) }, builder)
+} else {
+  builder.apply(visitMultipartIdentifier(ctx.multipartIdentifier))
+}
+  }
+
+  protected def withFuncIdentClause(
+  ctx: FunctionNameContext,
   builder: Seq[String] => LogicalPlan): LogicalPlan = {
-if (ctx != null) {
-  PlanWithUnresolvedIdentifier(withOrigin(ctx) { expression(ctx) }, 
builder)
+val exprCtx = ctx.expression
+if (exprCtx != null) {
+  PlanWithUnresolvedIdentifier(withOrigin(exprCtx) { expression(exprCtx) 
}, builder)
 } else {
-  builder.apply(getIdent())
+  builder.apply(getFunctionMultiparts(ctx))
 }
   }
 
-  protected def withIdentClause(
-  ctx: ExpressionContext,
-  getIdent: () => Seq[String],
+  protected def withFuncIdentClause(
+  ctx: FunctionNameContext,
   builder: Seq[String] => Expression): Expression = {
-if (ctx != null) {
-  ExpressionWithUnresolvedIdentifier(withOrigin(ctx) { expression(ctx) }, 
builder)
+val exprCtx = ctx.expression
+if (exprCtx != null) {
+  ExpressionWithUnresolvedIdentifier(withOrigin(exprCtx) { 
expression(exprCtx) }, builder)
 } else {
-  builder.apply(getIdent())
+  builder.apply(getFunctionMultiparts(ctx))
 }
   }
 
@@ -1538,21 +1551,17 @@ class AstBuilder extends 
SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
   Seq.empty
 }
 
-withIdentClause(
-  func.functionName.expression,
-  () => getFunctionMultiparts(func.functionName),
-  name => {
-if (name.length > 1) {
-  throw QueryParsingErrors.invalidTableValuedFunctionNameError(name, 
ctx)
-}
+withFuncIdentClause(func.functionName, ident => {
+  if (ident.length > 1) {
+throw QueryParsingErrors.invalidTableValuedFunctionNameError(ident, 
ctx)
+  }
 
-val tvf = UnresolvedTableValuedFunction(name, 
func.expression.asScala.map(expression).toSeq)
+  val tvf = UnresolvedTableValuedFunction(ident, 
func.expression.asScala.map(expression).toSeq)
 
-val tvfAliases = if (aliases.nonEmpty) UnresolvedTVFAliases(name, tvf, 
aliases) else tvf
+  val tvfAliases = if (aliases.nonEmpty) 

[spark] branch master updated: [SPARK-44056][SQL] Include UDF name in UDF execution failure error message when available

2023-06-21 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6165f316063 [SPARK-44056][SQL] Include UDF name in UDF execution 
failure error message when available
6165f316063 is described below

commit 6165f31606344efdf35f060d07cee46b85948e38
Author: Rob Reeves 
AuthorDate: Wed Jun 21 18:00:36 2023 +0300

[SPARK-44056][SQL] Include UDF name in UDF execution failure error message 
when available

### What changes were proposed in this pull request?
This modifies the error message when a Scala UDF fails to execute by 
including the UDF name if it is available.

### Why are the changes needed?
If multiple UDFs are defined in the same location with the same method signature, it can be hard to identify which UDF causes the issue; the function class alone does not give enough information. Adding the UDF name, when available, makes it easier to pinpoint the exact problematic UDF.

This is particularly helpful when the exception stack trace is not emitted 
due to a JVM performance optimization and codegen is enabled. Example in 3.1.1:
```
Caused by: org.apache.spark.SparkException: Failed to execute user defined 
function(UDFRegistration$$Lambda$666/1969461119: (bigint, string) => string)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.subExpr_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
 Source)
at 
org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:249)
at 
org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:248)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:131)
at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:523)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1535)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:526)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
```

### Does this PR introduce _any_ user-facing change?
Yes, it adds the UDF name to the UDF failure error message. Before this 
change:
> [FAILED_EXECUTE_UDF] Failed to execute user defined function 
(QueryExecutionErrorsSuite$$Lambda$970/181260145: (string, int) => string).

After this change:
> [FAILED_EXECUTE_UDF] Failed to execute user defined function (nextChar in 
QueryExecutionErrorsSuite$$Lambda$970/181260145: (string, int) => string).
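
For illustration, a hedged sketch of where the name comes from, run from spark-shell (the UDF body and identifiers are made up):

```scala
// Registering the UDF under a name means that name can show up in the
// FAILED_EXECUTE_UDF message when the UDF throws at runtime.
spark.udf.register("nextChar", (s: String, i: Int) => s.charAt(i + 1).toString)

// The second row makes charAt(1) go out of bounds, so this query fails with
// "[FAILED_EXECUTE_UDF] ... (nextChar in ...)" rather than only the lambda class.
spark.sql("SELECT nextChar(value, 0) FROM VALUES ('ab'), ('c') AS t(value)").show()
```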

### How was this patch tested?
Unit test added.

Closes #41599 from robreeves/roreeves/roreeves/udf_error.

Lead-authored-by: Rob Reeves 
Co-authored-by: Rob Reeves 
Signed-off-by: Max Gekk 
---
 .../spark/sql/catalyst/expressions/ScalaUDF.scala  |  6 ++--
 .../spark/sql/errors/QueryExecutionErrors.scala|  4 +--
 .../sql/errors/QueryExecutionErrorsSuite.scala | 35 ++
 .../spark/sql/hive/execution/HiveUDFSuite.scala|  6 ++--
 4 files changed, 39 insertions(+), 12 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index 137a8976a40..40274a83340 100644
--- 

[spark] branch master updated: [SPARK-43742][SQL][FOLLOWUP] Do not use null literal as default value for non-nullable columns

2023-06-21 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9b1e124b0bd [SPARK-43742][SQL][FOLLOWUP] Do not use null literal as 
default value for non-nullable columns
9b1e124b0bd is described below

commit 9b1e124b0bd0082e7ee13de56c4380783f816834
Author: Wenchen Fan 
AuthorDate: Wed Jun 21 07:34:32 2023 -0700

[SPARK-43742][SQL][FOLLOWUP] Do not use null literal as default value for 
non-nullable columns

### What changes were proposed in this pull request?

A follow-up of https://github.com/apache/spark/pull/41262 to fix a mistake. If a column is not nullable and has no default value, we should fail when users reference its default value via the explicit `DEFAULT` keyword, and we should not fill in that column when it is missing from an INSERT.
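
A hedged illustration of the corrected rule (assumes a table `tgt(id INT NOT NULL, note STRING)` in a catalog that supports NOT NULL columns and column defaults; identifiers are made up):

```scala
// `id` is not nullable and has no DEFAULT. Referencing its default explicitly is
// now rejected at analysis with NO_DEFAULT_COLUMN_VALUE_AVAILABLE ...
spark.sql("INSERT INTO tgt VALUES (DEFAULT, 'x')")
// ... and leaving `id` out is no longer silently completed with a NULL default.
spark.sql("INSERT INTO tgt (note) VALUES ('x')")
```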

### Why are the changes needed?

Fixes an incorrect behavior.

### Does this PR introduce _any_ user-facing change?

Yes; without this fix, such DML commands would fail later at runtime.

### How was this patch tested?

new tests

Closes #41656 from cloud-fan/def-val.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/resources/error/error-classes.json   |   6 +
 .../sql/catalyst/analysis/AssignmentUtils.scala|   3 +-
 .../catalyst/analysis/TableOutputResolver.scala|   4 +-
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  |  47 ++-
 .../execution/command/PlanResolutionSuite.scala| 370 -
 5 files changed, 262 insertions(+), 168 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index e35adcfbb5a..1d2f25b72f3 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1777,6 +1777,12 @@
 },
 "sqlState" : "46110"
   },
+  "NO_DEFAULT_COLUMN_VALUE_AVAILABLE" : {
+"message" : [
+  "Can't determine the default value for  since it is not 
nullable and it has no default value."
+],
+"sqlState" : "42608"
+  },
   "NO_HANDLER_FOR_UDAF" : {
 "message" : [
   "No handler for UDAF ''. Use 
sparkSession.udf.register(...) instead."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
index 069cef6b361..fa953c90532 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
@@ -104,7 +104,8 @@ object AssignmentUtils extends SQLConfHelper with 
CastSupport {
 case assignment if assignment.key.semanticEquals(attr) => assignment
   }
   val resolvedValue = if (matchingAssignments.isEmpty) {
-val defaultExpr = getDefaultValueExprOrNullLit(attr, conf)
+val defaultExpr = getDefaultValueExprOrNullLit(
+  attr, conf.useNullsForMissingDefaultColumnValues)
 if (defaultExpr.isEmpty) {
   errors += s"No assignment for '${attr.name}'"
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 3b721cf5d0d..6718020685b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -67,7 +67,7 @@ object TableOutputResolver {
   val fillDefaultValue = supportColDefaultValue && actualExpectedCols.size 
> query.output.size
   val queryOutputCols = if (fillDefaultValue) {
 query.output ++ actualExpectedCols.drop(query.output.size).flatMap { 
expectedCol =>
-  getDefaultValueExprOrNullLit(expectedCol, conf)
+  getDefaultValueExprOrNullLit(expectedCol, 
conf.useNullsForMissingDefaultColumnValues)
 }
   } else {
 query.output
@@ -185,7 +185,7 @@ object TableOutputResolver {
   val newColPath = colPath :+ expectedCol.name
   if (matched.isEmpty) {
 val defaultExpr = if (fillDefaultValue) {
-  getDefaultValueExprOrNullLit(expectedCol, conf)
+  getDefaultValueExprOrNullLit(expectedCol, 
conf.useNullsForMissingDefaultColumnValues)
 } else {
   None
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 2169137685d..26efa8c8df2 100644
--- 

[spark] branch master updated (ac1e2223105 -> 6a70f756dd1)

2023-06-21 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from ac1e2223105 [SPARK-43903][PYTHON][CONNECT] Improve ArrayType input 
support in Arrow Python UDF
 add 6a70f756dd1 [SPARK-44109][CORE] Remove duplicate preferred locations 
of each RDD partition

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/MapOutputTracker.scala  |  4 +--
 .../org/apache/spark/MapOutputTrackerSuite.scala   | 33 ++
 2 files changed, 35 insertions(+), 2 deletions(-)





[spark] branch master updated: [SPARK-43903][PYTHON][CONNECT] Improve ArrayType input support in Arrow Python UDF

2023-06-21 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ac1e2223105 [SPARK-43903][PYTHON][CONNECT] Improve ArrayType input 
support in Arrow Python UDF
ac1e2223105 is described below

commit ac1e22231055d7e59eec5dd8c6a807252aab8b7f
Author: Xinrong Meng 
AuthorDate: Wed Jun 21 17:22:00 2023 +0800

[SPARK-43903][PYTHON][CONNECT] Improve ArrayType input support in Arrow 
Python UDF

### What changes were proposed in this pull request?
Improve ArrayType input support in Arrow Python UDF.

Previously, ArrayType was mapped to an `np.array`; now it is mapped to a `list`, matching pickled Python UDFs.

### Why are the changes needed?
To reach parity with pickled Python UDFs.

### Does this PR introduce _any_ user-facing change?
Yes.

FROM
```py
>>> df = spark.range(1).selectExpr("array(array(1, 2), array(3, 4)) as 
nested_array")
>>> df.select(udf(lambda x: str(x), returnType='string', 
useArrow=True)("nested_array")).first()
Row(<lambda>(nested_array)='[array([1, 2], dtype=int32) array([3, 4], dtype=int32)]')
```

TO
```py
>>> df = spark.range(1).selectExpr("array(array(1, 2), array(3, 4)) as nested_array")
>>> df.select(udf(lambda x: str(x), returnType='string', 
useArrow=True)("nested_array")).first()
Row(<lambda>(nested_array)='[[1, 2], [3, 4]]')
```

### How was this patch tested?
Unit tests.

Closes #41603 from xinrong-meng/ndarr.

Authored-by: Xinrong Meng 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/pandas/serializers.py  |  9 ++--
 python/pyspark/sql/pandas/types.py| 61 ---
 python/pyspark/sql/tests/test_arrow_python_udf.py | 17 +--
 python/pyspark/worker.py  |  2 +
 4 files changed, 62 insertions(+), 27 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index 12d0bee88ad..307fcc33752 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -172,7 +172,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 self._timezone = timezone
 self._safecheck = safecheck
 
-def arrow_to_pandas(self, arrow_column, struct_in_pandas="dict"):
+def arrow_to_pandas(self, arrow_column, struct_in_pandas="dict", 
ndarray_as_list=False):
 # If the given column is a date type column, creates a series of 
datetime.date directly
 # instead of creating datetime64[ns] as intermediate data to avoid 
overflow caused by
 # datetime64[ns] type handling.
@@ -186,6 +186,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 timezone=self._timezone,
 struct_in_pandas=struct_in_pandas,
 error_on_duplicated_field_names=True,
+ndarray_as_list=ndarray_as_list,
 )
 return converter(s)
 
@@ -317,11 +318,13 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
 assign_cols_by_name,
 df_for_struct=False,
 struct_in_pandas="dict",
+ndarray_as_list=False,
 ):
 super(ArrowStreamPandasUDFSerializer, self).__init__(timezone, 
safecheck)
 self._assign_cols_by_name = assign_cols_by_name
 self._df_for_struct = df_for_struct
 self._struct_in_pandas = struct_in_pandas
+self._ndarray_as_list = ndarray_as_list
 
 def arrow_to_pandas(self, arrow_column):
 import pyarrow.types as types
@@ -331,14 +334,14 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
 
 series = [
 super(ArrowStreamPandasUDFSerializer, self)
-.arrow_to_pandas(column, self._struct_in_pandas)
+.arrow_to_pandas(column, self._struct_in_pandas, 
self._ndarray_as_list)
 .rename(field.name)
 for column, field in zip(arrow_column.flatten(), 
arrow_column.type)
 ]
 s = pd.concat(series, axis=1)
 else:
 s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(
-arrow_column, self._struct_in_pandas
+arrow_column, self._struct_in_pandas, self._ndarray_as_list
 )
 return s
 
diff --git a/python/pyspark/sql/pandas/types.py 
b/python/pyspark/sql/pandas/types.py
index 757deff6130..53362047604 100644
--- a/python/pyspark/sql/pandas/types.py
+++ b/python/pyspark/sql/pandas/types.py
@@ -494,6 +494,7 @@ def _create_converter_to_pandas(
 struct_in_pandas: Optional[str] = None,
 error_on_duplicated_field_names: bool = True,
 timestamp_utc_localized: bool = True,
+ndarray_as_list: bool = False,
 ) -> Callable[["pd.Series"], "pd.Series"]:
 """
 Create a 

[spark] branch master updated: [SPARK-44004][SQL] Assign name & improve error message for frequent LEGACY errors

2023-06-21 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 94031ead786 [SPARK-44004][SQL] Assign name & improve error message for 
frequent LEGACY errors
94031ead786 is described below

commit 94031ead78682bd5c1adab8b87e61055968c8998
Author: itholic 
AuthorDate: Wed Jun 21 10:36:04 2023 +0300

[SPARK-44004][SQL] Assign name & improve error message for frequent LEGACY 
errors

### What changes were proposed in this pull request?

This PR proposes to assign names & improve error messages for frequently occurring LEGACY errors.

### Why are the changes needed?

To improve the errors that occur most frequently.

### Does this PR introduce _any_ user-facing change?

No API changes; only error messages are affected.

### How was this patch tested?

The existing CI should pass.

Closes #41504 from itholic/naming_top_error_class.

Authored-by: itholic 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   | 80 +++---
 .../spark/sql/catalyst/analysis/Analyzer.scala |  4 +-
 .../catalyst/analysis/ResolveInlineTables.scala|  5 +-
 .../spark/sql/catalyst/analysis/unresolved.scala   |  3 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  | 22 +++---
 .../spark/sql/errors/QueryParsingErrors.scala  |  2 +-
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala |  5 +-
 .../catalyst/analysis/ResolveSubquerySuite.scala   | 11 ++-
 .../catalyst/parser/ExpressionParserSuite.scala| 10 +--
 .../analyzer-results/ansi/literals.sql.out | 10 +--
 .../columnresolution-negative.sql.out  |  6 +-
 .../analyzer-results/join-lateral.sql.out  |  6 +-
 .../sql-tests/analyzer-results/literals.sql.out| 10 +--
 .../analyzer-results/postgreSQL/boolean.sql.out|  5 +-
 .../postgreSQL/window_part3.sql.out|  5 +-
 .../postgreSQL/window_part4.sql.out|  5 +-
 .../table-valued-functions.sql.out |  4 +-
 .../sql-tests/results/ansi/literals.sql.out| 10 +--
 .../results/columnresolution-negative.sql.out  |  6 +-
 .../sql-tests/results/join-lateral.sql.out |  6 +-
 .../resources/sql-tests/results/literals.sql.out   | 10 +--
 .../sql-tests/results/postgreSQL/boolean.sql.out   |  5 +-
 .../results/postgreSQL/window_part3.sql.out|  5 +-
 .../results/postgreSQL/window_part4.sql.out|  5 +-
 .../results/table-valued-functions.sql.out |  4 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 12 ++--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  6 +-
 .../spark/sql/execution/SQLViewTestSuite.scala |  4 +-
 28 files changed, 134 insertions(+), 132 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index d9e729effeb..e35adcfbb5a 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -157,6 +157,11 @@
 ],
 "sqlState" : "22018"
   },
+  "CANNOT_PARSE_INTERVAL" : {
+"message" : [
+  "Unable to parse . Please ensure that the value provided 
is in a valid format for defining an interval. You can reference the 
documentation for the correct format. If the issue persists, please double 
check that the input value is not null or empty and try again."
+]
+  },
   "CANNOT_PARSE_JSON_FIELD" : {
 "message" : [
   "Cannot parse the field name  and the value  of 
the JSON token type  to target Spark data type ."
@@ -191,6 +196,11 @@
 ],
 "sqlState" : "0AKD0"
   },
+  "CANNOT_RESOLVE_STAR_EXPAND" : {
+"message" : [
+  "Cannot resolve .* given input columns . Please 
check that the specified table or struct exists and is accessible in the input 
columns."
+]
+  },
   "CANNOT_RESTORE_PERMISSIONS_FOR_PATH" : {
 "message" : [
   "Failed to set permissions on created path  back to ."
@@ -689,6 +699,11 @@
 ],
 "sqlState" : "42K04"
   },
+  "FAILED_SQL_EXPRESSION_EVALUATION" : {
+"message" : [
+  "Failed to evaluate the SQL expression: . Please check your 
syntax and ensure all required tables and columns are available."
+]
+  },
   "FIELD_NOT_FOUND" : {
 "message" : [
   "No such struct field  in ."
@@ -1222,6 +1237,11 @@
   }
 }
   },
+  "INVALID_NUMERIC_LITERAL_RANGE" : {
+"message" : [
+  "Numeric literal  is outside the valid range for 
 with minimum value of  and maximum value of . 
Please adjust the value accordingly."
+]
+  },
   "INVALID_OPTIONS" : {
 "message" : [
   "Invalid options:"
@@ -1497,6 +1517,11 @@
   "The url is invalid: . If necessary set  to \"false\" 
to bypass this error."
 ]
   },
+  "INVALID_VIEW_TEXT" : {
+"message" : [
+  "The view  

[spark] branch master updated: [SPARK-44106][PYTHON][CONNECT] Add `__repr__` for `GroupedData`

2023-06-21 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5facaece4df [SPARK-44106][PYTHON][CONNECT] Add `__repr__` for 
`GroupedData`
5facaece4df is described below

commit 5facaece4dfa1fa45e8c8f7bd7d92f11e2c91fd8
Author: Ruifeng Zheng 
AuthorDate: Wed Jun 21 14:48:51 2023 +0800

[SPARK-44106][PYTHON][CONNECT] Add `__repr__` for `GroupedData`

### What changes were proposed in this pull request?
Add `__repr__` for `GroupedData`

### Why are the changes needed?
 `GroupedData.__repr__` is missing

### Does this PR introduce _any_ user-facing change?
yes

1. On Scala side:
```
scala> val df = Seq(("414243", "4243")).toDF("e", "f")
df: org.apache.spark.sql.DataFrame = [e: string, f: string]

scala> df.groupBy("e")
res0: org.apache.spark.sql.RelationalGroupedDataset = 
RelationalGroupedDataset: [grouping expressions: [e: string], value: [e: 
string, f: string], type: GroupBy]

scala> df.groupBy(df.col("e"))
res1: org.apache.spark.sql.RelationalGroupedDataset = 
RelationalGroupedDataset: [grouping expressions: [e: string], value: [e: 
string, f: string], type: GroupBy]
```

2. On vanilla PySpark:

before this PR:
```
In [1]: df = spark.createDataFrame([("414243", "4243",)], ["e", "f"])

In [2]: df
Out[2]: DataFrame[e: string, f: string]

In [3]: df.groupBy("e")
Out[3]: <pyspark.sql.group.GroupedData object at 0x...>

In [4]: df.groupBy(df.e)
Out[4]: <pyspark.sql.group.GroupedData object at 0x...>

```

after this PR:
```
In [1]: df = spark.createDataFrame([("414243", "4243",)], ["e", "f"])

In [2]: df
Out[2]: DataFrame[e: string, f: string]

In [3]: df.groupBy("e")
Out[3]: GroupedData[grouping expressions: [e], value: [e: string, f: 
string], type: GroupBy]

In [4]: df.groupBy(df.e)
Out[4]: GroupedData[grouping expressions: [e: string], value: [e: string, 
f: string], type: GroupBy]
```

3. On Spark Connect Python Client:
before this PR:
```
In [1]: df = spark.createDataFrame([("414243", "4243",)], ["e", "f"])

In [2]: df
Out[2]: DataFrame[e: string, f: string]

In [3]: df.groupBy("e")
Out[3]: <pyspark.sql.connect.group.GroupedData object at 0x...>

In [4]: df.groupBy(df.e)
Out[4]: <pyspark.sql.connect.group.GroupedData object at 0x...>
```

after this PR:
```
In [1]: df = spark.createDataFrame([("414243", "4243",)], ["e", "f"])

In [2]: df
Out[2]: DataFrame[e: string, f: string]

In [3]: df.groupBy("e")
Out[3]: GroupedData[grouping expressions: [e], value: [e: string, f: 
string], type: GroupBy]

In [4]: df.groupBy(df.e)
Out[4]: GroupedData[grouping expressions: [e], value: [e: string, f: 
string], type: GroupBy] // different from vanilla PySpark
```

Note that since the expressions in Python Client are not resolved, the 
string can be different from vanilla PySpark.

### How was this patch tested?
added doctests

Closes #41674 from zhengruifeng/group_repr.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/group.py | 19 +++
 python/pyspark/sql/group.py | 11 +++
 2 files changed, 30 insertions(+)

diff --git a/python/pyspark/sql/connect/group.py 
b/python/pyspark/sql/connect/group.py
index e75c8029ef2..a393d2cb37e 100644
--- a/python/pyspark/sql/connect/group.py
+++ b/python/pyspark/sql/connect/group.py
@@ -83,6 +83,25 @@ class GroupedData:
 self._pivot_col = pivot_col
 self._pivot_values = pivot_values
 
+def __repr__(self) -> str:
+# the expressions are not resolved here,
+# so the string representation can be different from vanilla PySpark.
+grouping_str = ", ".join(str(e._expr) for e in self._grouping_cols)
+grouping_str = f"grouping expressions: [{grouping_str}]"
+
+value_str = ", ".join("%s: %s" % c for c in self._df.dtypes)
+
+if self._group_type == "groupby":
+type_str = "GroupBy"
+elif self._group_type == "rollup":
+type_str = "RollUp"
+elif self._group_type == "cube":
+type_str = "Cube"
+else:
+type_str = "Pivot"
+
+return f"GroupedData[{grouping_str}, value: [{value_str}], type: 
{type_str}]"
+
 @overload
 def agg(self, *exprs: Column) -> "DataFrame":
 ...
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index e33e3d6ec5e..9568a971229 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -70,6 +70,14 @@ class GroupedData(PandasGroupedOpsMixin):
 self._df = df
 self.session: SparkSession = df.sparkSession
 
+def __repr__(self) -> str:
+index = 26  # index to truncate string from the JVM side
+jvm_string = self._jgd.toString()
+  

[spark] branch master updated: [SPARK-43511][CONNECT][SS] Implemented MapGroupsWithState and FlatMapGroupsWithState APIs for Spark Connect

2023-06-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8a16aed9a17 [SPARK-43511][CONNECT][SS] Implemented MapGroupsWithState 
and FlatMapGroupsWithState APIs for Spark Connect
8a16aed9a17 is described below

commit 8a16aed9a17269b4c8111779229507e3c28ba945
Author: bogao007 
AuthorDate: Wed Jun 21 15:35:34 2023 +0900

[SPARK-43511][CONNECT][SS] Implemented MapGroupsWithState and 
FlatMapGroupsWithState APIs for Spark Connect

### What changes were proposed in this pull request?

Implemented MapGroupsWithState and FlatMapGroupsWithState APIs for Spark 
Connect

### Why are the changes needed?

To support streaming state APIs in Spark Connect

### Does this PR introduce _any_ user-facing change?

yes
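
For illustration, a hedged usage sketch of the API surface now available on the Connect Scala client (`Event`, `RunningCount` and `events` are assumptions, not part of this PR):

```scala
import org.apache.spark.sql.streaming.GroupState
import spark.implicits._ // `spark` is the (Connect) SparkSession

case class Event(user: String, action: String)
case class RunningCount(count: Long)

// Assumes `events: Dataset[Event]`, e.g. read from a streaming source.
val counts = events
  .groupByKey(_.user)
  .mapGroupsWithState[RunningCount, (String, Long)] {
    (user: String, rows: Iterator[Event], state: GroupState[RunningCount]) =>
      // Keep a per-user running count across triggers.
      val updated = state.getOption.map(_.count).getOrElse(0L) + rows.size
      state.update(RunningCount(updated))
      (user, updated)
  }
```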

### How was this patch tested?

Added unit test

Closes #41558 from bogao007/sc-state-api.

Authored-by: bogao007 
Signed-off-by: Hyukjin Kwon 
---
 .../apache/spark/sql/KeyValueGroupedDataset.scala  | 398 +
 .../sql/KeyValueGroupedDatasetE2ETestSuite.scala   | 107 ++
 .../CheckConnectJvmClientCompatibility.scala   |   6 -
 .../FlatMapGroupsWithStateStreamingSuite.scala | 224 
 .../function/FlatMapGroupsWithStateFunction.java   |  39 ++
 .../java/function/MapGroupsWithStateFunction.java  |  38 ++
 .../main/protobuf/spark/connect/relations.proto|  16 +
 .../apache/spark/sql/connect/common/UdfUtils.scala |  26 ++
 .../apache/spark/sql/streaming/GroupState.scala| 336 +
 .../sql/connect/planner/SparkConnectPlanner.scala  |  92 -
 python/pyspark/sql/connect/proto/relations_pb2.py  |  24 +-
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  84 -
 12 files changed, 1359 insertions(+), 31 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 7b2fa3b52be..20c130b83cb 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -29,6 +29,7 @@ import 
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.ProductEncoder
 import org.apache.spark.sql.connect.common.UdfUtils
 import org.apache.spark.sql.expressions.ScalarUserDefinedFunction
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, 
OutputMode}
 
 /**
  * A [[Dataset]] has been logically grouped by a user specified grouping key. 
Users should not
@@ -460,6 +461,356 @@ abstract class KeyValueGroupedDataset[K, V] private[sql] 
() extends Serializable
 cogroupSorted(other)(thisSortExprs: _*)(otherSortExprs: _*)(
   UdfUtils.coGroupFunctionToScalaFunc(f))(encoder)
   }
+
+  protected def flatMapGroupsWithStateHelper[S: Encoder, U: Encoder](
+  outputMode: Option[OutputMode],
+  timeoutConf: GroupStateTimeout,
+  initialState: Option[KeyValueGroupedDataset[K, S]],
+  isMapGroupWithState: Boolean)(
+  func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U] = {
+throw new UnsupportedOperationException
+  }
+
+  /**
+   * (Scala-specific) Applies the given function to each group of data, while 
maintaining a
+   * user-defined per-group state. The result Dataset will represent the 
objects returned by the
+   * function. For a static batch Dataset, the function will be invoked once 
per group. For a
+   * streaming Dataset, the function will be invoked for each group repeatedly 
in every trigger,
+   * and updates to each group's state will be saved across invocations. See
+   * [[org.apache.spark.sql.streaming.GroupState]] for more details.
+   *
+   * @tparam S
+   *   The type of the user-defined state. Must be encodable to Spark SQL 
types.
+   * @tparam U
+   *   The type of the output objects. Must be encodable to Spark SQL types.
+   * @param func
+   *   Function to be called on every group.
+   *
+   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
+   * @since 3.5.0
+   */
+  def mapGroupsWithState[S: Encoder, U: Encoder](
+  func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
+mapGroupsWithState(GroupStateTimeout.NoTimeout)(func)
+  }
+
+  /**
+   * (Scala-specific) Applies the given function to each group of data, while 
maintaining a
+   * user-defined per-group state. The result Dataset will represent the 
objects returned by the
+   * function. For a static batch Dataset, the function will be invoked once 
per group. For a
+   * streaming Dataset, the function will be invoked for each group repeatedly 
in every trigger,
+   * and updates