[spark] branch branch-3.4 updated: [SPARK-42924][SQL][CONNECT][PYTHON] Clarify the comment of parameterized SQL args

2023-03-26 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new dde9de60483 [SPARK-42924][SQL][CONNECT][PYTHON] Clarify the comment of 
parameterized SQL args
dde9de60483 is described below

commit dde9de60483ebb0b57013387dcfb8d5075cb6d0c
Author: Max Gekk 
AuthorDate: Mon Mar 27 08:54:18 2023 +0300

[SPARK-42924][SQL][CONNECT][PYTHON] Clarify the comment of parameterized 
SQL args

### What changes were proposed in this pull request?
In the PR, I propose to clarify the comment of `args` in parameterized 
`sql()`.

### Why are the changes needed?
To make the comment clearer and to highlight that input strings are parsed 
(not evaluated) and treated as SQL literal expressions. Also, fragments of the 
string values that belong to SQL comments are skipped while parsing.
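
To illustrate what "parsed as SQL literal expressions" means for a caller, here is a minimal sketch in Python. The helper `to_sql_literal` is hypothetical and not part of PySpark; it only renders Python values in the literal forms that the clarified comment uses as examples (`"1"`, `"'Steven'"`, `"DATE'2023-03-21'"`). The backslash escaping of quotes is an assumption based on Spark's default string-literal parsing.

```python
from datetime import date, datetime


def to_sql_literal(value):
    """Render a Python value as the text of a SQL literal (hypothetical helper)."""
    # bool must be checked before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return "TRUE" if value else "FALSE"
    if isinstance(value, int):
        return str(value)
    # datetime is a subclass of date, so exclude it from the DATE branch.
    if isinstance(value, date) and not isinstance(value, datetime):
        return "DATE'" + value.isoformat() + "'"
    if isinstance(value, str):
        # Assumption: backslash escaping, as in Spark's default literal parsing.
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return "'" + escaped + "'"
    raise TypeError("unsupported parameter type: " + type(value).__name__)


# The map keys and values mirror the example in the clarified comment.
args = {
    "rank": to_sql_literal(1),
    "name": to_sql_literal("Steven"),
    "birthdate": to_sql_literal(date(2023, 3, 21)),
}
```

Such a map of strings is what would then be passed as the `args` argument of `sql()`; the values are parsed on the Spark side rather than evaluated.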

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By checking coding style:
```
$ ./dev/lint-python
$ ./dev/scalastyle
```

Closes #40508 from MaxGekk/parameterized-sql-doc.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
(cherry picked from commit c55c7ea6fc92c3733543d5f3d99eb00921cbe564)
Signed-off-by: Max Gekk 
---
 .../jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala | 10 --
 .../common/src/main/protobuf/spark/connect/commands.proto  |  5 -
 .../common/src/main/protobuf/spark/connect/relations.proto |  5 -
 python/pyspark/pandas/sql_formatter.py |  6 --
 python/pyspark/sql/connect/proto/commands_pb2.pyi  |  6 +-
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  6 +-
 python/pyspark/sql/session.py  |  6 --
 .../src/main/scala/org/apache/spark/sql/SparkSession.scala | 10 --
 8 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index f1e82507393..548545b969d 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -213,7 +213,10 @@ class SparkSession private[sql] (
* @param sqlText
*   A SQL statement with named parameters to execute.
* @param args
-   *   A map of parameter names to literal values.
+   *   A map of parameter names to string values that are parsed as SQL literal expressions. For
+   *   example, map keys: "rank", "name", "birthdate"; map values: "1", "'Steven'",
+   *   "DATE'2023-03-21'". The fragments of string values belonged to SQL comments are skipped
+   *   while parsing.
*
* @since 3.4.0
*/
@@ -229,7 +232,10 @@ class SparkSession private[sql] (
* @param sqlText
*   A SQL statement with named parameters to execute.
* @param args
-   *   A map of parameter names to literal values.
+   *   A map of parameter names to string values that are parsed as SQL literal expressions. For
+   *   example, map keys: "rank", "name", "birthdate"; map values: "1", "'Steven'",
+   *   "DATE'2023-03-21'". The fragments of string values belonged to SQL comments are skipped
+   *   while parsing.
*
* @since 3.4.0
*/
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
index 3ffbe83bded..604421fdd4f 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -53,7 +53,10 @@ message SqlCommand {
   // (Required) SQL Query.
   string sql = 1;
 
-  // (Optional) A map of parameter names to literal values.
+  // (Optional) A map of parameter names to string values that are parsed as
+  // SQL literal expressions. For example, map keys: "rank", "name", "birthdate";
+  // map values: "1", "'Steven'", "DATE'2023-03-21'". The fragments of string values
+  // belonged to SQL comments are skipped while parsing.
   map<string, string> args = 2;
 }
 
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 9dec94411a9..976bd68e7fe 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -108,7 +108,10 @@ message SQL {
   // (Required) The SQL query.
   string query = 1;
 
-  // (Optional) A map of parameter names to literal values.
+  // (Optional) A map of parameter names to string values that are parsed a

[spark] branch master updated: [SPARK-42924][SQL][CONNECT][PYTHON] Clarify the comment of parameterized SQL args

2023-03-26 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c55c7ea6fc9 [SPARK-42924][SQL][CONNECT][PYTHON] Clarify the comment of 
parameterized SQL args
c55c7ea6fc9 is described below

commit c55c7ea6fc92c3733543d5f3d99eb00921cbe564
Author: Max Gekk 
AuthorDate: Mon Mar 27 08:54:18 2023 +0300

[SPARK-42924][SQL][CONNECT][PYTHON] Clarify the comment of parameterized 
SQL args

### What changes were proposed in this pull request?
In the PR, I propose to clarify the comment of `args` in parameterized 
`sql()`.

### Why are the changes needed?
To make the comment clearer and to highlight that input strings are parsed 
(not evaluated) and treated as SQL literal expressions. Also, fragments of the 
string values that belong to SQL comments are skipped while parsing.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By checking coding style:
```
$ ./dev/lint-python
$ ./dev/scalastyle
```

Closes #40508 from MaxGekk/parameterized-sql-doc.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 .../jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala | 10 --
 .../common/src/main/protobuf/spark/connect/commands.proto  |  5 -
 .../common/src/main/protobuf/spark/connect/relations.proto |  5 -
 python/pyspark/pandas/sql_formatter.py |  6 --
 python/pyspark/sql/connect/proto/commands_pb2.pyi  |  6 +-
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  6 +-
 python/pyspark/sql/session.py  |  6 --
 .../src/main/scala/org/apache/spark/sql/SparkSession.scala | 10 --
 8 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index f1e82507393..548545b969d 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -213,7 +213,10 @@ class SparkSession private[sql] (
* @param sqlText
*   A SQL statement with named parameters to execute.
* @param args
-   *   A map of parameter names to literal values.
+   *   A map of parameter names to string values that are parsed as SQL literal expressions. For
+   *   example, map keys: "rank", "name", "birthdate"; map values: "1", "'Steven'",
+   *   "DATE'2023-03-21'". The fragments of string values belonged to SQL comments are skipped
+   *   while parsing.
*
* @since 3.4.0
*/
@@ -229,7 +232,10 @@ class SparkSession private[sql] (
* @param sqlText
*   A SQL statement with named parameters to execute.
* @param args
-   *   A map of parameter names to literal values.
+   *   A map of parameter names to string values that are parsed as SQL literal expressions. For
+   *   example, map keys: "rank", "name", "birthdate"; map values: "1", "'Steven'",
+   *   "DATE'2023-03-21'". The fragments of string values belonged to SQL comments are skipped
+   *   while parsing.
*
* @since 3.4.0
*/
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
index 3ffbe83bded..604421fdd4f 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -53,7 +53,10 @@ message SqlCommand {
   // (Required) SQL Query.
   string sql = 1;
 
-  // (Optional) A map of parameter names to literal values.
+  // (Optional) A map of parameter names to string values that are parsed as
+  // SQL literal expressions. For example, map keys: "rank", "name", "birthdate";
+  // map values: "1", "'Steven'", "DATE'2023-03-21'". The fragments of string values
+  // belonged to SQL comments are skipped while parsing.
   map<string, string> args = 2;
 }
 
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 9dec94411a9..976bd68e7fe 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -108,7 +108,10 @@ message SQL {
   // (Required) The SQL query.
   string query = 1;
 
-  // (Optional) A map of parameter names to literal values.
+  // (Optional) A map of parameter names to string values that are parsed as
+  // SQL literal expressions. For example, map keys: "rank", "name", 
"birthdate";
+  // map values: "1", "

[spark] branch branch-3.4 updated: [SPARK-42899][SQL][FOLLOWUP] Project.reconcileColumnType should use KnownNotNull instead of AssertNotNull

2023-03-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new aba1c3b3e66 [SPARK-42899][SQL][FOLLOWUP] Project.reconcileColumnType 
should use KnownNotNull instead of AssertNotNull
aba1c3b3e66 is described below

commit aba1c3b3e66e430fa093271af243eee9f30d1d3c
Author: Takuya UESHIN 
AuthorDate: Mon Mar 27 11:19:12 2023 +0800

[SPARK-42899][SQL][FOLLOWUP] Project.reconcileColumnType should use 
KnownNotNull instead of AssertNotNull

### What changes were proposed in this pull request?

This is a follow-up of #40526.

`Project.reconcileColumnType` should use `KnownNotNull` instead of 
`AssertNotNull`, also only when `col.nullable`.

### Why are the changes needed?

There is a better expression, `KnownNotNull`, for this kind of issue.
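
The difference between the two expressions can be sketched with a toy model in Python. This is not Catalyst code: the classes below are hypothetical stand-ins, written under the assumption that `AssertNotNull` checks the value at runtime and fails on null, while `KnownNotNull` only tags its child as non-nullable and passes the value through. The function `wrap_for_struct_access` is an illustrative name that mirrors the conditional introduced by this patch.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Col:
    """Toy column expression: a value plus a nullability flag."""
    value: Any
    nullable: bool

    def eval(self):
        return self.value


@dataclass
class AssertNotNull:
    """Toy model: performs a runtime null check on its child."""
    child: Any
    nullable: bool = False

    def eval(self):
        result = self.child.eval()
        if result is None:
            raise ValueError("null value appeared in a non-nullable field")
        return result


@dataclass
class KnownNotNull:
    """Toy model: only tags the child as non-nullable, no runtime check."""
    child: Any
    nullable: bool = False

    def eval(self):
        return self.child.eval()


def wrap_for_struct_access(col):
    # Mirrors the patch: tag the column only when it is nullable.
    return KnownNotNull(col) if col.nullable else col
```

In this model the nullable column gains a non-nullable wrapper without any extra check at evaluation time, and a column that is already non-nullable is left untouched.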

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #40546 from ueshin/issues/SPARK-42899/KnownNotNull.

Authored-by: Takuya UESHIN 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 62b9763a6fd9437647021bbb4433034566ba0a42)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/plans/logical/basicLogicalOperators.scala   | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index ca6203a9c96..cdb4ba3fe22 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, TypedImperativeAggregate}
-import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
@@ -119,7 +118,11 @@ object Project {
   case (StructType(fields), expected: StructType) =>
 val newFields = reorderFields(
   fields.zipWithIndex.map { case (f, index) =>
-(f.name, GetStructField(AssertNotNull(col, columnPath), index))
+if (col.nullable) {
+  (f.name, GetStructField(KnownNotNull(col), index))
+} else {
+  (f.name, GetStructField(col, index))
+}
   },
   expected.fields,
   columnPath,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-42899][SQL][FOLLOWUP] Project.reconcileColumnType should use KnownNotNull instead of AssertNotNull

2023-03-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 62b9763a6fd [SPARK-42899][SQL][FOLLOWUP] Project.reconcileColumnType 
should use KnownNotNull instead of AssertNotNull
62b9763a6fd is described below

commit 62b9763a6fd9437647021bbb4433034566ba0a42
Author: Takuya UESHIN 
AuthorDate: Mon Mar 27 11:19:12 2023 +0800

[SPARK-42899][SQL][FOLLOWUP] Project.reconcileColumnType should use 
KnownNotNull instead of AssertNotNull

### What changes were proposed in this pull request?

This is a follow-up of #40526.

`Project.reconcileColumnType` should use `KnownNotNull` instead of 
`AssertNotNull`, also only when `col.nullable`.

### Why are the changes needed?

There is a better expression, `KnownNotNull`, for this kind of issue.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #40546 from ueshin/issues/SPARK-42899/KnownNotNull.

Authored-by: Takuya UESHIN 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/plans/logical/basicLogicalOperators.scala   | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index b77370fc5e7..09807f53c6a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, TypedImperativeAggregate}
-import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
@@ -120,7 +119,11 @@ object Project {
   case (StructType(fields), expected: StructType) =>
 val newFields = reorderFields(
   fields.zipWithIndex.map { case (f, index) =>
-(f.name, GetStructField(AssertNotNull(col, columnPath), index))
+if (col.nullable) {
+  (f.name, GetStructField(KnownNotNull(col), index))
+} else {
+  (f.name, GetStructField(col, index))
+}
   },
   expected.fields,
   columnPath,





[spark] branch master updated (06bf544973f -> 02c33524028)

2023-03-26 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 06bf544973f [SPARK-42896][SQL][PYTHON] Make `mapInPandas` / 
`mapInArrow` support barrier mode execution
 add 02c33524028 [SPARK-42921][SQL][TESTS] Split 
`timestampNTZ/datetime-special.sql` into w/ and w/o `ansi` suffix to pass sql 
analyzer test in ansi mode

No new revisions were added by this update.

Summary of changes:
 .../{datetime-special.sql.out => datetime-special-ansi.sql.out}   | 4 ++--
 .../timestampNTZ/{timestamp-ansi.sql => datetime-special-ansi.sql}| 2 +-
 .../test/resources/sql-tests/inputs/timestampNTZ/datetime-special.sql | 1 +
 .../{datetime-special.sql.out => datetime-special-ansi.sql.out}   | 0
 .../spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala  | 4 +++-
 5 files changed, 7 insertions(+), 4 deletions(-)
 copy 
sql/core/src/test/resources/sql-tests/analyzer-results/timestampNTZ/{datetime-special.sql.out
 => datetime-special-ansi.sql.out} (62%)
 copy 
sql/core/src/test/resources/sql-tests/inputs/timestampNTZ/{timestamp-ansi.sql 
=> datetime-special-ansi.sql} (54%)
 copy 
sql/core/src/test/resources/sql-tests/results/timestampNTZ/{datetime-special.sql.out
 => datetime-special-ansi.sql.out} (100%)





[spark] branch master updated: [SPARK-42896][SQL][PYTHON] Make `mapInPandas` / `mapInArrow` support barrier mode execution

2023-03-26 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 06bf544973f [SPARK-42896][SQL][PYTHON] Make `mapInPandas` / 
`mapInArrow` support barrier mode execution
06bf544973f is described below

commit 06bf544973f4e221c569487473fbe3268543ebb7
Author: Weichen Xu 
AuthorDate: Mon Mar 27 09:39:48 2023 +0800

[SPARK-42896][SQL][PYTHON] Make `mapInPandas` / `mapInArrow` support 
barrier mode execution

### What changes were proposed in this pull request?

Make mapInPandas / mapInArrow support barrier mode execution

### Why are the changes needed?

This is a preparatory PR for supporting `mapInPandas` / `mapInArrow` barrier 
execution in Spark Connect mode. The feature is required by machine learning 
use cases.
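
As a rough analogy for what barrier mode guarantees (all workers of the stage launched together, as the docstring added by this patch puts it), here is a pure-Python sketch built on `threading.Barrier`. It does not use Spark at all; the `worker` function and the count of three workers are made up for illustration.

```python
import threading

NUM_WORKERS = 3  # hypothetical number of tasks in the stage
barrier = threading.Barrier(NUM_WORKERS)
lock = threading.Lock()
arrived = []
seen_at_release = []


def worker(task_id):
    with lock:
        arrived.append(task_id)
    # Blocks until all NUM_WORKERS workers have reached this point.
    barrier.wait()
    with lock:
        # After release, every worker is known to have started.
        seen_at_release.append(len(arrived))


threads = [threading.Thread(target=worker, args=(i,)) for i in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

No worker gets past `barrier.wait()` until all of them are running, which is the property that distributed machine learning workloads typically rely on.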

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Closes #40520 from WeichenXu123/barrier-udf.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 .../sql/connect/planner/SparkConnectPlanner.scala  |  6 +++--
 python/pyspark/sql/pandas/map_ops.py   | 26 ++
 .../catalyst/analysis/DeduplicateRelations.scala   |  4 ++--
 .../plans/logical/pythonLogicalOperators.scala |  6 +++--
 .../sql/catalyst/analysis/AnalysisSuite.scala  |  3 ++-
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 10 +
 .../spark/sql/execution/SparkStrategies.scala  |  8 +++
 .../sql/execution/python/MapInBatchExec.scala  | 10 -
 .../sql/execution/python/MapInPandasExec.scala |  3 ++-
 .../execution/python/PythonMapInArrowExec.scala|  3 ++-
 10 files changed, 57 insertions(+), 22 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 13052ec9b01..e7911ccdf11 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -489,12 +489,14 @@ class SparkConnectPlanner(val session: SparkSession) {
 logical.MapInPandas(
   pythonUdf,
   pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-  transformRelation(rel.getInput))
+  transformRelation(rel.getInput),
+  false)
   case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
 logical.PythonMapInArrow(
   pythonUdf,
   pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-  transformRelation(rel.getInput))
+  transformRelation(rel.getInput),
+  false)
   case _ =>
 throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported")
 }
diff --git a/python/pyspark/sql/pandas/map_ops.py b/python/pyspark/sql/pandas/map_ops.py
index 47b17578ae1..a4c0c94844b 100644
--- a/python/pyspark/sql/pandas/map_ops.py
+++ b/python/pyspark/sql/pandas/map_ops.py
@@ -32,7 +32,7 @@ class PandasMapOpsMixin:
 """
 
 def mapInPandas(
-self, func: "PandasMapIterFunction", schema: Union[StructType, str]
+self, func: "PandasMapIterFunction", schema: Union[StructType, str], isBarrier: bool = False
 ) -> "DataFrame":
 """
 Maps an iterator of batches in the current :class:`DataFrame` using a 
Python native
@@ -60,6 +60,7 @@ class PandasMapOpsMixin:
 schema : :class:`pyspark.sql.types.DataType` or str
 the return type of the `func` in PySpark. The value can be either a
 :class:`pyspark.sql.types.DataType` object or a DDL-formatted type 
string.
+isBarrier : Use barrier mode execution if True.
 
 Examples
 
@@ -74,6 +75,14 @@ class PandasMapOpsMixin:
 +---+---+
 |  1| 21|
 +---+---+
+>>> # Set isBarrier=True to force the "mapInPandas" stage running in barrier mode,
+>>> # it ensures all python UDF workers in the stage will be launched concurrently.
+>>> df.mapInPandas(filter_func, df.schema, isBarrier=True).show()  # doctest: +SKIP
++---+---+
+| id|age|
++---+---+
+|  1| 21|
++---+---+
 
 Notes
 -
@@ -93,11 +102,11 @@ class PandasMapOpsMixin:
 func, returnType=schema, 
functionType=PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
 )  # type: ignore[call-overload]
 udf_column = udf(*[self[col] for col in self.columns])
-jdf = self._jdf.mapInPandas(udf_column._jc.expr())
+jdf = self._jdf.mapInPandas(udf_column._jc.expr(), isBarrier)
 return DataFrame(jdf, self.sparkSession)
 
 def 

[spark] branch branch-3.4 updated: [SPARK-42920][CONNECT][PYTHON] Enable tests for UDF with UDT

2023-03-26 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 31ede7330a3 [SPARK-42920][CONNECT][PYTHON] Enable tests for UDF with 
UDT
31ede7330a3 is described below

commit 31ede7330a314b18faa591a9313ed31c5c8b63c1
Author: Takuya UESHIN 
AuthorDate: Mon Mar 27 09:35:33 2023 +0900

[SPARK-42920][CONNECT][PYTHON] Enable tests for UDF with UDT

### What changes were proposed in this pull request?

Enables tests for UDF with UDT.

### Why are the changes needed?

Now that UDF with UDT should work, the related tests should be enabled to 
see if it works.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Enabled/modified the related tests.

Closes #40549 from ueshin/issues/SPARK-42920/udf_with_udt.

Authored-by: Takuya UESHIN 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 80f8664e8278335788d8fa1dd00654f3eaec8ed6)
Signed-off-by: Hyukjin Kwon 
---
 .../pyspark/sql/tests/connect/test_parity_types.py |  4 +--
 python/pyspark/sql/tests/test_types.py | 38 +++---
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_types.py b/python/pyspark/sql/tests/connect/test_parity_types.py
index a2f81fbf25e..aacf5793b2b 100644
--- a/python/pyspark/sql/tests/connect/test_parity_types.py
+++ b/python/pyspark/sql/tests/connect/test_parity_types.py
@@ -84,8 +84,8 @@ class TypesParityTests(TypesTestsMixin, 
ReusedConnectTestCase):
 super().test_infer_schema_upcast_int_to_string()
 
 @unittest.skip("Spark Connect does not support RDD but the tests depend on 
them.")
-def test_udf_with_udt(self):
-super().test_udf_with_udt()
+def test_rdd_with_udt(self):
+super().test_rdd_with_udt()
 
 @unittest.skip("Requires JVM access.")
 def test_udt(self):
diff --git a/python/pyspark/sql/tests/test_types.py b/python/pyspark/sql/tests/test_types.py
index bee899e928e..5d6476b47f4 100644
--- a/python/pyspark/sql/tests/test_types.py
+++ b/python/pyspark/sql/tests/test_types.py
@@ -25,8 +25,7 @@ import sys
 import unittest
 
 from pyspark.sql import Row
-from pyspark.sql.functions import col
-from pyspark.sql.udf import UserDefinedFunction
+from pyspark.sql import functions as F
 from pyspark.errors import AnalysisException
 from pyspark.sql.types import (
 ByteType,
@@ -381,7 +380,7 @@ class TypesTestsMixin:
 try:
 self.spark.sql("set 
spark.sql.legacy.allowNegativeScaleOfDecimal=true")
 df = self.spark.createDataFrame([(1,), (11,)], ["value"])
-ret = df.select(col("value").cast(DecimalType(1, -1))).collect()
+ret = df.select(F.col("value").cast(DecimalType(1, -1))).collect()
 actual = list(map(lambda r: int(r.value), ret))
 self.assertEqual(actual, [0, 10])
 finally:
@@ -548,8 +547,6 @@ class TypesTestsMixin:
 df.collect()
 
 def test_complex_nested_udt_in_df(self):
-from pyspark.sql.functions import udf
-
 schema = StructType().add("key", LongType()).add("val", 
PythonOnlyUDT())
 df = self.spark.createDataFrame(
 [(i % 3, PythonOnlyPoint(float(i), float(i))) for i in range(10)], 
schema=schema
@@ -558,7 +555,7 @@ class TypesTestsMixin:
 
 gd = df.groupby("key").agg({"val": "collect_list"})
 gd.collect()
-udf = udf(lambda k, v: [(k, v[0])], ArrayType(df.schema))
+udf = F.udf(lambda k, v: [(k, v[0])], ArrayType(df.schema))
 gd.select(udf(*gd)).collect()
 
 def test_udt_with_none(self):
@@ -667,20 +664,27 @@ class TypesTestsMixin:
 def test_udf_with_udt(self):
 row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
 df = self.spark.createDataFrame([row])
-self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
-udf = UserDefinedFunction(lambda p: p.y, DoubleType())
+udf = F.udf(lambda p: p.y, DoubleType())
 self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
-udf2 = UserDefinedFunction(lambda p: ExamplePoint(p.x + 1, p.y + 1), 
ExamplePointUDT())
+udf2 = F.udf(lambda p: ExamplePoint(p.x + 1, p.y + 1), 
ExamplePointUDT())
 self.assertEqual(ExamplePoint(2.0, 3.0), 
df.select(udf2(df.point)).first()[0])
 
 row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
 df = self.spark.createDataFrame([row])
-self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
-udf = UserDefinedFunction(lambda p: p.y, DoubleType())
+udf = F.udf(lambda p: p.y, DoubleType())
 self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
-udf2 = UserDefinedFunction(lambda p: PythonOnlyPo

[spark] branch master updated: [SPARK-42920][CONNECT][PYTHON] Enable tests for UDF with UDT

2023-03-26 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 80f8664e827 [SPARK-42920][CONNECT][PYTHON] Enable tests for UDF with 
UDT
80f8664e827 is described below

commit 80f8664e8278335788d8fa1dd00654f3eaec8ed6
Author: Takuya UESHIN 
AuthorDate: Mon Mar 27 09:35:33 2023 +0900

[SPARK-42920][CONNECT][PYTHON] Enable tests for UDF with UDT

### What changes were proposed in this pull request?

Enables tests for UDF with UDT.

### Why are the changes needed?

Now that UDF with UDT should work, the related tests should be enabled to 
see if it works.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Enabled/modified the related tests.

Closes #40549 from ueshin/issues/SPARK-42920/udf_with_udt.

Authored-by: Takuya UESHIN 
Signed-off-by: Hyukjin Kwon 
---
 .../pyspark/sql/tests/connect/test_parity_types.py |  4 +--
 python/pyspark/sql/tests/test_types.py | 38 +++---
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_types.py b/python/pyspark/sql/tests/connect/test_parity_types.py
index a2f81fbf25e..aacf5793b2b 100644
--- a/python/pyspark/sql/tests/connect/test_parity_types.py
+++ b/python/pyspark/sql/tests/connect/test_parity_types.py
@@ -84,8 +84,8 @@ class TypesParityTests(TypesTestsMixin, 
ReusedConnectTestCase):
 super().test_infer_schema_upcast_int_to_string()
 
 @unittest.skip("Spark Connect does not support RDD but the tests depend on 
them.")
-def test_udf_with_udt(self):
-super().test_udf_with_udt()
+def test_rdd_with_udt(self):
+super().test_rdd_with_udt()
 
 @unittest.skip("Requires JVM access.")
 def test_udt(self):
diff --git a/python/pyspark/sql/tests/test_types.py b/python/pyspark/sql/tests/test_types.py
index bee899e928e..5d6476b47f4 100644
--- a/python/pyspark/sql/tests/test_types.py
+++ b/python/pyspark/sql/tests/test_types.py
@@ -25,8 +25,7 @@ import sys
 import unittest
 
 from pyspark.sql import Row
-from pyspark.sql.functions import col
-from pyspark.sql.udf import UserDefinedFunction
+from pyspark.sql import functions as F
 from pyspark.errors import AnalysisException
 from pyspark.sql.types import (
 ByteType,
@@ -381,7 +380,7 @@ class TypesTestsMixin:
 try:
 self.spark.sql("set 
spark.sql.legacy.allowNegativeScaleOfDecimal=true")
 df = self.spark.createDataFrame([(1,), (11,)], ["value"])
-ret = df.select(col("value").cast(DecimalType(1, -1))).collect()
+ret = df.select(F.col("value").cast(DecimalType(1, -1))).collect()
 actual = list(map(lambda r: int(r.value), ret))
 self.assertEqual(actual, [0, 10])
 finally:
@@ -548,8 +547,6 @@ class TypesTestsMixin:
 df.collect()
 
 def test_complex_nested_udt_in_df(self):
-from pyspark.sql.functions import udf
-
 schema = StructType().add("key", LongType()).add("val", 
PythonOnlyUDT())
 df = self.spark.createDataFrame(
 [(i % 3, PythonOnlyPoint(float(i), float(i))) for i in range(10)], 
schema=schema
@@ -558,7 +555,7 @@ class TypesTestsMixin:
 
 gd = df.groupby("key").agg({"val": "collect_list"})
 gd.collect()
-udf = udf(lambda k, v: [(k, v[0])], ArrayType(df.schema))
+udf = F.udf(lambda k, v: [(k, v[0])], ArrayType(df.schema))
 gd.select(udf(*gd)).collect()
 
 def test_udt_with_none(self):
@@ -667,20 +664,27 @@ class TypesTestsMixin:
 def test_udf_with_udt(self):
 row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
 df = self.spark.createDataFrame([row])
-self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
-udf = UserDefinedFunction(lambda p: p.y, DoubleType())
+udf = F.udf(lambda p: p.y, DoubleType())
 self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
-udf2 = UserDefinedFunction(lambda p: ExamplePoint(p.x + 1, p.y + 1), 
ExamplePointUDT())
+udf2 = F.udf(lambda p: ExamplePoint(p.x + 1, p.y + 1), 
ExamplePointUDT())
 self.assertEqual(ExamplePoint(2.0, 3.0), 
df.select(udf2(df.point)).first()[0])
 
 row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
 df = self.spark.createDataFrame([row])
-self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
-udf = UserDefinedFunction(lambda p: p.y, DoubleType())
+udf = F.udf(lambda p: p.y, DoubleType())
 self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
-udf2 = UserDefinedFunction(lambda p: PythonOnlyPoint(p.x + 1, p.y + 
1), PythonOnlyUDT())
+udf2 = F.udf(lambda p: PythonOnlyPoint(p.x + 1, p.y + 1), 
Pytho

[spark] branch master updated (b8bd03b9f78 -> 10cf85051b3)

2023-03-26 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from b8bd03b9f78 [MINOR][CORE] Remove unused variables and method in Spark 
listeners
 add 10cf85051b3 [SPARK-42914][PYTHON] Reuse 
`transformUnregisteredFunction` for `DistributedSequenceID`

No new revisions were added by this update.

Summary of changes:
 .../main/protobuf/spark/connect/expressions.proto  |   5 -
 .../sql/connect/planner/SparkConnectPlanner.scala  |  11 +-
 .../connect/planner/SparkConnectPlannerSuite.scala |  22 +---
 python/pyspark/sql/connect/expressions.py  |   5 +-
 .../pyspark/sql/connect/proto/expressions_pb2.py   | 121 +
 .../pyspark/sql/connect/proto/expressions_pb2.pyi  |  18 ---
 .../sql/tests/connect/test_connect_column.py   |   2 +-
 7 files changed, 62 insertions(+), 122 deletions(-)





[spark] branch branch-3.4 updated: [SPARK-42911][PYTHON][3.4] Introduce more basic exceptions

2023-03-26 Thread gurwls223

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 1b95b4d6cfc [SPARK-42911][PYTHON][3.4] Introduce more basic exceptions
1b95b4d6cfc is described below

commit 1b95b4d6cfc13db031c9f31729e7b551207a0cc3
Author: Takuya UESHIN 
AuthorDate: Mon Mar 27 09:26:47 2023 +0900

[SPARK-42911][PYTHON][3.4] Introduce more basic exceptions

### What changes were proposed in this pull request?

Introduces more basic exceptions.

- ArithmeticException
- ArrayIndexOutOfBoundsException
- DateTimeException
- NumberFormatException
- SparkRuntimeException

### Why are the changes needed?

Spark throws several exceptions that PySpark does not capture yet.

Without dedicated classes for them, users still see a raw 
`Py4JJavaError` or `SparkConnectGrpcException`:

```py
>>> spark.conf.set("spark.sql.ansi.enabled", True)
>>> spark.sql("select 1/0")
DataFrame[(1 / 0): double]
>>> spark.sql("select 1/0").show()
Traceback (most recent call last):
...
py4j.protocol.Py4JJavaError: An error occurred while calling o44.showString.
: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by 
zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If 
necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.
== SQL(line 1, position 8) ==
select 1/0
       ^^^

at 
org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:225)
... JVM's stacktrace
```

```py
>>> spark.sql("select 1/0").show()
Traceback (most recent call last):
...
pyspark.errors.exceptions.connect.SparkConnectGrpcException: 
(org.apache.spark.SparkArithmeticException) [DIVIDE_BY_ZERO] Division by zero. 
Use `try_divide` to tolerate divisor being 0 and return NULL instead. If 
necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.
== SQL(line 1, position 8) ==
select 1/0
       ^^^
```

### Does this PR introduce _any_ user-facing change?

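Yes. The error message is more readable: a dedicated PySpark exception is 
raised instead of `Py4JJavaError` or `SparkConnectGrpcException`.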

```py
>>> spark.sql("select 1/0").show()
Traceback (most recent call last):
...
pyspark.errors.exceptions.captured.ArithmeticException: [DIVIDE_BY_ZERO] 
Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL 
instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this 
error.
== SQL(line 1, position 8) ==
select 1/0
       ^^^
```

or, with Spark Connect:

```py
>>> spark.sql("select 1/0").show()
Traceback (most recent call last):
...
pyspark.errors.exceptions.connect.ArithmeticException: [DIVIDE_BY_ZERO] 
Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL 
instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this 
error.
== SQL(line 1, position 8) ==
select 1/0
       ^^^
```
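
The idea of capturing a JVM-side error as a dedicated Python exception can be 
sketched in plain Python. This is an illustration only, not the code in 
`captured.py` or `connect.py`: the `Sketch*` classes, the lookup table and 
`convert_error` are hypothetical names, and the sketch assumes the raw message 
starts with the JVM class name in parentheses, as in the 
`SparkConnectGrpcException` output above.

```python
import re


class SketchPySparkException(Exception):
    """Hypothetical base class; stands in for a generic PySpark error."""


class SketchArithmeticException(SketchPySparkException):
    pass


class SketchArrayIndexOutOfBoundsException(SketchPySparkException):
    pass


class SketchDateTimeException(SketchPySparkException):
    pass


class SketchNumberFormatException(SketchPySparkException):
    pass


class SketchSparkRuntimeException(SketchPySparkException):
    pass


# Hypothetical lookup: JVM class-name suffix -> Python exception class.
_SUFFIX_TO_EXCEPTION = {
    "ArithmeticException": SketchArithmeticException,
    "ArrayIndexOutOfBoundsException": SketchArrayIndexOutOfBoundsException,
    "DateTimeException": SketchDateTimeException,
    "NumberFormatException": SketchNumberFormatException,
    "SparkRuntimeException": SketchSparkRuntimeException,
}

# Matches "(fully.qualified.ClassName) rest of the message".
_CLASS_PREFIX = re.compile(r"^\((?P<cls>[\w.$]+)\)\s*(?P<msg>.*)$", re.DOTALL)


def convert_error(raw: str) -> SketchPySparkException:
    """Return the most specific sketch exception for a raw error string."""
    match = _CLASS_PREFIX.match(raw)
    if match is None:
        # No class prefix: fall back to the generic exception.
        return SketchPySparkException(raw)
    simple_name = match.group("cls").rsplit(".", 1)[-1]
    for suffix, exc_class in _SUFFIX_TO_EXCEPTION.items():
        if simple_name.endswith(suffix):
            return exc_class(match.group("msg"))
    # Unknown JVM class: keep the full raw text.
    return SketchPySparkException(raw)


raw_error = (
    "(org.apache.spark.SparkArithmeticException) "
    "[DIVIDE_BY_ZERO] Division by zero."
)
converted = convert_error(raw_error)
print(type(converted).__name__, "-", converted)
```

With a mapping of this kind, callers can write `except` clauses for one 
specific error category instead of inspecting the text of a generic error.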

### How was this patch tested?

Added the related tests.

Closes #40547 from ueshin/issues/SPARK-42911/3.4/exceptions.

Authored-by: Takuya UESHIN 
Signed-off-by: Hyukjin Kwon 
---
 dev/sparktestsupport/modules.py|  2 +
 python/pyspark/errors/__init__.py  | 10 
 python/pyspark/errors/exceptions/base.py   | 36 ++-
 python/pyspark/errors/exceptions/captured.py   | 52 +++-
 python/pyspark/errors/exceptions/connect.py| 66 +
 .../sql/tests/connect/test_parity_errors.py| 36 +++
 python/pyspark/sql/tests/test_errors.py| 69 ++
 7 files changed, 255 insertions(+), 16 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 2b202bc333e..29bc39e14bf 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -476,6 +476,7 @@ pyspark_sql = Module(
 "pyspark.sql.tests.test_context",
 "pyspark.sql.tests.test_dataframe",
 "pyspark.sql.tests.test_datasources",
+"pyspark.sql.tests.test_errors",
 "pyspark.sql.tests.test_functions",
 "pyspark.sql.tests.test_group",
 "pyspark.sql.tests.pandas.test_pandas_cogrouped_map",
@@ -524,6 +525,7 @@ pyspark_connect = Module(
 "pyspark.sql.tests.connect.test_connect_function",
 "pyspark.sql.tests.connect.test_connect_column",
 "pyspark.sql.tests.connect.test_parity_datasources",
+"pyspark.sql.tests.connect.test_parity_errors",
 "pyspark.sql.tests.connect.test_parity_catalog",
 "pyspark.sql.tests.connect.test_parity_conf",
 "pyspark.sql.tests.connect.test_parity_serde",
diff --git a/pyt